-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#133] feat(netty): Implement ShuffleServer interface. #879
Conversation
Codecov Report
@@ Coverage Diff @@
## master #879 +/- ##
============================================
+ Coverage 56.57% 57.28% +0.71%
+ Complexity 2174 2173 -1
============================================
Files 327 310 -17
Lines 15979 13897 -2082
Branches 1263 1278 +15
============================================
- Hits 9040 7961 -1079
+ Misses 6430 5505 -925
+ Partials 509 431 -78
... and 33 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@@ -80,4 +80,9 @@ public static final byte[] readBytes(ByteBuf buf) { | |||
buf.resetReaderIndex(); | |||
return bytes; | |||
} | |||
|
|||
public static void readBytes(ByteBuf from, byte[] to, int offset, int length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add UT for this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
@smallzhongfeng Could you help me review this pr? |
return data; | ||
} | ||
|
||
public ByteBuffer getDataBuffer() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to consider nioBufferCount != 1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was my overlooked, since the increaseWriterIndex parameter is true when we call CompositeByteBuf.addComponent, CompositeByteBuf can be used as a normal ByteBuf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that we don't need consider nioBufferCount != 1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we don't need, I have made some tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we don't need, I have made some tests.
Could you add some UT for this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See ByteBufUtilsTest#test, CompositeByteBuf can be read as ByteBuf.
final int expectedLength = byteBuf.readableBytes() + byteBuf1.readableBytes();
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(true, byteBuf);
compositeByteBuf.addComponent(true, byteBuf1);
ByteBuf res = Unpooled.buffer(100);
ByteBufUtils.copyByteBuf(compositeByteBuf, res);
assertEquals(expectedLength, res.readableBytes() - Integer.BYTES);
res.clear();
ByteBufUtils.copyByteBuf(compositeByteBuf, res);
assertEquals(expectedLength, res.readableBytes() - Integer.BYTES);
|
||
addDecoder(ctx, magicByte); | ||
public void exceptionCaught(Throwable cause, TransportClient client) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add some logs for Exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay.
LocalFileServerReadHandler.getShuffleData() still uses byte[], we will not change it at present. |
cc @smallzhongfeng @jerqi PTAL. |
@@ -161,11 +163,10 @@ public synchronized ShuffleDataResult getShuffleData( | |||
updateBufferSegmentsAndResultBlocks( | |||
lastBlockId, readBufferSize, bufferSegments, readBlocks, expectedTaskIds); | |||
if (!bufferSegments.isEmpty()) { | |||
int length = calculateDataLength(bufferSegments); | |||
byte[] data = new byte[length]; | |||
CompositeByteBuf byteBuf = Unpooled.compositeBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will CompositeByteBuf
cause some extra copy because we may trigger resize action?
Will we use direct memory for CompositeByteBuf
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompositeByteBuf does not need to apply for memory. It depends on child ByteBuf. ByteBuf comes from netty server, so it depends on ByteBufAllocator in the end. I will fix this problem when init StreamServer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually PooledByteBufAllocator.DEFAULT use direct memory, we needn't fix any more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually PooledByteBufAllocator.DEFAULT use direct memory, we needn't fix any more.
OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompositeByteBuf does not need to apply for memory. It depends on child ByteBuf. ByteBuf comes from netty server, so it depends on ByteBufAllocator in the end. I will fix this problem when init StreamServer.
Do you consider CompositeByteBuf#consolidateIfNeeded()
? In this method, if we exceed the maxComponentNum, we will trigger the logic of merging component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, but there is a problem which we need to discuss. For me, we could add some comments, 16 is enough if we only use it to read memory data in my opinion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
16 is not enough, because read buffer is 16M.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
16 is not enough, because read buffer is 16M.
We would better use a larger value to avoid data copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suppose the size of a block is small enough to be 16k, readBuffer=16M, we can make maxNumComponents=1024 to avoid data copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
import org.apache.uniffle.common.netty.client.TransportClient; | ||
import org.apache.uniffle.common.netty.protocol.RequestMessage; | ||
|
||
public class BaseMessageHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do this class have so many empty methods? Is it better to use abstract class?
import org.apache.uniffle.common.netty.client.TransportClient; | ||
import org.apache.uniffle.common.netty.protocol.RequestMessage; | ||
|
||
public abstract class BaseMessageHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use interface here? Because we don't use any default implement of this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
cachedClient.getChannel().pipeline().get(TransportResponseHandler.class); | ||
TransportChannelHandler handler = | ||
cachedClient.getChannel().pipeline().get(TransportChannelHandler.class); | ||
synchronized (handler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we instead optimistic lock of synchronized
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark
also like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @leixm @smallzhongfeng
… release readMemory (#1605) ### What changes were proposed in this pull request? 1. Add a `ChannelFutureListener` and use its callback mechanism to release `readMemory` only after the `writeAndFlush` method is truly completed. 2. Change the descriptions of configurations `rss.server.buffer.capacity.ratio` and `rss.server.read.buffer.capacity.ratio`. ### Why are the changes needed? This is actually a bug, which was introduced by PR #879. The issue has been present since the very beginning when the Netty feature was first integrated. Fix #1596. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I don't think we need new tests. Tested in our env. The new log will be: ``` [2024-03-26 23:11:51.039] [epollEventLoopGroup-3-158] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getLocalShuffleData for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1328], offset[0], length[14693742]. Took 1457 ms and retrieved 14693742 bytes of data [2024-03-26 23:11:51.040] [epollEventLoopGroup-3-130] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getMemoryShuffleData for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1262]. Took 1 ms and retrieved 0 bytes of data [2024-03-26 23:11:51.068] [epollEventLoopGroup-3-177] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getLocalShuffleIndex for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1366]. Took 918 ms and retrieved 1653600 bytes of data ```
What changes were proposed in this pull request?
Implement ShuffleServer interface.
Why are the changes needed?
For #133.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
existing UTs.