diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java index d80d0aa6d9..b96c028fbd 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java @@ -131,4 +131,9 @@ public int getLength() { public long getTimestamp() { return timestamp; } + + @Override + public String getOperationType() { + return "getLocalShuffleData"; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java index 1ccdfae10f..105fea051d 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java @@ -93,4 +93,9 @@ public int getPartitionNumPerRange() { public int getPartitionNum() { return partitionNum; } + + @Override + public String getOperationType() { + return "getLocalShuffleIndex"; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java index d358cf7cdf..13a2412414 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java @@ -148,4 +148,9 @@ public long getTimestamp() { public Roaring64NavigableMap getExpectedTaskIdsBitmap() { return expectedTaskIdsBitmap; } + + @Override + public String getOperationType() { + return "getMemoryShuffleData"; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java index cfa55287cf..946f906cc6 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java @@ -35,4 +35,6 @@ public RequestMessage(long requestId, ManagedBuffer managedBuffer) { public long getRequestId() { return requestId; } + + public abstract String getOperationType(); } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java index 492b5b64b9..a77b0d3c7a 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java @@ -145,4 +145,9 @@ public long getTimestamp() { public void setTimestamp(long timestamp) { this.timestamp = timestamp; } + + @Override + public String getOperationType() { + return "sendShuffleData"; + } } diff --git a/docs/server_guide.md b/docs/server_guide.md index 25224d6d69..efe0856a9f 100644 --- a/docs/server_guide.md +++ b/docs/server_guide.md @@ -83,11 +83,11 @@ This document will introduce how to deploy Uniffle shuffle servers. | rss.server.netty.receive.buf | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. | | rss.server.netty.send.buf | 0 | Send buffer size (SO_SNDBUF). | | rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used | -| rss.server.buffer.capacity.ratio | 0.8 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * ratio | +| rss.server.buffer.capacity.ratio | 0.8 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio | | rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold of spill data to storage, percentage of rss.server.buffer.capacity | | rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold of keep data in memory, percentage of rss.server.buffer.capacity | | rss.server.read.buffer.capacity | -1 | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used | -| rss.server.read.buffer.capacity.ratio | 0.4 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size * ratio | +| rss.server.read.buffer.capacity.ratio | 0.4 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio | | rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) | | rss.server.flush.localfile.threadPool.size | 10 | Thread pool for flush data to local file | | rss.server.flush.hadoop.threadPool.size | 60 | Thread pool for flush data to hadoop storage | @@ -104,7 +104,7 @@ This document will introduce how to deploy Uniffle shuffle servers. | rss.server.max.concurrency.of.per-partition.write | 30 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. | | rss.server.max.concurrency.limit.of.per-partition.write | - | The limit for max concurrency per-partition write specified by client, this won't be enabled by default. | | rss.metrics.reporter.class | - | The class of metrics reporter. | -| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. | +| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. | | rss.server.disk-capacity.watermark.check.enabled | false | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. | ### Advanced Configurations diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index e72510eaeb..9ea2e84f21 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -42,7 +42,7 @@ public class ShuffleServerConf extends RssBaseConf { .doubleType() .defaultValue(0.6) .withDescription( - "JVM heap size * ratio for the maximum memory of buffer manager for shuffle server, this " + "JVM heap size or off-heap size(when enabling Netty) * ratio for the maximum memory of buffer manager for shuffle server, this " + "is only effective when `rss.server.buffer.capacity` is not explicitly set"); public static final ConfigOption SERVER_READ_BUFFER_CAPACITY = @@ -56,7 +56,7 @@ public class ShuffleServerConf extends RssBaseConf { .doubleType() .defaultValue(0.2) .withDescription( - "JVM heap size * ratio for read buffer size, this is only effective when " + "JVM heap size or off-heap size(when enabling Netty) * ratio for read buffer size, this is only effective when " + "`rss.server.reader.buffer.capacity.ratio` is not explicitly set"); public static final ConfigOption SERVER_HEARTBEAT_DELAY = diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index ceca592113..4d42b0576f 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -98,7 +98,12 @@ public ShuffleBufferManager( this.readCapacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY); if (this.readCapacity < 0) { this.readCapacity = - (long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO)); + nettyServerEnabled + ? (long) + (NettyUtils.getMaxDirectMemory() + * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO)) + : (long) + (heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO)); } LOG.info( "Init shuffle buffer manager with capacity: {}, read buffer capacity: {}.", diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index ac8973ecc8..2e0c070e9a 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -24,6 +24,8 @@ import com.google.common.collect.Lists; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,7 +255,7 @@ public void handleGetMemoryShuffleDataRequest( .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime); } } - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); StatusCode status = StatusCode.SUCCESS; String msg = "OK"; GetMemoryShuffleDataResponse response; @@ -262,8 +264,9 @@ public void handleGetMemoryShuffleDataRequest( // todo: if can get the exact memory size? if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize)) { + ShuffleDataResult shuffleDataResult = null; try { - ShuffleDataResult shuffleDataResult = + shuffleDataResult = shuffleServer .getShuffleTaskManager() .getInMemoryShuffleData( @@ -281,19 +284,18 @@ public void handleGetMemoryShuffleDataRequest( ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size()); ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size()); } - long costTime = System.currentTimeMillis() - start; - shuffleServer - .getNettyMetrics() - .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(), costTime); - LOG.info( - "Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" + " data for {}", - costTime, - data.size(), - requestInfo); - response = new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, bufferSegments, data); + ReleaseMemoryAndRecordReadTimeListener listener = + new ReleaseMemoryAndRecordReadTimeListener( + start, readBufferSize, data.size(), requestInfo, req, client); + client.getChannel().writeAndFlush(response).addListener(listener); + return; } catch (Exception e) { + shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize); + if (shuffleDataResult != null) { + shuffleDataResult.release(); + } status = StatusCode.INTERNAL_ERROR; msg = "Error happened when get in memory shuffle data for " @@ -304,8 +306,6 @@ public void handleGetMemoryShuffleDataRequest( response = new GetMemoryShuffleDataResponse( req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER); - } finally { - shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize); } } else { status = StatusCode.INTERNAL_ERROR; @@ -348,9 +348,10 @@ public void handleGetLocalShuffleIndexRequest( .getShuffleServerConf() .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT); if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize)) { + ShuffleIndexResult shuffleIndexResult = null; try { final long start = System.currentTimeMillis(); - ShuffleIndexResult shuffleIndexResult = + shuffleIndexResult = shuffleServer .getShuffleTaskManager() .getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); @@ -361,13 +362,16 @@ public void handleGetLocalShuffleIndexRequest( response = new GetLocalShuffleIndexResponse( req.getRequestId(), status, msg, data, shuffleIndexResult.getDataFileLen()); - long readTime = System.currentTimeMillis() - start; - LOG.info( - "Successfully getShuffleIndex cost {} ms for {}" + " bytes with {}", - readTime, - data.size(), - requestInfo); + ReleaseMemoryAndRecordReadTimeListener listener = + new ReleaseMemoryAndRecordReadTimeListener( + start, assumedFileSize, data.size(), requestInfo, req, client); + client.getChannel().writeAndFlush(response).addListener(listener); + return; } catch (FileNotFoundException indexFileNotFoundException) { + shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize); + if (shuffleIndexResult != null) { + shuffleIndexResult.release(); + } LOG.warn( "Index file for {} is not found, maybe the data has been flushed to cold storage.", requestInfo, @@ -376,14 +380,16 @@ public void handleGetLocalShuffleIndexRequest( new GetLocalShuffleIndexResponse( req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L); } catch (Exception e) { + shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize); + if (shuffleIndexResult != null) { + shuffleIndexResult.release(); + } status = StatusCode.INTERNAL_ERROR; msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage(); LOG.error(msg, e); response = new GetLocalShuffleIndexResponse( req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L); - } finally { - shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize); } } else { status = StatusCode.INTERNAL_ERROR; @@ -418,7 +424,6 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat StatusCode status = StatusCode.SUCCESS; String msg = "OK"; GetLocalShuffleDataResponse response; - ShuffleDataResult sdr; String requestInfo = "appId[" + appId @@ -426,11 +431,9 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat + shuffleId + "], partitionId[" + partitionId - + "]" - + "offset[" + + "], offset[" + offset - + "]" - + "length[" + + "], length[" + length + "]"; @@ -445,8 +448,9 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat } if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) { + ShuffleDataResult sdr = null; try { - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); sdr = shuffleServer .getShuffleTaskManager() @@ -459,29 +463,27 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat storageType, offset, length); - long readTime = System.currentTimeMillis() - start; - ShuffleServerMetrics.counterTotalReadTime.inc(readTime); ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength()); ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength()); - shuffleServer - .getNettyMetrics() - .recordProcessTime(GetLocalShuffleDataRequest.class.getName(), readTime); - LOG.info( - "Successfully getShuffleData cost {} ms for shuffle" + " data with {}", - readTime, - requestInfo); response = new GetLocalShuffleDataResponse( req.getRequestId(), status, msg, sdr.getManagedBuffer()); + ReleaseMemoryAndRecordReadTimeListener listener = + new ReleaseMemoryAndRecordReadTimeListener( + start, length, sdr.getDataLength(), requestInfo, req, client); + client.getChannel().writeAndFlush(response).addListener(listener); + return; } catch (Exception e) { + shuffleServer.getShuffleBufferManager().releaseReadMemory(length); + if (sdr != null) { + sdr.release(); + } status = StatusCode.INTERNAL_ERROR; msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage(); LOG.error(msg, e); response = new GetLocalShuffleDataResponse( req.getRequestId(), status, msg, new NettyManagedBuffer(Unpooled.EMPTY_BUFFER)); - } finally { - shuffleServer.getShuffleBufferManager().releaseReadMemory(length); } } else { status = StatusCode.INTERNAL_ERROR; @@ -522,4 +524,89 @@ private ShufflePartitionedBlock[] toPartitionedBlock(List bloc } return ret; } + + class ReleaseMemoryAndRecordReadTimeListener implements ChannelFutureListener { + private final long readStartedTime; + private final long readBufferSize; + private final long dataSize; + private final String requestInfo; + private final RequestMessage request; + private final TransportClient client; + + ReleaseMemoryAndRecordReadTimeListener( + long readStartedTime, + long readBufferSize, + long dataSize, + String requestInfo, + RequestMessage request, + TransportClient client) { + this.readStartedTime = readStartedTime; + this.readBufferSize = readBufferSize; + this.dataSize = dataSize; + this.requestInfo = requestInfo; + this.request = request; + this.client = client; + } + + @Override + public void operationComplete(ChannelFuture future) { + shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize); + long readTime = System.currentTimeMillis() - readStartedTime; + ShuffleServerMetrics.counterTotalReadTime.inc(readTime); + shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), readTime); + if (!future.isSuccess()) { + Throwable cause = future.cause(); + String errorMsg = + "Error happened when executing " + + request.getOperationType() + + " for " + + requestInfo + + ", " + + cause.getMessage(); + LOG.error(errorMsg, future.cause()); + RpcResponse errorResponse; + if (request instanceof GetLocalShuffleDataRequest) { + errorResponse = + new GetLocalShuffleDataResponse( + request.getRequestId(), + StatusCode.INTERNAL_ERROR, + errorMsg, + new NettyManagedBuffer(Unpooled.EMPTY_BUFFER)); + } else if (request instanceof GetLocalShuffleIndexRequest) { + errorResponse = + new GetLocalShuffleIndexResponse( + request.getRequestId(), + StatusCode.INTERNAL_ERROR, + errorMsg, + Unpooled.EMPTY_BUFFER, + 0L); + } else if (request instanceof GetMemoryShuffleDataRequest) { + errorResponse = + new GetMemoryShuffleDataResponse( + request.getRequestId(), + StatusCode.INTERNAL_ERROR, + errorMsg, + Lists.newArrayList(), + Unpooled.EMPTY_BUFFER); + } else { + LOG.error("Cannot handle request {}", request.type()); + return; + } + client.getChannel().writeAndFlush(errorResponse); + LOG.error( + "Failed to execute {} for {}. Took {} ms and could not retrieve {} bytes of data", + request.getOperationType(), + requestInfo, + readTime, + dataSize); + } else { + LOG.info( + "Successfully executed {} for {}. Took {} ms and retrieved {} bytes of data", + request.getOperationType(), + requestInfo, + readTime, + dataSize); + } + } + } }