Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-2387: remove stage barrier #1328

Closed
wants to merge 91 commits into from

Conversation

lirui-apache
Copy link

This PR is a PoC implementation of SPARK-2387.

When a ShuffleMapTask finishes, DAGScheduler will check resource usage. And if there’re free slots, DAGScheduler chooses a stage from the waiting list whose parent stages have all started, and pre-starts this waiting stage. All the in-progress parent stages will then register the map outputs progressively with MapOutputTrackerMaster. A flag is added to MapOutputTracker to indicate whether the map statuses for a shuffle is partial or not, so that we can distinguish partial registration from failed shuffle map stage.
When the downstream task tries to fetch shuffle blocks, it will get an array of map outputs that has “holes” (unfinished map tasks) in it. We created PartialBlockFetcherIterator to handle this map output array. PartialBlockFetcherIterator keeps an array of conventional iterators (BasicBlockFetcherIterator or NettyBlockFetcherIterator). When some new map outputs become available, PartialBlockFetcherIterator will delegate these outputs to a new conventional iterator and relies on these conventional iterators for “hasNext” and “next” methods. When all the delegated map statuses run out, PartialBlockFetcherIterator contacts local MapOutputTrackerWorker for updated map outputs. MapOutputTrackerWorker uses an "updater" thread to communicate with MapOutputTrackerMaster to update the map statuses and informs the downstream tasks to continue when the map statuses get updated.

This PoC feature is mainly intended and tested against standalone cluster. I used a 7-node cluster for performance test. Each node runs an executor with 32 CPUs and 90GB memory. I used graphx.SynthBenchmark for the test and the testcase used is:
graphx.SynthBenchmark -partStrategy=EdgePartition2D -numEPart=112 -nverts=10000000 -niter=3
The feature improves the whole job by roughly 10% (reduces the creation time from 128s to 116s and run time from 126s to 115s).

lirui added 30 commits May 5, 2014 13:21
Signed-off-by: lirui <rui.li@intel.com>
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@sryza
Copy link
Contributor

sryza commented Jul 8, 2014

SPARK-2099 is adding a general executor->driver heartbeat. It might be worth piggybacking the communication between the MapOutputTrackerWorker and MapOutputTrackerMaster on this.

@lirui-apache
Copy link
Author

Thanks @sryza for the idea. I think it's OK to piggy back the communication in a heartbeat, but we should also allow the worker to explicitly ask the master for map statuses when a task demands more outputs. I'll look into it once you have that feature merged.

@colorant
Copy link
Contributor

I think there might be several issues need to be addressed here to make this more robust and sound solution:

  1. A solution to avoid pre-start stage occupy too many CPU resource which starve the parent stage and prevent it from finish in time. will need to control how many pre-start task can launch and adjust according to parent stage's status. And need some config to allow user to fine tune the behavior, after all, what's the best time to pre-start might depends on actual case.
  2. might need a more common Interface, seems a lot of specific class implementation is assumed.
  3. Might need to take more care of the memory management issue, say how to prevent pre-start stage occupy too many memory (say by cache etc) which complicate the overall cache eviction and GC problem etc.

Just my suggestion.

@@ -340,6 +459,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
with mutable.SynchronizedMap[Int, Array[MapStatus]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ConcurrentHashMap is better in most cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this PR be merged soon? If not, I hope this line can be merged soon because it solves a critical concurrent issue of mapStatuses.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing thanks for the comments. Maybe it's better to make it ConcurrentHashMap in the base class.
I don't think this PR can be merged soon... So maybe you can open another JIRA to fix this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's better to make it ConcurrentHashMap in the base class.

Because MapOutputTrackerMaster uses TimeStampedHashMap which is not a ConcurrentHashMap, MapOutputTracker still needs to use Map. Nevertheless, I can add a comment on MapOutputTracker.mapStatuses to mark that it should be a thread-safe map.

@pwendell
Copy link
Contributor

I'd like to close this issue for now pending more of a design discussion on the JIRA. These Proof of Concept patches are useful to have, but I'd rather not have them lingering for a long time in the PR queue.

I will post a link on the JIRA to this diff so we have it as a reference:
https://github.com/lirui-intel/spark/compare/removeStageBarrier

@asfgit asfgit closed this in d112a6c Sep 21, 2014
asfgit pushed a commit that referenced this pull request Sep 26, 2014
…shMap

MapOutputTrackerWorker.mapStatuses is used concurrently, it should be thread-safe. This bug has already been fixed in #1328. Nevertheless, considering #1328 won't be merged soon, I send this trivial fix and hope this issue can be solved soon.

Author: zsxwing <zsxwing@gmail.com>

Closes #1541 from zsxwing/SPARK-2634 and squashes the following commits:

d450053 [zsxwing] SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants