[SPARK-2774] - Set preferred locations for reduce tasks #1697

Closed
shivaram wants to merge 6 commits

Conversation

shivaram (Contributor)

The motivation for the change is in the JIRA. There are a couple of things I would like feedback on:

  1. Should we sort the map outputs by size for every task? This could be expensive if we have a large number of map outputs.
  2. The number of preferred locations to use. Technically we could set this to a larger number, but I am not sure how it would affect locality / delay scheduling in the TaskSetManager.

cc @rxin

@shivaram shivaram changed the title [SPARK-2774 - Set preferred locations for reduce tasks SPARK-2774 - Set preferred locations for reduce tasks Jul 31, 2014
@shivaram shivaram changed the title SPARK-2774 - Set preferred locations for reduce tasks [SPARK-2774] - Set preferred locations for reduce tasks Jul 31, 2014
@SparkQA commented Jul 31, 2014

QA tests have started for PR 1697. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17592/consoleFull

@@ -284,6 +290,24 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

// Return the list of locations and block sizes for each reducer.
// The returned map is keyed by reducer id; each array holds one
// (location, size) pair per map output block for that reducer.
// Note: not thread-safe (TimestampedHashMap is not thread-safe), but this is
// only called from the single-threaded DAGScheduler.
def getStatusByReducer(shuffleId: Int): Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
Contributor

comment on the thread safety

Contributor

also comment on the semantics of the return value (what does the Int mean - what does the index in the array mean, etc)

Contributor Author

Added comments. This method is not thread-safe, since TimestampedHashMap is not thread-safe. However, we only call this from the DAGScheduler, which is single-threaded AFAIK.

@rxin (Contributor) commented Jul 31, 2014

I have some concern (maybe unfounded) about runtime. If we have 50k map tasks and 10k reduce tasks, this would require doing 10k sorts, each on 50k items, right?

@shivaram (Contributor, Author)

Thanks for taking a look. One thing I realized is that we only need the top 5, so we don't need to sort the data. I'll try the Guava Ordering class and run some benchmarks.
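
For illustration, a minimal sketch of that idea (the helper name and plumbing are hypothetical, not code from this patch): Guava's Ordering.greatestOf picks the k largest elements in roughly O(n + k log k) without a full sort.

import scala.collection.JavaConverters._
import com.google.common.collect.{Ordering => GuavaOrdering}
import org.apache.spark.storage.BlockManagerId

// Hypothetical helper: pick the k largest (location, size) pairs by size
// without sorting the whole list.
def topLocationsBySize(
    statuses: Seq[(BlockManagerId, Long)],
    k: Int = 5): Seq[(BlockManagerId, Long)] = {
  val bySize = new GuavaOrdering[(BlockManagerId, Long)] {
    override def compare(a: (BlockManagerId, Long), b: (BlockManagerId, Long)): Int =
      java.lang.Long.compare(a._2, b._2)
  }
  bySize.greatestOf(statuses.asJava, k).asScala.toSeq
}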

Also add a unit test for ordering and address comments
@shivaram (Contributor, Author) commented Aug 1, 2014

I switched to using Guava's ordering function now and added another unit test for it. I plan to run a microbenchmark to see how long it takes to get the top 5 from a list of Longs.

@kayousterhout -- Is there a way to benchmark the scheduler to see if a change introduces any performance regressions?
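
For reference, a self-contained sketch of that kind of microbenchmark (not the actual gist code); the input size and seed are arbitrary:

import scala.collection.JavaConverters._
import com.google.common.collect.{Ordering => GuavaOrdering}

// Time top-5 selection over 50k random Longs with Guava's greatestOf.
val rnd = new scala.util.Random(42)
val sizes: Seq[java.lang.Long] =
  Seq.fill(50000)(java.lang.Long.valueOf(rnd.nextInt(1 << 20).toLong))
val naturalOrder = new GuavaOrdering[java.lang.Long] {
  override def compare(a: java.lang.Long, b: java.lang.Long): Int = a.compareTo(b)
}
val start = System.nanoTime()
val top5 = naturalOrder.greatestOf(sizes.asJava, 5)
println(s"top-5 of ${sizes.length} items took ${(System.nanoTime() - start) / 1e6} ms")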

@SparkQA commented Aug 1, 2014

QA tests have started for PR 1697. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17631/consoleFull

@SparkQA commented Aug 1, 2014

QA results for PR 1697:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17631/consoleFull

@shivaram (Contributor, Author) commented Aug 1, 2014

I ran some microbenchmarks as outlined at https://gist.github.com/shivaram/63620c47f0ad50106e0a
The comments below the gist have some numbers that I got on my laptop.

Overall I think we should just use an upper bound on the number of map tasks and not return any preferred locations if we have more than, say, 1000 map tasks. There are more optimizations we could do, such as filtering out zero-sized blocks, but a simple heuristic is a good and safe start for now.

@rxin Thoughts?
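
A minimal sketch of that guard, with illustrative names (the merged patch later used hard-coded thresholds in DAGScheduler):

// Skip preferred-location computation entirely for large stages, since the
// selection work grows with the number of map outputs.
val MAP_TASK_THRESHOLD = 1000  // illustrative cutoff

def reduceTaskPrefLocs(numMapTasks: Int)(compute: => Seq[String]): Seq[String] =
  if (numMapTasks > MAP_TASK_THRESHOLD) Seq.empty else compute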

@shivaram (Contributor, Author) commented Aug 1, 2014

One more thing we can do is coalesce the sizes from all tasks on a machine and only do node-level locality. Since map outputs are on disk, there shouldn't be any difference between node-level and process-level locality, right?
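
A sketch of that coalescing step, assuming BlockManagerId.host identifies the machine (the helper name is illustrative):

import org.apache.spark.storage.BlockManagerId

// Collapse per-executor output sizes into per-host totals, so the scheduler
// only needs node-level preferences for reduce tasks.
def sizesByHost(statuses: Seq[(BlockManagerId, Long)]): Map[String, Long] =
  statuses.groupBy(_._1.host).mapValues(_.map(_._2).sum).toMap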

…-locality

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@SparkQA commented Aug 1, 2014

QA tests have started for PR 1697. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17715/consoleFull

@SparkQA commented Aug 2, 2014

QA results for PR 1697:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17715/consoleFull

@shivaram (Contributor, Author)

Ping @rxin -- any thoughts on this? I can merge upstream into this branch, and it would be great to have this in 1.2.

@rxin (Contributor) commented Nov 2, 2014

Can we bring this up to date, and:

  1. Add a switch to turn it on / off
  2. Add a config option to disable this automatically when the number of reduce / map tasks is greater than a certain threshold?
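
For illustration, what those two knobs could look like; the boolean flag name matches the one that eventually landed in Spark (spark.shuffle.reduceLocality.enabled), but the threshold key is hypothetical:

import org.apache.spark.SparkConf

val conf = new SparkConf()

// 1. A switch to turn the feature on or off.
val reduceLocalityEnabled =
  conf.getBoolean("spark.shuffle.reduceLocality.enabled", true)

// 2. Disable automatically above a task-count threshold (hypothetical key).
val maxTasks = conf.getInt("spark.shuffle.reduceLocality.maxTasks", 1000)

def useReduceLocality(numMaps: Int, numReduces: Int): Boolean =
  reduceLocalityEnabled && numMaps < maxTasks && numReduces < maxTasks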

@JoshRosen (Contributor)

I agree with @rxin; I'd be totally fine with including this as an experimental feature, perhaps opt-in while we test it (like we did with sort-based shuffle).

@shivaram (Contributor, Author)

Sure. I'll bring this up to date, put it behind a config flag this week, and ping the PR.

@pwendell (Contributor)

Let's close this issue pending an update from @shivaram (just doing some JIRA clean-up).

@asfgit asfgit closed this in 1ac1c1d Jan 19, 2015
shivaram added a commit to shivaram/spark-1 that referenced this pull request Feb 12, 2015
This is another attempt at apache#1697 addressing some of the earlier concerns.
It adds a couple of thresholds based on the number of map and reduce tasks
beyond which we don't use preferred locations for reduce tasks.

This patch also fixes some bugs in DAGSchedulerSuite where the MapStatus
objects created didn't have the right number of reducers set.
asfgit pushed a commit that referenced this pull request Jun 10, 2015
Set preferred locations for reduce tasks.
The basic design is that we maintain a map from reducerId to a list of (sizes, locations) for each
shuffle. We then set the preferred locations to be any machines that have 20% or more of the output
that needs to be read by the reduce task. This will result in at most 5 preferred locations for
each reduce task.

Selecting the preferred locations involves O(# map tasks * # reduce tasks) computation, so we
restrict this feature to cases where we have fewer than 1000 map tasks and 1000 reduce tasks.
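
A compact sketch of that selection rule (illustrative, not the exact merged code). Any host holding at least the given fraction of a reducer's total input qualifies; with a 0.2 threshold at most five hosts can pass, which is where the at-most-5 bound comes from.

import org.apache.spark.storage.BlockManagerId

// For one reducer, return hosts that hold >= `fraction` of its total input.
// `blocks` has one (location, size) pair per map output block for this reducer.
def preferredHosts(
    blocks: Seq[(BlockManagerId, Long)],
    fraction: Double = 0.2): Seq[String] = {
  val total = blocks.map(_._2).sum.toDouble
  if (total <= 0) {
    Seq.empty
  } else {
    blocks.groupBy(_._1.host)
      .mapValues(_.map(_._2).sum)
      .collect { case (host, bytes) if bytes >= fraction * total => host }
      .toSeq
  }
}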

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #6652 from shivaram/reduce-locations and squashes the following commits:

492e25e [Shivaram Venkataraman] Remove unused import
2ef2d39 [Shivaram Venkataraman] Address code review comments
897a914 [Shivaram Venkataraman] Remove unused hash map
f5be578 [Shivaram Venkataraman] Use fraction of map outputs to determine locations Also removes caching of preferred locations to make the API cleaner
68bc29e [Shivaram Venkataraman] Fix line length
1090b58 [Shivaram Venkataraman] Change flag name
77ce7d8 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
e5d56bd [Shivaram Venkataraman] Add flag to turn off locality for shuffle deps
6cfae98 [Shivaram Venkataraman] Filter out zero blocks, rename variables
9d5831a [Shivaram Venkataraman] Address some more comments
8e31266 [Shivaram Venkataraman] Fix style
0df3180 [Shivaram Venkataraman] Address code review comments
e7d5449 [Shivaram Venkataraman] Fix merge issues
ad7cb53 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
df14cee [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
5093aea [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
0171d3c [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
bc4dfd6 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
774751b [Shivaram Venkataraman] Fix bug introduced by line length adjustment
34d0283 [Shivaram Venkataraman] Fix style issues
3b464b7 [Shivaram Venkataraman] Set preferred locations for reduce tasks This is another attempt at #1697 addressing some of the earlier concerns. This adds a couple of thresholds based on the number of map and reduce tasks beyond which we don't use preferred locations for reduce tasks.
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
Set preferred locations for reduce tasks.
shivaram added a commit to shivaram/spark-1 that referenced this pull request Nov 24, 2015
Set preferred locations for reduce tasks.

Conflicts:
	core/src/main/scala/org/apache/spark/MapOutputTracker.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala