-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#940] improvement: Optimize columnar shuffle integration #958
Conversation
shuffleWriteMetrics.incRecordsWritten(1L); | ||
// records is a row based semantic | ||
if (isRowBased) { | ||
shuffleWriteMetrics.incRecordsWritten(1L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace with this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add more comments to explain why we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a common case? I think it binded the implement of gluten. Could we have more common interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments are added, Yes it's a common case.
All columnar shuffle use it's own serializer, all the serializer related work that is binded with implementation of the columnar framework not limited to gluten should be handled in the rss columnar shuffle writer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this code to the method addRecord
? Columnar shuffle won't call the addRecord
method, will it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea
@@ -141,7 +142,8 @@ public WriteBufferManager( | |||
this.requireMemoryRetryMax = bufferManagerOptions.getRequireMemoryRetryMax(); | |||
this.arrayOutputStream = new WrappedByteArrayOutputStream(serializerBufferSize); | |||
// in columnar shuffle, the serializer here is never used | |||
if (serializer != null) { | |||
this.isRowBased = (serializer != null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it common case? if we support other columnar shuffle like rapids, the other columnar shuffle may use non-null serializer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can columnar shuffle be supported by adding configuration or adding a new constructor? This judgment seems a bit hard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my opinion, all columnar shuffle should pass null serializer to WriterBufferManager
.
WriterBufferManager
should only do the buffer related work as it's name mean- Columnar data framework is various, the integration work should be done in the implementation of
rss columnar shuffle writer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can pass a non-serializer and ColumnarBatch to addRecord
method although we use columnar shuffle. It's not equivalent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can pass a non-serializer and ColumnarBatch to
addRecord
method although we use columnar shuffle. It's not equivalent.Not only serialization but also partitioning should be handled and are both related with the implementation of the third party columnar framework, we'd better handle them outside of
WriterBufferManager
For a shuffle, we can handle partitioning by the partitioner
. You mean that partitioner is null for gluten, won't it?
In another word, we may use null serializer row based shuffle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerqi When in columnar shuffle, an element of the iterator is a ColumnarBatch
, it's consists of many rows which will be partitioned to different partitionIds, current spark partitioner api cannot handle this scenario.
Actually the java partitioner is fake when use columnar shuffle in gluten, partitioner is implemented in cpp layer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we have null serializer for row based shuffle in the future? I think the answer is yes. Because some spark data don't need serialization although the data is organized by row format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about indicate row based shuffle false in rssConf
@jerqi
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok for me.
shuffleWriteMetrics.incRecordsWritten(1L); | ||
// records is a row based semantic | ||
if (isRowBased) { | ||
shuffleWriteMetrics.incRecordsWritten(1L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a common case? I think it binded the implement of gluten. Could we have more common interface?
Should we make all fields as protected variables? |
private final Map<Integer, Integer> shuffleIdToNumMapTasks = Maps.newConcurrentMap(); | ||
private ShuffleManagerGrpcService service; | ||
private GrpcServer shuffleManagerServer; | ||
protected final String clientType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why change all fields to protected
? If this is necessary, it is also need to add some code comments to explain
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow the suggestion
No, fix this @xianjingfeng PTAL |
Codecov Report
@@ Coverage Diff @@
## master #958 +/- ##
============================================
- Coverage 55.00% 54.18% -0.83%
- Complexity 2466 2474 +8
============================================
Files 367 355 -12
Lines 19237 17996 -1241
Branches 1579 1726 +147
============================================
- Hits 10582 9751 -831
+ Misses 8008 7643 -365
+ Partials 647 602 -45
... and 43 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing about this integration, do we have any integration tests for this, so that we can caught any incompatibility before releasing.
@@ -141,7 +142,8 @@ public WriteBufferManager( | |||
this.requireMemoryRetryMax = bufferManagerOptions.getRequireMemoryRetryMax(); | |||
this.arrayOutputStream = new WrappedByteArrayOutputStream(serializerBufferSize); | |||
// in columnar shuffle, the serializer here is never used | |||
if (serializer != null) { | |||
this.isRowBased = (serializer != null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can columnar shuffle be supported by adding configuration or adding a new constructor? This judgment seems a bit hard
+1. It seems a bit counterintuitive to rely on serializer == null
to check whether the buffer writer manager should support columnar shuffle or not.
Is is possible for gluten to pass additional conf items to spark conf and in Uniffle side we can add a columnarSupport
field in the BufferManagerOptions class.
protected AtomicReference<String> id = new AtomicReference<>(); | ||
protected SparkConf sparkConf; | ||
protected ShuffleWriteClient shuffleWriteClient; | ||
protected DataPusher dataPusher; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you point to the gluten impl again?
It doesn't feel right to just expose these fields here, especially the id and sparkConf fields, they are not exposed in Spark's original shuffle manager.
I think these fields should be accessed at least via a getter/method, so uniffle is free to change its implementation later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
U'r right, id
and dataPusher
need to be extract to a method.
but sparkConf and shuffleWriteClient
are just used in some constructor.
Since Uniffle is kind of dependency for gluten, the integration tests should be placed in gluten side, this will be done after the release. For example we shall not test spark integration in |
// that is handled by rss shuffle writer implementation | ||
if (isRowBased) { | ||
shuffleWriteMetrics.incRecordsWritten(1L); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
List<ShuffleBlockInfo> dataPartition = addPartitionData(partitionId, serializedData, serializedDataLength, start);
if (isRowBased) {
shuffleWriteMetrics.incRecordsWritten(1L);
}
return dataPartition;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's better
@@ -127,7 +127,7 @@ public RssShuffleWriter( | |||
ShuffleWriteClient shuffleWriteClient, | |||
RssShuffleHandle<K, V, C> rssHandle, | |||
Function<String, Boolean> taskFailureCallback) { | |||
LOG.warn("RssShuffle start write taskAttemptId data" + taskAttemptId); | |||
LOG.warn("RssShuffleaskAttempt start write taskAttemptId data" + taskAttemptId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a misspell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, let @loukey-lj take another look.
gentle ping @xianjingfeng PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merged. Thanks all. |
What changes were proposed in this pull request?
Why are the changes needed?
#940
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test