[SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort #5868
Status: Closed
Changes from all commits (96 commits)
Commits (all authored by JoshRosen):

81d52c5 WIP on UnsafeSorter
abf7bfe Add basic test case.
57a4ea0 Make initialSize configurable in UnsafeSorter
e900152 Add test for empty iterator in UnsafeSorter
767d3ca Fix invalid range in UnsafeSorter.
3db12de Minor simplification and sanity checks in UnsafeSorter
4d2f5e1 WIP
8e3ec20 Begin code cleanup.
253f13e More cleanup
9c6cf58 Refactor to use DiskBlockObjectWriter.
e267cee Fix compilation of UnsafeSorterSuite
e2d96ca Expand serializer API and use new function to help control when new U…
d3cc310 Flag that SparkSqlSerializer2 supports relocation
87e721b Renaming and comments
0748458 Port UnsafeShuffleWriter to Java.
026b497 Re-use a buffer in UnsafeShuffleWriter
1433b42 Store record length as int instead of long.
240864c Remove PrefixComputer and require prefix to be specified as part of i…
bfc12d3 Add tests for serializer relocation property.
b8a09fe Back out accidental log4j.properties change
c2fca17 Small refactoring of SerializerPropertiesSuite to enable test re-use:
f17fa8f Add missing newline
8958584 Fix bug in calculating free space in current page.
595923a Remove some unused variables.
5e100b2 Super-messy WIP on external sort
2776aca First passing test for ExternalSorter.
f156a8f Hacky metrics integration; refactor some interfaces.
3490512 Misc. cleanup
3aeaff7 More refactoring and cleanup; begin cleaning iterator interfaces
7ee918e Re-order imports in tests
69232fd Enable compressible address encoding for off-heap mode.
57f1ec0 WIP towards packed record pointers for use in optimized shuffle sort.
f480fb2 WIP in mega-refactoring towards shuffle-specific sort.
133c8c9 WIP towards testing UnsafeShuffleWriter.
4f70141 Fix merging; now passes UnsafeShuffleSuite tests.
aaea17b Add comments to UnsafeShuffleSpillWriter.
b674412 Merge remote-tracking branch 'origin/master' into unsafe-sort
11feeb6 Update TODOs related to shuffle write metrics.
8a6fe52 Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter
cfe0ec4 Address a number of minor review comments:
e67f1ea Remove upper type bound in ShuffleWriter interface.
5e8cf75 More minor cleanup
1ce1300 More minor cleanup
b95e642 Refactor and document logic that decides when to spill.
9883e30 Merge remote-tracking branch 'origin/master' into unsafe-sort
722849b Add workaround for transferTo() bug in merging code; refactor tests.
7cd013b Begin refactoring to enable proper tests for spilling.
9b7ebed More defensive programming RE: cleaning up spill files and memory aft…
e8718dd Merge remote-tracking branch 'origin/master' into unsafe-sort
1929a74 Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolve…
01afc74 Actually read data in UnsafeShuffleWriterSuite
8f5061a Strengthen assertion to check partitioning
67d25ba Update Exchange operator's copying logic to account for new shuffle m…
fd4bb9e Use own ByteBufferOutputStream rather than Kryo's
9d1ee7c Fix MiMa excludes for ShuffleWriter change
fcd9a3c Add notes + tests for maximum record / page sizes.
27b18b0 Test for inserting records AT the max record size.
4a01c45 Remove unnecessary log message
f780fb1 Add test demonstrating which compression codecs support concatenation.
b57c17f Disable some overly-verbose logs that rendered DEBUG useless.
1ef56c7 Revise compression codec support in merger; test cross product of con…
b3b1924 Properly implement close() and flush() in DummySerializerInstance.
0d4d199 Bump up shuffle.memoryFraction to make tests pass.
ec6d626 Add notes on maximum # of supported shuffle partitions.
ae538dc Document UnsafeShuffleManager.
ea4f85f Roll back an unnecessary change in Spillable.
1e3ad52 Delete unused ByteBufferOutputStream class.
39434f9 Avoid integer multiplication overflow in getMemoryUsage (thanks FindB…
e1855e5 Fix a handful of misc. IntelliJ inspections
7c953f9 Add test that covers UnsafeShuffleSortDataFormat.swap().
8531286 Add tests that automatically trigger spills.
69d5899 Remove some unnecessary override vals
d4e6d89 Update to bit shifting constants
4f0b770 Attempt to implement proper shuffle write metrics.
e58a6b4 Add more tests for PackedRecordPointer encoding.
e995d1a Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
56781a1 Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
0ad34da Fix off-by-one in nextInt() call
85da63f Cleanup in UnsafeShuffleSorterIterator.
fdcac08 Guard against overflow when expanding sort buffer.
2d4e4f4 Address some minor comments in UnsafeShuffleExternalSorter.
57312c9 Clarify fileBufferSize units
6276168 Remove ability to disable spilling in UnsafeShuffleExternalSorter.
4a2c785 rename 'sort buffer' to 'pointer array'
e3b8855 Cleanup in UnsafeShuffleWriter
c2ce78e Fix a missed usage of MAX_PARTITION_ID
d5779c6 Merge remote-tracking branch 'origin/master' into unsafe-sort
5e189c6 Track time spend closing / flushing files; split TimeTrackingOutputSt…
df07699 Attempt to clarify confusing metrics update code
de40b9d More comments to try to explain metrics code
4023fa4 Add @Private annotation to some Java classes.
51812a7 Change shuffle manager sort name to tungsten-sort
52a9981 Fix some bugs in the address packing code.
d494ffe Fix deserialization of JavaSerializer instances.
7610f2f Add tests for proper cleanup of shuffle data.
ef0a86e Fix scalastyle errors
core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java (93 additions, 0 deletions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.unsafe;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import scala.reflect.ClassTag;

import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.unsafe.PlatformDependent;

/**
 * Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
 * Our shuffle write path doesn't actually use this serializer (since we end up calling the
 * `write()` OutputStream methods), but DiskBlockObjectWriter still calls some methods on it.
 * To work around this, we pass a dummy no-op serializer.
 */
final class DummySerializerInstance extends SerializerInstance {

  public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();

  private DummySerializerInstance() { }

  @Override
  public SerializationStream serializeStream(final OutputStream s) {
    return new SerializationStream() {
      @Override
      public void flush() {
        // Need to implement this because DiskObjectWriter uses it to flush the compression stream
        try {
          s.flush();
        } catch (IOException e) {
          PlatformDependent.throwException(e);
        }
      }

      @Override
      public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
        throw new UnsupportedOperationException();
      }

      @Override
      public void close() {
        // Need to implement this because DiskObjectWriter uses it to close the compression stream
        try {
          s.close();
        } catch (IOException e) {
          PlatformDependent.throwException(e);
        }
      }
    };
  }

  @Override
  public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
    throw new UnsupportedOperationException();
  }

  @Override
  public DeserializationStream deserializeStream(InputStream s) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
    throw new UnsupportedOperationException();
  }
}
core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java (92 additions, 0 deletions)
/* (Apache License 2.0 header, identical to the one above) */

package org.apache.spark.shuffle.unsafe;

/**
 * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
 * <p>
 * Within the long, the data is laid out as follows:
 * <pre>
 *   [24 bit partition number][13 bit memory page number][27 bit offset in page]
 * </pre>
 * This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
 * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
 * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
 * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
 * <p>
 * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
 * optimization to future work as it will require more careful design to ensure that addresses are
 * properly aligned (e.g. by padding records).
 */
final class PackedRecordPointer {

  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes

  /**
   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
   */
  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215

  /** Bit mask for the lower 40 bits of a long. */
  private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;

  /** Bit mask for the upper 24 bits of a long. */
  private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;

  /** Bit mask for the lower 27 bits of a long. */
  private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;

  /** Bit mask for the lower 51 bits of a long. */
  private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;

  /** Bit mask for the upper 13 bits of a long. */
  private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;

  /**
   * Pack a record address and partition id into a single word.
   *
   * @param recordPointer a record pointer encoded by TaskMemoryManager.
   * @param partitionId a shuffle partition id (maximum value of 2^24 - 1).
   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
   */
  public static long packPointer(long recordPointer, int partitionId) {
    assert (partitionId <= MAXIMUM_PARTITION_ID);
    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }

  private long packedRecordPointer;

  public void set(long packedRecordPointer) {
    this.packedRecordPointer = packedRecordPointer;
  }

  public int getPartitionId() {
    return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
  }

  public long getRecordPointer() {
    final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
    final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
    return pageNumber | offsetInPage;
  }
}
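The 24/13/27-bit layout in the javadoc can be exercised with a standalone sketch. The names below (PackedPointerDemo, pack, etc.) are illustrative, not Spark's, and unlike the real class — which unpacks an address already encoded by TaskMemoryManager — this sketch builds the word from explicit page/offset/partition fields:

```java
// Demonstrates packing [24-bit partition][13-bit page][27-bit offset] into
// one long and recovering each field, mirroring the layout documented above.
public class PackedPointerDemo {
  static final long MASK_LOWER_27 = (1L << 27) - 1;
  static final long MASK_LOWER_40 = (1L << 40) - 1;

  public static long pack(int pageNumber, long offsetInPage, int partitionId) {
    // Lower 40 bits: page number in bits 27..39, offset in bits 0..26.
    long compressed = (((long) pageNumber) << 27) | (offsetInPage & MASK_LOWER_27);
    // Partition id occupies the top 24 bits.
    return (((long) partitionId) << 40) | compressed;
  }

  public static int partitionId(long packed) { return (int) (packed >>> 40); }
  public static int pageNumber(long packed) { return (int) ((packed & MASK_LOWER_40) >>> 27); }
  public static long offsetInPage(long packed) { return packed & MASK_LOWER_27; }

  public static void main(String[] args) {
    long p = pack(5, 1234L, 42);
    // Round trip: every field survives packing and unpacking.
    System.out.println(partitionId(p) + " " + pageNumber(p) + " " + offsetInPage(p)); // prints "42 5 1234"
  }
}
```

The payoff of this encoding is that the in-memory sort can compare bare longs: because the partition id sits in the most significant bits, sorting packed words numerically sorts records by partition, which is exactly what a shuffle needs.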
core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java (37 additions, 0 deletions)
/* (Apache License 2.0 header, identical to the one above) */

package org.apache.spark.shuffle.unsafe;

import java.io.File;

import org.apache.spark.storage.TempShuffleBlockId;

/**
 * Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
 */
final class SpillInfo {
  final long[] partitionLengths;
  final File file;
  final TempShuffleBlockId blockId;

  public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
    this.partitionLengths = new long[numPartitions];
    this.file = file;
    this.blockId = blockId;
  }
}

Review comment on the java.io.File import: nit: wrong import order
Review comment: should add more documentation about addressing space

Reply: I did this in my most recent commits and added some tests for writing records at the maximum record size and greater than it.