From 87b826d246f4e396c92dabfa7510607f1b77e6c4 Mon Sep 17 00:00:00 2001
From: "kevin.cyj"
Date: Wed, 7 Apr 2021 14:08:20 +0800
Subject: [PATCH] [FLINK-22127][network] Enrich error message of read buffer request timeout to tell the user how to solve the problem when using sort-merge blocking shuffle

---
 .../partition/SortMergeResultPartitionReadScheduler.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 19c3defa4ff297..be91116ef2cdbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
@@ -163,7 +164,11 @@ private Queue allocateBuffers(
             }
 
             if (numRequestedBuffers <= 0) {
-                throw new TimeoutException("Buffer request timeout.");
+                throw new TimeoutException(
+                        String.format(
+                                "Buffer request timeout, this means there is a fierce contention of"
+                                        + " the batch shuffle read memory, please increase '%s'.",
+                                TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
             }
         } catch (Throwable throwable) {
             // fail all pending subpartition readers immediately if any exception occurs