
[#1472][part-2] fix(server): Reuse ByteBuf when decoding shuffle blocks instead of reallocating it #1521

Merged
1 commit merged into apache:master on Feb 15, 2024

Conversation

@rickyma (Contributor) commented Feb 12, 2024

What changes were proposed in this pull request?

Reuse ByteBuf when decoding shuffle blocks instead of reallocating it

Why are the changes needed?

A sub PR for: #1519

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@codecov-commenter commented Feb 12, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (576a925) 54.27% compared to head (f85291a) 55.15%.
Report is 9 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1521      +/-   ##
============================================
+ Coverage     54.27%   55.15%   +0.87%     
+ Complexity     2807     2806       -1     
============================================
  Files           427      410      -17     
  Lines         24349    22048    -2301     
  Branches       2077     2082       +5     
============================================
- Hits          13215    12160    -1055     
+ Misses        10305     9129    -1176     
+ Partials        829      759      -70     


github-actions bot commented Feb 12, 2024

Test Results

2 287 files  ±0  2 287 suites  ±0   4h 30m 33s ⏱️ + 1m 33s
  819 tests ±0    818 ✅ ±0   1 💤 ±0  0 ❌ ±0 
9 086 runs  ±0  9 073 ✅ ±0  13 💤 ±0  0 ❌ ±0 

Results for commit f85291a. ± Comparison against base commit d87dc90.

♻️ This comment has been updated with latest results.

@@ -47,8 +46,7 @@ public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
     long crc = byteBuf.readLong();
     long taskAttemptId = byteBuf.readLong();
     int dataLength = byteBuf.readInt();
-    ByteBuf data = NettyUtils.getNettyBufferAllocator().directBuffer(dataLength);
-    data.writeBytes(byteBuf, dataLength);
+    ByteBuf data = byteBuf.retain().readSlice(dataLength);
Contributor

Will byteBuf be split into multiple parts? Will every part be released multiple times? Will that cause errors?

Contributor Author

@rickyma rickyma Feb 13, 2024

The ByteBuf will not be split into multiple parts. It is consumed by a SendShuffleDataRequest as a whole.
It will not cause errors, because we retain the ByteBuf (refCnt++) every time we do a readSlice:

public static SendShuffleDataRequest decode(ByteBuf byteBuf) {
    long requestId = byteBuf.readLong();
    String appId = ByteBufUtils.readLengthAndString(byteBuf);
    int shuffleId = byteBuf.readInt();
    long requireId = byteBuf.readLong();
    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = decodePartitionData(byteBuf);
    long timestamp = byteBuf.readLong();
    return new SendShuffleDataRequest(
        requestId, appId, shuffleId, requireId, partitionToBlocks, timestamp);
  }

But it might slow down the flushing process, because the shared ByteBuf is not actually released until all of the ShufflePartitionedData has been flushed (i.e. the refCnt drops to 0):

List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(sendShuffleDataRequest);
...
for (ShufflePartitionedData spd : shufflePartitionedData) {
    ...
    ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
    ...
}

The ByteBuf cannot be split up; once we need to split it, we have to allocate new ByteBufs.
So maybe we should hold this PR and look for a better way to do this.
On the other hand, it does speed up the decoding process.
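
For readers unfamiliar with Netty's reference counting, here is a minimal, hypothetical sketch (not code from this PR) of the retain()/readSlice() behavior described above: readSlice returns a zero-copy view that shares the parent's memory and reference count, so the underlying buffer is freed only when every consumer has released its slice.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class RetainSliceSketch {
  public static void main(String[] args) {
    // One inbound frame holding two 4-byte blocks.
    ByteBuf frame = PooledByteBufAllocator.DEFAULT.directBuffer(8);
    frame.writeInt(1).writeInt(2);                 // refCnt == 1

    // Decode each block: retain the frame, then take a zero-copy slice.
    ByteBuf block1 = frame.retain().readSlice(4);  // refCnt == 2
    ByteBuf block2 = frame.retain().readSlice(4);  // refCnt == 3

    // The frame decoder drops its own reference once decoding is finished.
    frame.release();                               // refCnt == 2

    // Each block is released after it has been cached/flushed; slices share
    // the parent's reference count, so the memory is only freed at zero.
    block1.release();                              // refCnt == 1
    block2.release();                              // refCnt == 0, memory returned
  }
}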

@rickyma rickyma closed this Feb 13, 2024
@rickyma rickyma reopened this Feb 13, 2024
@rickyma (Contributor, Author) commented Feb 13, 2024

I reopened this PR.

After stress testing the shuffle server without this PR, we easily run into OutOfDirectMemoryError, which means this PR is necessary.

[epollEventLoopGroup-3-45] [WARN] TransportChannelHandler.exceptionCaught - Exception in connection from /127.0.0.1:58767
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 161061273600, max: 161061273600)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:843)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:772)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212)
at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:194)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:397)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
at org.apache.uniffle.common.netty.protocol.Decoders.decodeShuffleBlockInfo(Decoders.java:50)
at org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest.decodePartitionData(SendShuffleDataRequest.java:95)
at org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest.decode(SendShuffleDataRequest.java:107)
at org.apache.uniffle.common.netty.protocol.Message.decode(Message.java:145)
at org.apache.uniffle.common.netty.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:77)

We can see that each time an out-of-direct-memory error occurs, it is triggered by org.apache.uniffle.common.netty.protocol.Decoders.decodeShuffleBlockInfo(Decoders.java:50), i.e. ByteBuf data = NettyUtils.getNettyBufferAllocator().directBuffer(dataLength). This is the most direct trigger for running out of direct memory.

When a large number of requests arrive simultaneously, there is a brief window (before the TransportFrameDecoder has a chance to release the inbound ByteBuf) during which the shuffle server holds both the inbound frame and the newly allocated copy of each block. This means that, for a very short time, direct memory usage is doubled, which is extremely hard to control.
That is why it is so easy to hit an out-of-direct-memory error without this PR.

So, we need this PR anyway. It might slow down the flushing process a little bit, but the shuffle server will at least remain available during the whole stress test.

From the results of my stress tests, there doesn't seem to be any impact on performance. In fact, it may even be faster, since decoding no longer reallocates new ByteBufs and copies data into them.
There have been no anomalies or performance issues caused by the slower flushing. Eventually, all the buffers are flushed and all ByteBufs are successfully released, with no memory leaks.
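
As a hedged, toy illustration (not code from this PR) of the transient doubling described above: with the old allocate-and-copy path both the inbound frame and its copy stay alive until the frame decoder releases the frame, while the retain-and-slice path keeps the peak at a single frame.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;

public class PeakDirectMemorySketch {
  static final int DATA_LENGTH = 4 * 1024 * 1024; // one 4 MiB block, as in the log above

  public static void main(String[] args) {
    ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

    // Old path: allocate a second direct buffer and copy the block into it.
    ByteBuf frame = alloc.directBuffer(DATA_LENGTH);
    frame.writerIndex(DATA_LENGTH);                    // pretend the frame is full
    ByteBuf copy = alloc.directBuffer(DATA_LENGTH);
    copy.writeBytes(frame, DATA_LENGTH);
    // Peak usage here: about 2 * DATA_LENGTH until the frame is released.
    frame.release();
    copy.release();

    // New path: retain the frame and hand out a zero-copy slice instead.
    ByteBuf frame2 = alloc.directBuffer(DATA_LENGTH);
    frame2.writerIndex(DATA_LENGTH);
    ByteBuf block = frame2.retain().readSlice(DATA_LENGTH);
    // Peak usage here: about 1 * DATA_LENGTH; no extra allocation happened.
    frame2.release();  // the decoder's reference
    block.release();   // the consumer's reference; memory is freed at refCnt == 0
  }
}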

PTAL @jerqi

@jerqi (Contributor) commented Feb 14, 2024

(quoting rickyma's comment above)

After stress testing the shuffle server without this PR, we easily run into OutOfDirectMemoryError, which means this PR is necessary. [...] So, we need this PR anyway. [...]

Maybe we should modify our flush strategy, too. Now we flush the larger reduce partitions, but if the map partition contains a smaller reduce partition, its memory won't be released either.

@jerqi (Contributor) commented Feb 14, 2024

I prefer adding a config option for this improvement.

@rickyma (Contributor, Author) commented Feb 14, 2024

(quoting jerqi's comment above)

Maybe we should modify our flush strategy, too. Now we flush the larger reduce partitions, but if the map partition contains a smaller reduce partition, its memory won't be released either.

The flushing strategy will be changed in the final PR.

@rickyma (Contributor, Author) commented Feb 14, 2024

I prefer adding a config option for this improvement.

This is not an improvement; it actually fixes a bug, because the shuffle server won't remain available during stress testing.
So I think it's a must rather than an improvement. We must avoid double allocation of ByteBufs at all costs.

@rickyma rickyma requested a review from jerqi February 14, 2024 08:48
@jerqi (Contributor) left a comment

LGTM, thanks @rickyma

@jerqi jerqi merged commit 7fbe7c9 into apache:master Feb 15, 2024
75 checks passed
zuston pushed a commit that referenced this pull request Feb 23, 2024
…mory issue causing OOM (#1534)

### What changes were proposed in this pull request?

When we use `UnpooledByteBufAllocator` to allocate off-heap `ByteBuf`, Netty directly requests off-heap memory from the operating system instead of allocating it according to `pageSize` and `chunkSize`. This way, we can obtain the exact `ByteBuf` size during the pre-allocation of memory, avoiding distortion of metrics such as `usedMemory`. 
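
A hedged sketch (not code from #1534) of the allocator behavior described above: with UnpooledByteBufAllocator a direct buffer's capacity matches the requested size exactly, so the requested length can be used directly for pre-allocation accounting.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

public class UnpooledSizeSketch {
  public static void main(String[] args) {
    // preferDirect = true: off-heap buffers, no page/chunk pooling.
    UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(true);

    int dataLength = 3 * 1024 * 1024 + 123;        // arbitrary, non-power-of-two size
    ByteBuf buf = alloc.directBuffer(dataLength);

    // Capacity equals the request, so a usedMemory-style counter can account
    // for exactly dataLength bytes instead of a rounded pool size.
    System.out.println(buf.capacity() == dataLength); // true

    buf.release();
  }
}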

Moreover, we have restored the change from PR [#1521](#1521). We ensure that there is sufficient direct memory for the Netty server while decoding `sendShuffleDataRequest` by taking the `encodedLength` of the `ByteBuf` into account in advance during memory pre-allocation, thus avoiding OOM while decoding `sendShuffleDataRequest`.

Since we are not using `PooledByteBufAllocator`, the PR [#1524](#1524) is no longer needed.

### Why are the changes needed?

A sub PR for: #1519

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
@rickyma rickyma deleted the issue-1472-part-2 branch May 5, 2024 08:34