Skip to content

Commit

Permalink
[#1596] fix(netty): Use a ChannelFutureListener callback mechanism to…
Browse files Browse the repository at this point in the history
… release readMemory (#1605)

### What changes were proposed in this pull request?

1. Add a `ChannelFutureListener` and use its callback mechanism to release `readMemory` only after the `writeAndFlush` method is truly completed.
2. Change the descriptions of configurations `rss.server.buffer.capacity.ratio` and `rss.server.read.buffer.capacity.ratio`. 

### Why are the changes needed?

This is actually a bug, which was introduced by PR #879. The issue has been present since the very beginning when the Netty feature was first integrated.
Fix #1596.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I don't think we need new tests. Tested in our env.
The new log will be:
```
[2024-03-26 23:11:51.039] [epollEventLoopGroup-3-158] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getLocalShuffleData for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1328], offset[0], length[14693742]. Took 1457 ms and retrieved 14693742 bytes of data
[2024-03-26 23:11:51.040] [epollEventLoopGroup-3-130] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getMemoryShuffleData for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1262]. Took 1 ms and retrieved 0 bytes of data
[2024-03-26 23:11:51.068] [epollEventLoopGroup-3-177] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getLocalShuffleIndex for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1366]. Took 918 ms and retrieved 1653600 bytes of data
```
  • Loading branch information
rickyma committed Mar 28, 2024
1 parent 3a1b4d2 commit cbf4f6f
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,9 @@ public int getLength() {
public long getTimestamp() {
return timestamp;
}

@Override
public String getOperationType() {
return "getLocalShuffleData";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,9 @@ public int getPartitionNumPerRange() {
public int getPartitionNum() {
return partitionNum;
}

@Override
public String getOperationType() {
return "getLocalShuffleIndex";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,9 @@ public long getTimestamp() {
public Roaring64NavigableMap getExpectedTaskIdsBitmap() {
return expectedTaskIdsBitmap;
}

@Override
public String getOperationType() {
return "getMemoryShuffleData";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public RequestMessage(long requestId, ManagedBuffer managedBuffer) {
public long getRequestId() {
return requestId;
}

public abstract String getOperationType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,9 @@ public long getTimestamp() {
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

@Override
public String getOperationType() {
return "sendShuffleData";
}
}
6 changes: 3 additions & 3 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.server.netty.receive.buf | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. |
| rss.server.netty.send.buf | 0 | Send buffer size (SO_SNDBUF). |
| rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used |
| rss.server.buffer.capacity.ratio | 0.8 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * ratio |
| rss.server.buffer.capacity.ratio | 0.8 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio |
| rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold of spill data to storage, percentage of rss.server.buffer.capacity |
| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold of keep data in memory, percentage of rss.server.buffer.capacity |
| rss.server.read.buffer.capacity | -1 | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used |
| rss.server.read.buffer.capacity.ratio | 0.4 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size * ratio |
| rss.server.read.buffer.capacity.ratio | 0.4 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio |
| rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) |
| rss.server.flush.localfile.threadPool.size | 10 | Thread pool for flush data to local file |
| rss.server.flush.hadoop.threadPool.size | 60 | Thread pool for flush data to hadoop storage |
Expand All @@ -104,7 +104,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.server.max.concurrency.of.per-partition.write | 30 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. |
| rss.server.max.concurrency.limit.of.per-partition.write | - | The limit for max concurrency per-partition write specified by client, this won't be enabled by default. |
| rss.metrics.reporter.class | - | The class of metrics reporter. |
| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. |
| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. |
| rss.server.disk-capacity.watermark.check.enabled | false | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. |

### Advanced Configurations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ShuffleServerConf extends RssBaseConf {
.doubleType()
.defaultValue(0.6)
.withDescription(
"JVM heap size * ratio for the maximum memory of buffer manager for shuffle server, this "
"JVM heap size or off-heap size(when enabling Netty) * ratio for the maximum memory of buffer manager for shuffle server, this "
+ "is only effective when `rss.server.buffer.capacity` is not explicitly set");

public static final ConfigOption<Long> SERVER_READ_BUFFER_CAPACITY =
Expand All @@ -56,7 +56,7 @@ public class ShuffleServerConf extends RssBaseConf {
.doubleType()
.defaultValue(0.2)
.withDescription(
"JVM heap size * ratio for read buffer size, this is only effective when "
"JVM heap size or off-heap size(when enabling Netty) * ratio for read buffer size, this is only effective when "
+ "`rss.server.reader.buffer.capacity.ratio` is not explicitly set");

public static final ConfigOption<Long> SERVER_HEARTBEAT_DELAY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ public ShuffleBufferManager(
this.readCapacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
if (this.readCapacity < 0) {
this.readCapacity =
(long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
nettyServerEnabled
? (long)
(NettyUtils.getMaxDirectMemory()
* conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO))
: (long)
(heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
}
LOG.info(
"Init shuffle buffer manager with capacity: {}, read buffer capacity: {}.",
Expand Down

0 comments on commit cbf4f6f

Please sign in to comment.