SPARK-2638 MapOutputTracker concurrency improvement #1542
asfgit merged 0 commits into apache:master from javadba:master
Conversation
|
Can one of the admins verify this patch? |
|
This is not a correct fix:
- You should always use the same lock to protect access to `fetching` in order to set up the happens-before relation. `fetching.wait()` should always be called while holding the `fetching` lock. See the javadocs: http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#wait()
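For reference, a minimal sketch of the pattern the javadoc describes, using a `fetching` set of in-flight shuffle IDs; the object and method names here are illustrative, not Spark's actual API:

```scala
import scala.collection.mutable

object FetchSync {
  // Shuffle IDs whose map output statuses are currently being fetched.
  private val fetching = new mutable.HashSet[Int]

  // Block until no fetch is in flight for this shuffle. wait() is only
  // ever called while holding the monitor of `fetching`, and the
  // condition is re-checked to cope with spurious wakeups.
  def awaitFetch(shuffleId: Int): Unit = fetching.synchronized {
    while (fetching.contains(shuffleId)) {
      fetching.wait()
    }
  }

  // Mark the fetch as finished and wake any waiters, under the same lock.
  def finishFetch(shuffleId: Int): Unit = fetching.synchronized {
    fetching -= shuffleId
    fetching.notifyAll()
  }
}
```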
|
In addition, using the monitor of an object from outside to synchronize makes the code hard to analyse, because that object's monitor may be used (now or in the future) in other places and the developer may not notice that somebody has already used it here.
|
Yes, I see your point and agree there is a problem in the proposed fix. My fix is incorrect in that we need to match the outer `shuffleId.toString.intern.synchronized` to the inner `shuffleId.toString.intern.wait()`.
Then the code should look like this:
```scala
val monitor = shuffleId.toString.intern
monitor.synchronized {
  // ...
  monitor.wait()
}
```
Do you agree?
|
Hi, thanks for taking the time to review this PR again. A couple of things I maybe should have pointed out: a. The strategy here is not to use the containing collection `fetching` to enforce the proper concurrency; instead, the monitor is on the contained object, the `shuffleId`.
This has been addressed in my latest fix: the interned monitor is now always used within the synchronized block.
Thanks again for your help in reviewing this proposed improvement. |
|
Hi again. Upon closer inspection of the existing code and functionality, we do have an opportunity here. Here is an initial version of the rewrite of getServerStatuses; notice it has a single synchronized block and reduces the SLOC from 53 to 32. I am going to write a thorough unit test to demonstrate (a) correctness and (b) improved concurrency of this new version versus the existing `def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)]`. BTW, the synchronization on `statuses` was moved into the private method `convertMapStatuses(...)`: this reduces code size and also avoids the risk of someone forgetting to include the synchronization. |
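For context, a rough sketch of the shape being described: a single synchronized region in getServerStatuses, with the synchronization on `statuses` pushed down into the private `convertMapStatuses`. Everything below (types, names, bodies) is a stand-in, not the actual patch:

```scala
import scala.collection.concurrent.TrieMap

// Stand-in types; the real ones live in org.apache.spark.
case class BlockManagerId(host: String, port: Int)
case class MapStatus(location: BlockManagerId, sizes: Array[Long])

class TrackerSketch {
  private val mapStatuses = TrieMap.empty[Int, Array[MapStatus]]

  // Single synchronized region, keyed on the shuffle rather than on a
  // shared `fetching` collection.
  def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
    val monitor = s"TrackerSketch.shuffleId$shuffleId".intern
    monitor.synchronized {
      val statuses = mapStatuses.getOrElseUpdate(shuffleId, fetchFromMaster(shuffleId))
      convertMapStatuses(reduceId, statuses)
    }
  }

  // Synchronization on `statuses` lives here so callers cannot forget it.
  private def convertMapStatuses(
      reduceId: Int,
      statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] =
    statuses.synchronized {
      statuses.map(s => (s.location, s.sizes(reduceId)))
    }

  // Placeholder for the remote fetch of map output statuses.
  private def fetchFromMaster(shuffleId: Int): Array[MapStatus] = Array.empty
}
```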
|
As I commented on the JIRA, unless I've missed something I'm not sure that the original code exhibits significant concurrency issues: no time-consuming work is performed in the synchronized blocks and the actual MapOutputTracker requests proceed in parallel without holding any locks. The only potential issue that I see is many concurrent duplicate requests causing a big queue of waiters on the lock that results in many spurious wakeups as statuses are fetched; this seems unlikely to happen in practice, though. Do you have an actual use-case that demonstrates that the current synchronization strategy is a performance issue? |
|
@javadba, sorry, maybe my previous comment was not clear. I'm opposed to using
```scala
def foo(bar: Int) {
  bar.toString.intern.synchronized {
    // ...
  }
}
```
This will definitely be hard to debug and track. What's more, I agree with @JoshRosen and would also like to see a real test on this code to demonstrate the improvement from your modification. |
|
I think asfgit got confused somehow; the linked commit looks unrelated. |
|
That was super scary! Thanks for clarifying @aarondav |
|
I dunno, merging a PR with no changed files doesn't sound too scary to me. Something is definitely messed up in this PR, with both |
|
It looks like this pull request was opened from @javadba's master branch. My hunch is that he force-pushed or otherwise reset that branch to bring it in sync with the ASF master repository, and when the git mirroring script caught up it noticed that a commit in this PR had been merged into the ASF master but was missing from GitHub, so it closed this PR. ("... and that's why you always create a feature branch"). |
|
Hi folks, as far as this particular PR goes: I did work through the original (prior to my PR) logic with a coworker, and yes, it is actually correct. It is also complicated. I am not attempting at this point to resurrect my PR, but I will still maintain that it is better code. a) It is shorter, more concise and, yes, better performing - though, as Josh correctly points out, there is actually no long-running code being locked in the original, so the benefits in this case are insubstantial. b) As for the - correct - protest about the use of interned numbers, which could be re-used in other parts of the code: I agree, but a small fix takes care of it. Simply prepend the interned shuffleId with the className:objectName and now it is unique, e.g. `s"o.a.s.MapOutputTracker.shuffleID$shuffleId"`. That would then not collide with other potential usages of this tactic. I will look at the half dozen other cases that use the present Set-checking tactic at some point, but we are all a bit tired of this topic right now, and 1.1.0 is in any case "a wrap". So this particular PR can safely remain closed. |
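For illustration, a tiny sketch of that prefixed-intern idea; only the key format comes from the comment above, while the object and method names are hypothetical:

```scala
object ShuffleLocks {
  // Prefix the interned key with the owning class so other code that
  // happens to intern the bare number cannot share the same monitor.
  private def monitorFor(shuffleId: Int): String =
    s"o.a.s.MapOutputTracker.shuffleID$shuffleId".intern

  def withShuffleLock[T](shuffleId: Int)(body: => T): T =
    monitorFor(shuffleId).synchronized { body }
}
```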
|
Since this same locking pattern occurs at several places in the code, I think it might make sense to abstract it behind a function or macro, which would give us a centralized place to experiment with different synchronization / locking strategies. |
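A sketch of what such a helper might look like; the name `KeyedLock` and the ConcurrentHashMap-backed strategy are purely illustrative, not something proposed in this thread:

```scala
import java.util.concurrent.ConcurrentHashMap

// Centralizes the per-key locking pattern so the underlying strategy
// (interned strings, striped locks, ...) can be changed in one place.
// A real implementation would also need to bound or clean up the map.
object KeyedLock {
  private val locks = new ConcurrentHashMap[Any, AnyRef]()

  def withLock[T](key: Any)(body: => T): T = {
    val fresh = new Object
    val existing = locks.putIfAbsent(key, fresh)
    val lock = if (existing != null) existing else fresh
    lock.synchronized { body }
  }
}

// e.g. KeyedLock.withLock(("MapOutputTracker", shuffleId)) { fetchStatuses(shuffleId) }
```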
|
Thanks for commenting, Josh. I will see about putting together something on this, including solid test cases. ETA: later in the coming week.
Spark-2638 Improve concurrency of fetching Map outputs
This issue was noticed while perusing the MapOutputTracker source code. Notice that the synchronization is on the containing "fetching" collection, which makes ALL fetches wait if any fetch is in progress.
The fix is to synchronize instead on the shuffleId (interned as a string to ensure JVM-wide visibility).
For further details please refer to the JIRA.