[#1472][part-5] Inaccurate flow control leads to Shuffle server OOM when enabling Netty #1531

Closed
wants to merge 14 commits into from

rickyma
Contributor

@rickyma rickyma commented Feb 15, 2024

What changes were proposed in this pull request?

When Netty is enabled on the shuffle server, the checks performed during memory pre-allocation and buffer flushing should use the direct memory actually in use (pinnedDirectMemory in PooledByteBufAllocator) instead of the previous usedMemory and capacity values, which are inaccurate in this mode (see #1472).

When initializing the capacity variable, direct memory will be used.
When setting usedMemory variable, pinnedDirectMemory will be used.
usedMemory will be updated in NettyDirectMemoryTracker periodically.

Default values of rss.server.netty.directMemoryTracker.memoryUsage.updateMetricsIntervalMs and rss.server.netty.directMemoryTracker.memoryUsage.initialFetchDelayMs configurations are decreased to 1s.

Why are the changes needed?

A sub PR for: #1519

Does this PR introduce any user-facing change?

No.

How was this patch tested?

1. Modified existing UTs.
2. Fix #1008. It does not actually test GRPC_NETTY mode, because it uses ShuffleServerGrpcClient everywhere instead of ShuffleServerGrpcNettyClient.

github-actions bot commented Feb 15, 2024

Test Results

2 289 files   - 140  2 289 suites   - 140   4h 33m 25s ⏱️ - 7m 6s
  816 tests  -   3    815 ✅  -   3   1 💤 ±0  0 ❌ ±0 
9 621 runs   -  92  9 607 ✅  -  92  14 💤 ±0  0 ❌ ±0 

Results for commit 7cdccde. ± Comparison against base commit b924aca.

This pull request removes 29 and adds 26 tests. Note that renamed tests count towards both.
org.apache.uniffle.server.TopNShuffleDataSizeOfAppCalcTaskTest ‑ testTopNShuffleDataSizeOfAppCalcTask
org.apache.uniffle.test.DiskErrorToleranceTest ‑ diskErrorTest
org.apache.uniffle.test.HybridStorageHadoopFallbackTest ‑ fallbackTest
org.apache.uniffle.test.HybridStorageLocalFileFallbackTest ‑ fallbackTest
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ hadoopWriteReadTest
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ testConcurrentWrite2Hadoop{int, int}[1]
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ testConcurrentWrite2Hadoop{int, int}[2]
org.apache.uniffle.test.ShuffleServerFaultToleranceTest ‑ testReadFaultTolerance
org.apache.uniffle.test.ShuffleServerGrpcTest ‑ sendDataAndRequireBufferTest
org.apache.uniffle.test.ShuffleServerGrpcTest ‑ sendDataWithoutRegisterTest
…
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ testConcurrentWrite2Hadoop{int, int, boolean}[1]
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ testConcurrentWrite2Hadoop{int, int, boolean}[2]
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ testConcurrentWrite2Hadoop{int, int, boolean}[3]
org.apache.uniffle.test.ShuffleServerConcurrentWriteOfHadoopTest ‑ testConcurrentWrite2Hadoop{int, int, boolean}[4]
org.apache.uniffle.test.ShuffleServerWithMemLocalHadoopTest ‑ memoryLocalFileHadoopReadWithFilterTest{boolean, boolean}[1]
org.apache.uniffle.test.ShuffleServerWithMemLocalHadoopTest ‑ memoryLocalFileHadoopReadWithFilterTest{boolean, boolean}[2]
org.apache.uniffle.test.SparkClientWithLocalTest ‑ readTest10{boolean}[1]
org.apache.uniffle.test.SparkClientWithLocalTest ‑ readTest10{boolean}[2]
org.apache.uniffle.test.SparkClientWithLocalTest ‑ readTest1{boolean}[1]
org.apache.uniffle.test.SparkClientWithLocalTest ‑ readTest1{boolean}[2]
…

♻️ This comment has been updated with latest results.

@codecov-commenter

codecov-commenter commented Feb 15, 2024

Codecov Report

Attention: 140 lines in your changes are missing coverage. Please review.

Comparison is base (7fbe7c9) 54.15% compared to head (7cdccde) 54.40%.
Report is 4 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...iffle/server/buffer/NettyShuffleBufferManager.java | 18.27% | 74 Missing and 2 partials ⚠️ |
| ...ava/org/apache/uniffle/common/util/NettyUtils.java | 6.66% | 14 Missing ⚠️ |
| ...le/server/buffer/AbstractShuffleBufferManager.java | 75.00% | 8 Missing and 5 partials ⚠️ |
| ...ache/uniffle/server/buffer/NettyShuffleBuffer.java | 0.00% | 13 Missing ⚠️ |
| ...a/org/apache/uniffle/common/ShuffleServerInfo.java | 0.00% | 6 Missing ⚠️ |
| ...niffle/server/netty/ShuffleServerNettyHandler.java | 0.00% | 5 Missing ⚠️ |
| .../org/apache/uniffle/server/ShuffleTaskManager.java | 33.33% | 3 Missing and 1 partial ⚠️ |
| ...niffle/server/buffer/GrpcShuffleBufferManager.java | 95.74% | 1 Missing and 3 partials ⚠️ |
| ...e/uniffle/server/buffer/AbstractShuffleBuffer.java | 57.14% | 3 Missing ⚠️ |
| ...g/apache/uniffle/server/ShuffleDataFlushEvent.java | 80.00% | 1 Missing ⚠️ |

... and 1 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1531      +/-   ##
============================================
+ Coverage     54.15%   54.40%   +0.24%     
- Complexity     2803     2808       +5     
============================================
  Files           430      415      -15     
  Lines         24417    22259    -2158     
  Branches       2081     2112      +31     
============================================
- Hits          13224    12110    -1114     
+ Misses        10361     9389     -972     
+ Partials        832      760      -72     


@rickyma
Contributor Author

rickyma commented Feb 15, 2024

PTAL @jerqi.

The main changes are concentrated in the following files:
ShuffleBuffer.java
ShuffleBufferManager.java (Mostly in this file)
ShuffleServerNettyHandler.java
NettyDirectMemoryTracker.java
ShuffleDataFlushEvent.java
ShuffleServer.java
ShuffleTaskManager.java.

Other modifications are mainly in test files.

@jerqi
Contributor

jerqi commented Feb 16, 2024

cc @zuston

      flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition);
      return;
    }
  }

  public void flushIfNecessary() {
    // if data size in buffer > highWaterMark, do the flush
    if (usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() > highWaterMark) {
Contributor

Could you extract a method to make the logic clearer?

Contributor Author

@rickyma rickyma Feb 16, 2024

Could you extract a method to make the logic clearer?

I think the code is clear enough, so I'm not sure we need to extract a method.
Extracting one might even make it less clear.

if (nettyServerEnabled) {
  needFlush = pinnedDirectMemory > highWaterMark;
} else {
  needFlush = usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() > highWaterMark;
}

The pseudocode for needFlush is as follows:

needFlush = current shuffle server's actual used buffer > highWaterMark;

We use PooledByteBufAllocator to allocate buffers in Netty mode, so when Netty is enabled we can essentially treat pinnedDirectMemory as the current shuffle server's actual used buffer.

In Netty mode, the current shuffle server's actual used buffer is pinnedDirectMemory.
In gRPC mode, the current shuffle server's actual used buffer is usedMemory.get() - preAllocatedSize.get() - inFlushSize.get().
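
For illustration, the two definitions above could be folded into one helper, which is roughly what the reviewer's "extract a method" request amounts to. This is only a minimal sketch: the class name FlushDecisionSketch and the method actualUsedBuffer are hypothetical, and pinnedDirectMemory is passed in as a plain number rather than read from Netty.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration only; not the actual ShuffleBufferManager. */
final class FlushDecisionSketch {
  final AtomicLong usedMemory = new AtomicLong();
  final AtomicLong preAllocatedSize = new AtomicLong();
  final AtomicLong inFlushSize = new AtomicLong();
  private final boolean nettyServerEnabled;
  private final long highWaterMark;

  FlushDecisionSketch(boolean nettyServerEnabled, long highWaterMark) {
    this.nettyServerEnabled = nettyServerEnabled;
    this.highWaterMark = highWaterMark;
  }

  /** The "actual used buffer" of the server, as defined per mode in the comment above. */
  long actualUsedBuffer(long pinnedDirectMemory) {
    if (nettyServerEnabled) {
      // Netty mode: trust what the pooled allocator reports as pinned.
      return pinnedDirectMemory;
    }
    // gRPC mode: bookkeeping done by the server itself.
    return usedMemory.get() - preAllocatedSize.get() - inFlushSize.get();
  }

  /** needFlush = actual used buffer > highWaterMark. */
  boolean needFlush(long pinnedDirectMemory) {
    return actualUsedBuffer(pinnedDirectMemory) > highWaterMark;
  }
}
```

With this shape, flushIfNecessary would only need to call needFlush, and the mode-specific arithmetic lives in one place.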

@jerqi
Contributor

jerqi commented Feb 16, 2024

  1. Do we need to modify the logic of method pickFlushedShuffle? You can refer to the comment [#1472][part-2] fix(server): Reuse ByteBuf when decoding shuffle blocks instead of reallocating it #1521 (comment)
  2. Could you extract some methods to make the logic clearer?

@@ -47,6 +47,8 @@ public class ShuffleBuffer {

private final long capacity;
private long size;
// for Netty mode
private long estimatedSize;
Contributor

Do we need estimatedSize? Could we reuse size?

Contributor Author

@rickyma rickyma Feb 16, 2024

We need estimatedSize because we use the accurate, real-time used direct memory (pinnedDirectMemory) to decide whether to pre-allocate or flush. If we used size to calculate usedMemory, usedMemory would gradually deviate from pinnedDirectMemory, and the gap would keep growing over time. This would make both the pickFlushedShuffle method and the coordinator's shuffle server assignment inaccurate, since both continue to use usedMemory as the basis for their decisions.

Contributor Author

@rickyma rickyma Feb 16, 2024

Also, we cannot reuse size, because the real file size is still used in places like:
LocalStorageManager.updateWriteMetrics
HadoopStorageManager.updateWriteMetrics
ShuffleTaskInfo.addOnLocalFileDataSize
ShuffleTaskInfo.addOnHadoopDataSize
StorageWriteMetrics.eventSize

We have to keep it the original way.

@rickyma
Contributor Author

rickyma commented Feb 16, 2024

  1. Do we need to modify the logic of method pickFlushedShuffle? You can refer to the comment [#1472][part-2] fix(server): Reuse ByteBuf when decoding shuffle blocks instead of reallocating it #1521 (comment)
  2. Could you extract some methods to make the logic clearer?

We reuse highWaterMark and lowWaterMark:

this.capacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
if (this.capacity < 0) {
  this.capacity =
      nettyServerEnabled
          ? (long)
              (NettyUtils.getMaxDirectMemory()
                  * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO))
          : (long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
}
this.highWaterMark =
    (long)
        (capacity
            / 100.0
            * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE));
this.lowWaterMark =
    (long)
        (capacity
            / 100.0
            * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));

So we don't need to modify the logic of method pickFlushedShuffle.
The pickedFlushSize in pickFlushedShuffle will become estimatedSize in Netty mode, because it comes from shuffleSizeMap which will be modified in cacheShuffleData -> updateShuffleSize.
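
As a small worked example of the watermark arithmetic in the snippet above, the sketch below derives a capacity from a ratio and then the two watermarks from percentages. The class and method names are hypothetical, and the ratio and percentages used in the usage note are illustrative assumptions, not the project's defaults.

```java
/** Hypothetical helper mirroring the capacity / 100.0 * percentage arithmetic. */
final class WatermarkSketch {
  private WatermarkSketch() {}

  /** Capacity derived from a memory budget (max direct memory or heap size) and a ratio. */
  static long capacityFromRatio(long maxMemoryBytes, double capacityRatio) {
    return (long) (maxMemoryBytes * capacityRatio);
  }

  /** Watermark in bytes for a percentage in the range 0..100. */
  static long watermark(long capacityBytes, double percentage) {
    return (long) (capacityBytes / 100.0 * percentage);
  }
}
```

For instance, a budget of 2000 bytes with an assumed ratio of 0.5 gives a capacity of 1000 bytes; assumed percentages of 75 and 25 then give a highWaterMark of 750 bytes and a lowWaterMark of 250 bytes. Because only the capacity source differs between modes, the same two thresholds apply whether capacity comes from direct memory or from the heap.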

@rickyma rickyma requested a review from jerqi February 16, 2024 16:48
@jerqi
Contributor

jerqi commented Feb 17, 2024

Could you provide a common abstraction for Netty mode and non-Netty mode? Each mode would then implement its own specific behaviour. Maybe we need some interfaces.

@rickyma
Contributor Author

rickyma commented Feb 17, 2024

Could you provide a common abstraction for Netty mode and non-Netty mode? Each mode would then implement its own specific behaviour. Maybe we need some interfaces.

The abstraction is provided as follows:

AbstractShuffleBuffer
├── GrpcShuffleBuffer
└── NettyShuffleBuffer

AbstractShuffleBufferManager
├── GrpcShuffleBufferManager
└── NettyShuffleBufferManager

ShuffleBufferManagerFactory
└── createShuffleBufferManager()
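
A rough sketch of how such a hierarchy might hang together is shown below. It models only the flush check; all class names carry a Sketch suffix to mark them as hypothetical, the factory signature is an assumption, and the pinned direct memory is supplied through a LongSupplier instead of a real Netty allocator.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical base class; only the flush check is modeled. */
abstract class BufferManagerSketch {
  protected final long highWaterMark;

  protected BufferManagerSketch(long highWaterMark) {
    this.highWaterMark = highWaterMark;
  }

  /** Each mode supplies its own notion of "actual used buffer". */
  protected abstract long actualUsedBuffer();

  public final boolean needFlush() {
    return actualUsedBuffer() > highWaterMark;
  }
}

/** gRPC mode: relies on the server's own counters. */
final class GrpcBufferManagerSketch extends BufferManagerSketch {
  final AtomicLong usedMemory = new AtomicLong();
  final AtomicLong preAllocatedSize = new AtomicLong();
  final AtomicLong inFlushSize = new AtomicLong();

  GrpcBufferManagerSketch(long highWaterMark) {
    super(highWaterMark);
  }

  @Override
  protected long actualUsedBuffer() {
    return usedMemory.get() - preAllocatedSize.get() - inFlushSize.get();
  }
}

/** Netty mode: relies on the pinned direct memory reported by the allocator. */
final class NettyBufferManagerSketch extends BufferManagerSketch {
  private final LongSupplier pinnedDirectMemory; // stand-in for the allocator metric

  NettyBufferManagerSketch(long highWaterMark, LongSupplier pinnedDirectMemory) {
    super(highWaterMark);
    this.pinnedDirectMemory = pinnedDirectMemory;
  }

  @Override
  protected long actualUsedBuffer() {
    return pinnedDirectMemory.getAsLong();
  }
}

/** Hypothetical factory choosing the implementation by mode. */
final class BufferManagerFactorySketch {
  private BufferManagerFactorySketch() {}

  static BufferManagerSketch createShuffleBufferManager(
      boolean nettyServerEnabled, long highWaterMark, LongSupplier pinnedDirectMemory) {
    return nettyServerEnabled
        ? new NettyBufferManagerSketch(highWaterMark, pinnedDirectMemory)
        : new GrpcBufferManagerSketch(highWaterMark);
  }
}
```

The point of this shape is that callers never branch on nettyServerEnabled themselves; the mode is decided once in the factory.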

@jerqi

@rickyma rickyma force-pushed the issue-1472-part5 branch 3 times, most recently from b1713a0 to e83e9a1 Compare February 18, 2024 03:27
@rickyma rickyma force-pushed the issue-1472-part5 branch 6 times, most recently from 5c01e94 to 73a2ac6 Compare February 18, 2024 07:12
@zuston
Member

zuston commented Feb 18, 2024

I'm still evaluating the effectiveness and rationality of this PR. Do you have similar experience with Netty? @EnricoMi

Details can be found in #1519.

@rickyma
Contributor Author

rickyma commented Feb 18, 2024

I'm still evaluating the effectiveness and rationality of this PR. Do you have similar experience with Netty? @EnricoMi

Details can be found in #1519.

image
Tested in our test environment, this PR solved the issue. Before this PR, it failed very quickly.

@rickyma rickyma closed this Feb 18, 2024
@rickyma rickyma reopened this Feb 18, 2024
@XuQianJin-Stars

Hi @rickyma, the core code looks good overall. The UTs for Netty memory need to be extended to cover memory growth.

Member

@zuston zuston left a comment

I understand your motivation, but the change is not reasonable; it looks like a hack.

* @param requestedSize The requested size of the direct memory.
* @return The estimated allocated direct memory size.
*/
public static int calculateEstimatedMemoryAllocationSize(int requestedSize) {
Member

It's really weird

@@ -68,6 +74,9 @@ public void start() {
ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(usedDirectMemory);
ShuffleServerMetrics.gaugeAllocatedDirectMemorySize.set(allocatedDirectMemory);
ShuffleServerMetrics.gaugePinnedDirectMemorySize.set(pinnedDirectMemory);
if (nettyServerEnabled) {
shuffleBufferManager.setUsedMemory(pinnedDirectMemory);
Member

Hmm... Using a scheduled thread to update usedMem is not a good design, because the value is not deterministic.

Contributor Author

@rickyma rickyma Feb 19, 2024

You can never accurately obtain usedMemory by calculating it in business code.

The first reason is that this size cannot be estimated, for many of the reasons mentioned before, such as network fluctuations.
The second reason is that PooledByteBufAllocator may reuse direct memory through caching.
This means that even if you calculate the size directly from the ByteBuf received in the channelRead method on the server side, the usedMemory you count may still be larger than the memory managed by PooledByteBufAllocator.

Moreover, NettyUtils.getNettyBufferAllocator().pinnedDirectMemory() is expensive to call, so the value is fetched periodically.

So the value is meant to be non-deterministic anyway, and we don't need a deterministic usedMemory here. That's why I use the calculateEstimatedMemoryAllocationSize method to calculate the size.
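
To make the trade-off concrete, here is a minimal sketch of the periodic-refresh pattern under discussion: an expensive metric is sampled on a schedule and cached, so readers see a value that may be up to one interval stale. The class name and methods are hypothetical, and a LongSupplier stands in for the pinnedDirectMemory() call; this is not the actual NettyDirectMemoryTracker.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical tracker that caches an expensive memory metric. */
final class PeriodicUsedMemoryTrackerSketch implements AutoCloseable {
  private final AtomicLong usedMemory = new AtomicLong();
  private final LongSupplier pinnedDirectMemory; // stand-in for the allocator metric
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          r -> {
            Thread t = new Thread(r, "used-memory-tracker-sketch");
            t.setDaemon(true); // do not keep the JVM alive for metrics
            return t;
          });

  PeriodicUsedMemoryTrackerSketch(LongSupplier pinnedDirectMemory) {
    this.pinnedDirectMemory = pinnedDirectMemory;
  }

  /** Starts sampling; both arguments are in milliseconds. */
  void start(long initialDelayMs, long intervalMs) {
    scheduler.scheduleAtFixedRate(this::refresh, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  /** Takes one sample of the expensive metric and caches it. */
  void refresh() {
    usedMemory.set(pinnedDirectMemory.getAsLong());
  }

  /** Cheap read of the cached value; may lag the real value by up to one interval. */
  long getUsedMemory() {
    return usedMemory.get();
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

The staleness window is what the reviewer objects to and what the author accepts as unavoidable; lowering the interval (the PR description sets the defaults to 1s) narrows the window at the cost of more frequent sampling.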

@rickyma
Contributor Author

rickyma commented Feb 21, 2024

Closed. I've created a new PR to solve this problem: #1534

@zuston @jerqi
