-
Notifications
You must be signed in to change notification settings - Fork 28.8k
[SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs #19788
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part))) | ||
n += 1 | ||
totalSize += status.getSizeForBlock(part) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
n
can be numPartitions
, and directly get by endPartition - startPartition
?
@yucai would you mind adding more explanations to your PR description? |
@jerryshao the description has been updated, does it look clear now? |
Sure, I will do it tomorrow. |
ok to test. |
Test build #84132 has finished for PR 19788 at commit
|
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { | ||
// The block is actually going to be a range of a single map output file for this map, so | ||
// find out the consolidated file, then the offset within that from our index | ||
logDebug(s"Fetch block data for $blockId") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary to add this, I guess this is mainly for your debug purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this info, it looks hard to know continuous shuffle block read really happens, and getLocalBytes
had similar debug info also.
logDebug(s"Getting local block $blockId as bytes")
How about keeping it?
try { | ||
ByteStreams.skipFully(in, blockId.reduceId * 8) | ||
val offset = in.readLong() | ||
ByteStreams.skipFully(in, (blockId.length - 1) * 8) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I doubt this line is not correct, this seems change the semantics, for example if startPartition is 3, endPartition is 8, originally it should be (3*8), now it changes to (4*8), can you please explain more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also if length is "1", then this will always be Zero.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, for example, when startPartition = 3, endPartition = 8, it means we need [3, 8) and length = 5.
Line 204: ByteStreams.skipFully(3 * 8), will skip 0, 1, 2
Line 205: offset = in. readLong, we got startPartition(3)'s offset
Line 206: ByteStreams.skipFully((5 - 1) * 8), will skip 4, 5, 6, 7
Line 207: nextOffset = in.readLong(), now we got endPartition(8)'s offset
When length is "1", zero should be correct. We don't need to skip anything, and Line 207's readLong will get endPartition's offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get your point, thanks for the explanation.
for (part <- startPartition until endPartition) { | ||
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += | ||
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part))) | ||
totalSize += status.getSizeForBlock(part) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be simplified like: val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum
.
Test build #84148 has finished for PR 19788 at commit
|
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length); | ||
} | ||
|
||
public ManagedBuffer getBlockData( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we should move the original comment here, and explain the different usages of these two functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, will update.
assertEquals(sortBlock1, block1); | ||
|
||
InputStream block01Stream = | ||
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please follow the above indents format.
InputStream block01Stream = | ||
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream(); | ||
String block01 = CharStreams.toString( | ||
new InputStreamReader(block01Stream, StandardCharsets.UTF_8)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated!
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId | ||
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1) | ||
extends BlockId { | ||
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe s"shuffle_$shuffleId_$mapId_$reduceId_$length"
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these are semi-public interfaces, can we create a new block id ContinuousShuffleBlockIds
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ContinuousShuffleBlockIds
is a good idea, let me try.
Also cc @cloud-fan |
think about shuffle as a server-client framework, does your change need to update the server side? i.e. do users need to upgrade their external shuffle service for new spark version with this feature? |
Test build #84165 has finished for PR 19788 at commit
|
Currently users need update their external shuffle service for this feature, because we change the format of |
RDDBlockId(rddId.toInt, splitIndex.toInt) | ||
case SHUFFLE(shuffleId, mapId, reduceId) => | ||
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) | ||
case SHUFFLE(shuffleId, mapId, reduceId, n) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:nit length?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good catch! I will change here after using ContinuousShuffleBlockId
@yucai I'm thinking of the necessity to add this new configuration |
Can we just add the |
@jerryshao @cloud-fan @gczsjdy Because this feature is only used in adaptive execution, how about this way:
With above solution, user no needs upgrade their external shuffle service for new spark version if they don't use adaptive execution (very likely). If user wants to use adaptive execution, they have to upgrade external shuffle service because old way does not know |
Sounds good, it would be great if we could document it clearly that if user wants to use adaptive execution, they have to update the external shuffle service. |
Is there an implicit assumption here that contiguous partitions' data can be decompressed / deserialized in a single stream? If the shuffled data is written with a non-relocatable serializer (Java serialization) or non-concatenatable compression format then I'm not sure that you'll actually be able to successfully deserialize a multi-reducer range of the map output using a single decompression / deserialization stream. |
I will update a new version. |
} | ||
this.shuffleId = Integer.parseInt(blockId0Parts[1]); | ||
mapIdAndReduceIds = new int[2 * blockIds.length]; | ||
mapIdAndReduceIds = new int[3 * blockIds.length]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update description of the variable as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed.
int mapId, | ||
int reduceId) { | ||
int reduceId, | ||
int length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename the variable - length
is incorrect (here and other places), please rename to make it clear : numBlocks
perhaps ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will use numBlocks.
val totalSize: Long = (startPartition until endPartition).map(status.getSizeForBlock).sum | ||
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += | ||
((ShuffleBlockId(shuffleId, mapId, startPartition, endPartition - startPartition), | ||
totalSize)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to create some very heavy shuffle fetches - and looks incorrect.
This merge should not be happening here, but in ShuffleBlockFetcherIterator
this.execId = execId; | ||
String[] blockId0Parts = blockIds[0].split("_"); | ||
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) { | ||
if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This format change can cause incompatibility between shuffle service and spark application - causing a restart of the cluster and update of all spark applications .... I wish we had a better way to encode this information which was not so brittle.
String execId, | ||
int shuffleId, | ||
int mapId, | ||
int reduceId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this method ? We dont need it anymore
* Get index offset for a particular reducer. | ||
*/ | ||
public ShuffleIndexRecord getIndex(int reduceId) { | ||
public ShuffleIndexRecord getIndex(int reduceId, int length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps require
that length (number of Blocks) is >= 1
Test build #86938 has finished for PR 19788 at commit
|
Test build #101433 has finished for PR 19788 at commit
|
@yucai :
Can you provide some details regarding the new solution ? I did not see any updates in the JIRA or in this PR. |
@mridulm thanks for concerning this feature! I have updated the PR's description, could you take a look at? Any comments will be highly appreciated! |
Test build #101446 has finished for PR 19788 at commit
|
...work-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
Show resolved
Hide resolved
int mapId, | ||
int reduceId) { | ||
int reduceId, | ||
int numBlocks) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it seems like the name numBlocks
doesn't fit well for this method. This method tries to get the block of a specific reducer from a specific mapper, so a better name would be
public ManagedBuffer getBlockData(
...
int startReducerId,
int numReducers)
Then we can say that this method is to get the block of several consecutive reducers from a specific mapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we name it startReducerId
, do you think we need name mapId
to mapperId
also?
return new int[] { Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) }; | ||
} | ||
|
||
static public ArrayList<ArrayList<int[]>> mergeContinuousShuffleBlockIds(String[] blockIds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add doc to explain the assumption: block ids of same mapper id are consecutive in the input blockIds
.
|
||
static public ArrayList<ArrayList<int[]>> mergeContinuousShuffleBlockIds(String[] blockIds) { | ||
ArrayList<int[]> shuffleBlockIds = new ArrayList<>(); | ||
ArrayList<ArrayList<int[]>> arrayShuffleBlockIds = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we only need to return ArrayList<int[]>
, and the int[]
has 3 parts: mapId, reduceId and numBlocks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially, I want to keep it the same as BlockManager.mergeContinuousShuffleBlockIds
, but agree with you, ArrayList<int[]>
is much simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, seems like numBlocks is not enough, which includes possible zero size blocks.
And this function will be reused in OneForOneBlockFetcher
, there needs real size infor.
private void initShuffleBlockIdIndices(String[] blockIds) {
ArrayList<ArrayList<int[]>> arrayShuffleBlockIds =
ExternalShuffleBlockResolver.mergeContinuousShuffleBlockIds(blockIds);
assert(arrayShuffleBlockIds.size() == streamHandle.numChunks);
blockIdIndices = new int[arrayShuffleBlockIds.size() + 1];
blockIdIndices[0] = 0;
for (int i = 0; i < arrayShuffleBlockIds.size(); i++) {
blockIdIndices[i + 1] = blockIdIndices[i] + arrayShuffleBlockIds.get(i).size();
}
}
*/ | ||
private ManagedBuffer getSortBasedShuffleBlockData( | ||
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { | ||
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto about the naming concern
private final TransportClient client; | ||
private final OpenBlocks openMessage; | ||
private final String[] blockIds; | ||
private int[] blockIdIndices = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a comment to explain the relationship between blocks and chunks.
public void onSuccess(int chunkIndex, ManagedBuffer buffer) { | ||
// On receipt of a chunk, pass it upwards as a block. | ||
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer); | ||
listener.onBlockFetchSuccess(Arrays.copyOfRange(blockIds, blockIdIndices[chunkIndex], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way to avoid copy? e.g. if we change the callback interface to not take Array
but Seq
, then maybe we can create a special Seq
which re-maps the index of blockIds
, to avoid array copy.
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
Show resolved
Hide resolved
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; | ||
|
||
/** TestOpenBlocks is used to test OpenBlocks backward compatibility only */ | ||
public class TestOpenBlocks extends BlockTransferMessage { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can use write scala to test java classes...
private def shouldCompress(blockId: BlockId): Boolean = { | ||
blockId match { | ||
case _: ShuffleBlockId => compressShuffle | ||
case _: ArrayShuffleBlockId => compressShuffle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will ArrayShuffleBlockId
go through network?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, but ShuffleBlockFetcherIterator
needs uncompress to detect corrupt.
var isStreamCopied: Boolean = false
try {
input = streamWrapper(arrayBlockId, in)
But I think I can use below to avoid this.
var isStreamCopied: Boolean = false
try {
input = streamWrapper(arrayBlockId.blockIds.head, in)
Test build #101514 has finished for PR 19788 at commit
|
9b60ded
to
039ae85
Compare
Test build #101523 has finished for PR 19788 at commit
|
Test build #101540 has finished for PR 19788 at commit
|
So just looking at the description, this implementation is simply having the server side read from the separate map output files and send them out in one stream when the reducer actually reads, correct? Meaning you are still getting disk seeks on the server side, but on the client side it see's one stream that contains the multiple map outputs, correct? I'm curious what specific performance benefits you were seeing from this? Is it just the client side or is there something on the server side that I might not be thinking about? |
Note, I'm not against this change just want to understand better, thanks for working on this. |
@tgravescs thanks for looking at this! The shuffle block is stored like below: format is s"shuffle_$shuffleId_$mapId_$reduceId", refering to BlockId.scala. Before this PR, to read map output file 0's data (e.g.: reducer 5 to 10), whose block Ids are from shuffle_0_0_5 to shuffle_0_0_10, Spark needs 6 disk IOs + 6 network IOs. After this PR, to read the same map output file 0's data, we only needs 1 disk IO and 1 network IO, this way can reduce IO dramatically. We did this kind of merge in both client and server side. In my previous benchmark testing, merge IO will have very obvious improvment in shuffle read when adaptive execution enabled. |
So just to make sure I'm following, are you saying reducer tasks 5 to 10 happen to run on the same executor so its fetching those all at once? Perhaps this is combined with your adaptive scheduling logic to automatically set reducer number, so for example originally the map thought it had 20,000 reducers and wrote the map output files accordingly but the adaptive scheduling says you really only need 2,000. In that case each reducer really reads the output for 10 reducers the map originally created? |
@tgravescs, yes, exactly as you understood. |
Can one of the admins verify this patch? |
As the current approach in OneForOneBlockFetcher, we reuse the OpenBlocks protocol to describe the fetch request for shuffle blocks, and it causes the extension work for shuffle fetching like apache#19788 and apache#24110 very awkward. In this PR, we split the fetch request for shuffle blocks from OpenBlocks which named FetchShuffleBlocks. It's a loose bind with ShuffleBlockId and can easily extend by adding new fields in this protocol. Existing and new added UT. Closes apache#24565 from xuanyuanking/SPARK-27665. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 8949bc7)
What changes were proposed in this pull request?
In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. For example, originally the map thought it had 20,000 reducers and wrote the map output files accordingly, but the adaptive scheduling thought Spark really only need 2,000. In that case, each reducer really reads the output for 10 reducers the map originally created.
Currently, each reducer needs to fetch those 10 reducer blocks one by one, this way needs many IO and impacts performance. This PR is to support fetching those continuous shuffle blocks in one IO (batch way).
The shuffle block is stored like below:
format is s"shuffle_$shuffleId_$mapId_$reduceId", refering to BlockId.scala.
In adaptive execution, one reducer may want to read output for reducer 5 to 14, whose block Ids are from shuffle_0_x_5 to shuffle_0_x_14.
Before this PR, Spark needs 10 disk IOs + 10 network IOs for each output file.
After this PR, Spark only needs 1 disk IO and 1 network IO, this way can reduce IO dramatically.
High Level Design
BlockStoreShuffleReader.shouldFetchContinuousShuffleBlocksInBatch
.ShuffleBlockFetcherIterator.fetchLocalBlocks
If remote server supports merge, it will merge blocks and the returned StreamHandle.numChunks < OpenBlocks.blockIds.length. The client will check and know merge happens, so it will work accordingly.
If remote server does not support merges (like external shuffle service < 3.0), the returned StreamHandle.numChunks == OpenBlocks.blockId.length. The client will check and know merge does not happen , it will work accordingly also.
See
NettyBlockRpcServer.receive
,ExternalShuffleBlockHandler.handleMessage
andOneForOneBlockFetcher.start
.Backward Compatibility
One important thing is to be compatible with previous Spark version including both client and server.
This PR uses a similar way like PR#23510 to maintain backward compatibility.
This PR extends OpenBlocks to add an optional
fetchContinuousShuffleBlocksInBatch
boolean flag.It will only be encoded to the message when it's true. OpenBlocks from old clients do not have this flag, which means this flag is false for them.
This is fully compatible:
How was this patch tested?
Add new UTs.