SPARK-2045 Sort-based shuffle #1499
Conversation
QA tests have started for PR 1499. This patch merges cleanly.
 * The tracking code is copied from SizeTrackingAppendOnlyMap -- we'll factor that out soon.
 */
private[spark] class SizeTrackingBuffer[T <: AnyRef](initialCapacity: Int = 64)
  extends SizeTrackingCollection[T]
This could be replaced with the buffer in #1165 except we also need destructiveSortedIterator. And to sort stuff in-place, that in turn requires T to be a subclass of AnyRef (otherwise we'd have to pass the Array[T] to a different Arrays.sort() method based on its type).
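To illustrate the constraint under discussion, here is a minimal sketch (not the PR's actual code; the class shape is hypothetical) of why an in-place destructiveSortedIterator pushes T towards AnyRef: java.util.Arrays.sort only accepts a Comparator for Object[] arrays, while each primitive array type has its own Comparator-less overload.

```scala
import java.util.{Arrays, Comparator}

// Arrays.sort takes a Comparator only for Object[]; primitive arrays each
// need a different overload. Bounding T <: AnyRef lets the buffer back
// itself with one Array[AnyRef] and sort it in place with a single call.
class SizeTrackingBuffer[T <: AnyRef](initialCapacity: Int = 64) {
  private var data = new Array[AnyRef](initialCapacity)
  private var curSize = 0

  def +=(elem: T): Unit = {
    if (curSize == data.length) data = Arrays.copyOf(data, curSize * 2)
    data(curSize) = elem
    curSize += 1
  }

  /** Sort the backing array in place and iterate it; the buffer must not
    * be reused afterwards (hence "destructive"). */
  def destructiveSortedIterator(cmp: Comparator[T]): Iterator[T] = {
    Arrays.sort(data, 0, curSize, cmp.asInstanceOf[Comparator[AnyRef]])
    data.iterator.take(curSize).map(_.asInstanceOf[T])
  }
}
```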
 * the user.
 */
def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
I'd add @return to the javadoc to explain what the (int, iterator) represents.
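One possible shape for that tag (the wording here is mine; the meaning of the pair follows from the signature above, where the Int is a partition ID):

```scala
/**
 * Merge the spilled files with the in-memory data.
 *
 * @return an iterator of (partition ID, iterator over that partition's
 *         records) pairs, emitted in increasing order of partition ID
 */
```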
QA results for PR 1499:
private val blockManager = SparkEnv.get.blockManager
private val diskBlockManager = blockManager.diskBlockManager
private val ser = Serializer.getSerializer(serializer.getOrElse(null))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears that you could just do Serializer.getSerializer(serializer), thanks to code you added last month :)
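The suggestion relies on an overload that takes the Option directly. A toy sketch of the two shapes (stand-in types; this is not the real Spark Serializer object):

```scala
// Stand-in for a Serializer companion: one overload accepts a possibly-null
// instance, the other accepts an Option, so callers can skip getOrElse(null).
object SerializerStub {
  val default = "default-serializer"
  def getSerializer(s: String): String =
    if (s == null) default else s
  def getSerializer(s: Option[String]): String =
    s.getOrElse(default)  // the Option-aware overload the review suggests
}
```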
I've now updated this to support partial aggregation across spilled files even if we don't have an Ordering, using hash code comparison similar to ExternalAppendOnlyMap. It also now fully implements the behavior in the docs, namely sorting the data if you pass an Ordering, etc.

It looks like Aaron found a problem with the size-tracking code -- will try to fix that in ExternalAppendOnlyMap as well. Once this class is in, though, it could replace ExternalAppendOnlyMap in most use cases, though its one downside is that it creates another object for each key of the in-memory collection (since we have …
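The hash code comparison mentioned here can be sketched as an Ordering that compares hash codes only (a hypothetical simplification; because unequal keys may collide, keys with equal hash codes still need an equality check during merging):

```scala
// Compare keys by hash code alone. This gives a total "pseudo-ordering"
// sufficient to merge-sort spilled runs when no real Ordering exists;
// keys with equal hash codes must then be matched by equals() afterwards.
def hashComparator[K]: Ordering[K] = new Ordering[K] {
  def compare(a: K, b: K): Int = {
    val h1 = if (a == null) 0 else a.hashCode()
    val h2 = if (b == null) 0 else b.hashCode()
    if (h1 < h2) -1 else if (h1 == h2) 0 else 1
  }
}
```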
  writer.close()
} catch {
  case e: Exception =>
    writer.close()
nit: For the purposes of slightly more defensive programming, consider putting this inside a Utils.tryLog { }. We probably don't want to leak files.

By the way, is this logic correct for an exception? It looks like we'll just keep merrily chugging along -- should we propagate the exception upwards?
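A sketch of the defensive pattern being suggested (tryLog here is a hypothetical stand-in for the real Utils.tryLog, and the propagation question is answered by rethrowing):

```scala
// Best-effort wrapper: run the block, log and swallow any exception.
def tryLog(f: => Unit): Unit =
  try f catch { case e: Exception => println(s"Exception while cleaning up: $e") }

def writeAll(writer: java.io.Writer, lines: Seq[String]): Unit = {
  try {
    lines.foreach(l => writer.write(l))
    writer.close()
  } catch {
    case e: Exception =>
      tryLog(writer.close())  // close defensively so we don't leak the file
      throw e                 // ...but propagate rather than chugging along
  }
}
```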
Allow nextBatchStream to be called after we're done looking at all streams. Before, it could give ArrayIndexOutOfBoundsException after we read the very last element.
This feature allowed for more efficient serialization in some cases, but made it less efficient in the more common case of (K, V) because it required the data structures in shuffle to hold Tuple2 objects instead of unpacking them into parallel arrays. Moreover, it was broken for ShuffledRDDs with aggregation, because there wasn't any way to construct the user's Product2 type, so we would return Tuple2. It seemed better to disable it.

One unfortunate effect is that OrderedRDDFunctions used to have a public API that let it work on RDD[P <: Product2] and return RDD[P] from sortByKey. This can't work now, so I changed it to return RDD[(K, V)]. In practice anyone using OrderedRDDFunctions through the implicit conversions would get RDD[(K, V)] anyway, so it only mattered for people who somehow tried to create it directly. I've made OrderedRDDFunctions's constructor a @DeveloperAPI to discourage this.

It will be possible to add back custom data types later, but we'll need a better interface that gives ShuffledRDD and such a mechanism to construct them and to manage buffers of them. It might also be possible to come up with an interface for "key-less" shuffles, where you shuffle an RDD[T] where each element is its own key. This would be a much better fit for sorting and some of the GraphX use cases.
This also required creating a BlockId subclass for shuffle index blocks so that the BlockManagers can report back their lists of blocks.
This fix isn't great because we sometimes allocate another Tuple2, but the only other easy option is to disable sending mutable pairs, which would be expensive.
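The extra Tuple2 allocation can be pictured with a small sketch (MutablePair is modeled on Spark's, but the helper is hypothetical): a reused mutable pair must be copied before anything downstream retains it.

```scala
// A mutable pair is reused for every record to avoid per-record allocation.
case class MutablePair[T1, T2](var _1: T1, var _2: T2) extends Product2[T1, T2]

// Handing records to code that may hold references forces a copy into an
// immutable Tuple2 -- the occasional extra allocation described above.
def toTuple[K, V](p: Product2[K, V]): (K, V) = p match {
  case t: Tuple2[_, _] => t.asInstanceOf[(K, V)]  // already safe to retain
  case _ => (p._1, p._2)                          // defensive copy: allocates
}
```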
Thanks everyone, I think I addressed all the comments. Anything else before we merge this? I'd like to merge it fairly soon because there are a few other issues I'd like fixed for 1.1 that will depend on this code: https://issues.apache.org/jira/browse/SPARK-2711, https://issues.apache.org/jira/browse/SPARK-983

@mridulm I haven't updated the spilling code to match your fixes in #1609 yet, but I will do that once that PR is merged (in case we refactor the fixes in any way).
Jenkins, test this please
Jenkins, retest this please. |
while (partitionId < numPartitions &&
    indexInPartition == spill.elementsPerPartition(partitionId)) {
  partitionId += 1
  indexInPartition = 0L
Why don't we need to set indexInBatch = 0 here as well? Is it because batches straddle partition boundaries (i.e. one batch can have elements from multiple partitions)?
I see, the answer is yes because when we spill we iterate through ((partition ID, k), v), not just (k, v) per partition.
Yeah exactly
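The loop under discussion can be isolated into a tiny sketch (simplified, hypothetical signature): the per-partition index resets at each partition boundary, while a per-batch index would keep counting, because one serialization batch can span several partitions.

```scala
// Advance past any partitions that are exhausted (including empty ones).
// Only the per-partition index resets; a batch index is tracked separately
// since serialization batches straddle partition boundaries.
def advancePartition(
    partitionId: Int,
    indexInPartition: Long,
    elementsPerPartition: Array[Long]): (Int, Long) = {
  var p = partitionId
  var idx = indexInPartition
  while (p < elementsPerPartition.length && idx == elementsPerPartition(p)) {
    p += 1
    idx = 0L
  }
  (p, idx)
}
```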
test this please!
I took another pass over the patch and the changes look ready to me. I also tested this locally and verified that the shuffle files were actually cleaned up. There is still a lot of very similar code in the ExternalSorter and the ExternalAppendOnlyMap, though resolving this is not a priority for 1.1. I have filed a related JIRA as a reminder to clean this up some time after the release.
Ok, I merged this into master.
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)

The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in apache#1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false

Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.

Author: Matei Zaharia <matei@databricks.com>

Closes apache#1499 from mateiz/sort-based-shuffle and squashes the following commits:

bd841f9 [Matei Zaharia] Various review comments
d1c137f [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)
The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in apache#1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false
Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback.
After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.
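The core idea of the map-side sort described above can be condensed into a toy sketch (my simplification, not the real ExternalSorter; spilling, aggregation, and file output are omitted):

```scala
// Tag each record with its target partition, then sort by (partition ID, key).
// The resulting run is what a single, partition-ordered map output file
// would be written from, instead of one file per reduce partition.
def sortForShuffle[K: Ordering, V](
    records: Seq[(K, V)],
    numPartitions: Int): Seq[((Int, K), V)] = {
  def partitionOf(k: K): Int = math.abs(k.hashCode % numPartitions)
  records
    .map { case (k, v) => ((partitionOf(k), k), v) }
    .sortBy(_._1)  // Ordering[(Int, K)]: partition ID first, then key
}
```

Sorting by partition ID alone is enough for a plain shuffle; the per-key component is what the `sortByKey` extension above would add.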