[#940] feat: Support columnar shuffle with gluten #950
Conversation
Codecov Report
@@ Coverage Diff @@
## master #950 +/- ##
============================================
+ Coverage 55.72% 56.86% +1.13%
- Complexity 2372 2446 +74
============================================
Files 357 339 -18
Lines 18123 16677 -1446
Branches 1419 1566 +147
============================================
- Hits 10099 9483 -616
+ Misses 7438 6621 -817
+ Partials 586 573 -13
... and 24 files with indirect coverage changes
Could you share the changed code in gluten? @summaryzb
Force-pushed from 5d4bf13 to 6b4ec01.
@@ -141,7 +140,11 @@ public WriteBufferManager(
     this.requireMemoryInterval = bufferManagerOptions.getRequireMemoryInterval();
     this.requireMemoryRetryMax = bufferManagerOptions.getRequireMemoryRetryMax();
     this.arrayOutputStream = new WrappedByteArrayOutputStream(serializerBufferSize);
-    this.serializeStream = instance.serializeStream(arrayOutputStream);
+    // columnar shuffle use the serialized data directly
+    if (serializer != null) {
What if `serializer == null`? I didn't see special code in the `addPartitionData` part.
When `addPartitionData` is used to add bytes that are already serialized, the serializer is never used. What's more, using `ColumnarSerializer` here in `instance.serializeStream(arrayOutputStream)` causes an exception.
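To illustrate the guard being discussed, here is a minimal stand-alone sketch (class and interface names are simplified stand-ins, not the actual Uniffle/Spark classes): the constructor only creates a serialize stream when a serializer is present, and a columnar (gluten) caller that passes `null` simply never gets one, avoiding the exception from an unimplemented `serializeStream`.

```java
import java.io.ByteArrayOutputStream;

public class BufferManagerSketch {
    interface Serializer {                       // stand-in for Spark's SerializerInstance
        AutoCloseable serializeStream(ByteArrayOutputStream out);
    }

    private AutoCloseable serializeStream;       // stays null for columnar shuffle
    private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();

    BufferManagerSketch(Serializer serializer) {
        // columnar shuffle uses the serialized data directly:
        // gluten passes serializer == null, so no serialize stream is created
        if (serializer != null) {
            this.serializeStream = serializer.serializeStream(arrayOutputStream);
        }
    }

    boolean isColumnar() {
        return serializeStream == null;
    }

    public static void main(String[] args) {
        System.out.println(new BufferManagerSketch(null).isColumnar()); // prints true
    }
}
```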
> I didn't see special code in the `addPartitionData` part.

Please see here: `addPartitionData`
I see. Would you mind updating the JavaDoc of `addPartitionData` to reflect this info, and updating the comment at L143 to indicate that the serializer might never be used when integrating with gluten?
Followed the suggestion.
Yeah, I'll contribute the patch to gluten today and post the link here later.
Is it possible that UniffleShuffleManager extends RssShuffleManager and UniffleColumnarShuffleWriter extends RssShuffleWriter?
LGTM, let @xianjingfeng take another look. @xianjingfeng You can @ your colleague to ensure that this change is OK for him.
That's a really good idea, it'll make the gluten-side code simpler.
It is ok for me.
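A minimal sketch of the inheritance suggested above (the stub classes here are hypothetical placeholders, not the real Uniffle classes): the gluten-side manager extends the uniffle one and only overrides what it needs, instead of re-implementing the whole class.

```java
public class InheritanceSketch {
    // stand-in for org.apache.spark.shuffle.RssShuffleManager
    static class RssShuffleManagerStub {
        String name() { return "rss"; }
    }

    // gluten-side manager reuses everything from the base class
    static class UniffleShuffleManager extends RssShuffleManagerStub {
        @Override
        String name() { return "uniffle-" + super.name(); }
    }

    public static void main(String[] args) {
        System.out.println(new UniffleShuffleManager().name()); // prints uniffle-rss
    }
}
```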
LGTM
@@ -128,7 +128,11 @@ public WriteBufferManager(
     super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
     this.bufferSize = bufferManagerOptions.getBufferSize();
     this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
-    this.instance = serializer.newInstance();
+    // columnar shuffle reader use the serialized data directly
+    if (serializer != null) {
serializer = rssHandle.getDependency().serializer(). Even with gluten, it is not null here. In my gluten 0.5 practice, the error is reported here because the serializeStream method is not implemented.
@loukey-lj On the writer side, the serializer is specified as null directly in the gluten-side code, and the serializeStream method is not implemented, because the serialization work is done in the native engine layer.
On the reader side it's taken from rssHandle.getDependency().serializer(); that's why only the deserializeStream method is implemented.
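The asymmetry described above can be sketched as follows (a simplified, hypothetical stand-in for a gluten-style columnar serializer, not the real class): only the deserialize path is implemented in JVM code, while the serialize path throws because serialization happens in the native engine.

```java
import java.io.InputStream;
import java.io.OutputStream;

public class ColumnarSerializerSketch {
    // reader side: deserialization is done in the JVM, so this is implemented
    Object deserializeStream(InputStream in) {
        return in; // real code would wrap the stream; simplified here
    }

    // writer side: serialization happens in the native engine, so this throws
    Object serializeStream(OutputStream out) {
        throw new UnsupportedOperationException(
            "serialization happens in the native engine");
    }

    public static void main(String[] args) {
        ColumnarSerializerSketch s = new ColumnarSerializerSketch();
        try {
            s.serializeStream(System.out);
            System.out.println("no exception");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported"); // prints unsupported
        }
    }
}
```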
  /**
   * add serialized columnar data directly when integrate with gluten
   */
  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
The org.apache.spark.shuffle.writer.RssShuffleWriter#writeImpl logic can be divided into three parts: loop start, execution loop, and loop end. The addPartitionData method is reserved for gluten to use, and gluten's shuffle write logic will also have three parts: loop start, loop execution (addPartitionData), and loop end. My question is: loop start and loop end also need to execute uniffle logic, so where is that part of the code?
The uniffle writer is wrapped in the gluten-side writer; loop start and loop end are called in it.
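The wrapping pattern described above could look roughly like this (class names and the event bookkeeping are illustrative stand-ins, not the actual gluten or Uniffle code): the gluten-side writer owns the loop, calls the wrapped writer's addPartitionData per serialized batch, and keeps the uniffle loop-start and loop-end logic around the loop.

```java
import java.util.ArrayList;
import java.util.List;

public class GlutenWriterSketch {
    // stand-in for the wrapped uniffle RssShuffleWriter
    static class UniffleWriterStub {
        final List<String> events = new ArrayList<>();
        void addPartitionData(int partitionId, byte[] serializedData) {
            events.add("add:" + partitionId + ":" + serializedData.length);
        }
    }

    final UniffleWriterStub inner = new UniffleWriterStub();

    void write(List<byte[]> batches) {
        inner.events.add("loopStart");                 // uniffle loop-start logic
        for (int i = 0; i < batches.size(); i++) {
            // serialized bytes come from the native engine in the real flow
            inner.addPartitionData(i, batches.get(i));
        }
        inner.events.add("loopEnd");                   // uniffle loop-end logic
    }

    public static void main(String[] args) {
        GlutenWriterSketch w = new GlutenWriterSketch();
        w.write(List.of(new byte[4], new byte[8]));
        System.out.println(w.inner.events); // prints [loopStart, add:0:4, add:1:8, loopEnd]
    }
}
```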
   * add serialized columnar data directly when integrate with gluten
   */
  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
    return addPartitionData(partitionId, serializedData, 0, 0L);
Is it possible to replace 0 with serializedData.length?
@loukey-lj The record and bytes metrics are already counted on the gluten side.
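A small stand-alone sketch of the delegation being discussed (the string-returning method is a hypothetical stand-in for the real overload, just to make the passed values visible): the two-argument convenience overload forwards 0 for the record count and 0L for the estimated size, since those metrics are tracked on the gluten side.

```java
public class AddPartitionDataSketch {
    // full overload; real code returns List<ShuffleBlockInfo>, simplified here
    static String addPartitionData(int partitionId, byte[] serializedData,
                                   int records, long estimatedSize) {
        return partitionId + "/" + serializedData.length + "/" + records + "/" + estimatedSize;
    }

    // convenience overload used by gluten; metrics are intentionally zero
    static String addPartitionData(int partitionId, byte[] serializedData) {
        return addPartitionData(partitionId, serializedData, 0, 0L);
    }

    public static void main(String[] args) {
        System.out.println(addPartitionData(1, new byte[16])); // prints 1/16/0/0
    }
}
```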
What changes were proposed in this pull request?
Support reading and writing serialized columnar data.
Why are the changes needed?
Fix: #940
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit tests; it's covered by the existing tests.