diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 1a5bbae4b547f..72ef41d381f7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -221,14 +221,22 @@ Queue<MemorySegment> allocateBuffers() throws Exception {
         } while (System.nanoTime() < timeoutTime
                 || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
 
-        if (numRequestedBuffers <= 0) {
-            throw new TimeoutException(
-                    String.format(
-                            "Buffer request timeout, this means there is a fierce contention of"
-                                    + " the batch shuffle read memory, please increase '%s'.",
-                            TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
-        }
-        return new ArrayDeque<>();
+        // This is a safe net against potential deadlocks.
+        //
+        // A deadlock can happen when the downstream task needs to consume multiple result
+        // partitions (e.g., A and B) in specific order (cannot consume B before finishing
+        // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
+        // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
+        // B from being consumed and releasing the buffers.
+        //
+        // The imperfect solution here is to fail all the subpartitionReaders (A), which
+        // consequently fail all the downstream tasks, unregister their other
+        // subpartitionReaders (B) and release the read buffers.
+        throw new TimeoutException(
+                String.format(
+                        "Buffer request timeout, this means there is a fierce contention of"
+                                + " the batch shuffle read memory, please increase '%s'.",
+                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
     }
 
     private long getBufferRequestTimeoutTime() {
@@ -427,7 +435,17 @@ private void mayTriggerReading() {
                 && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
                 && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
             isRunning = true;
-            ioExecutor.execute(this);
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            run();
+                        } catch (Throwable throwable) {
+                            // handle un-expected exception as unhandledExceptionHandler is not
+                            // worked for ScheduledExecutorService.
+                            FatalExitExceptionHandler.INSTANCE.uncaughtException(
+                                    Thread.currentThread(), throwable);
+                        }
+                    });
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index dc898dacfb83e..5e595f42d3e8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -260,25 +260,22 @@ private Queue<MemorySegment> allocateBuffers() throws Exception {
         } while (System.nanoTime() < timeoutTime
                 || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
 
-        if (numRequestedBuffers <= 0) {
-            // This is a safe net against potential deadlocks.
-            //
-            // A deadlock can happen when the downstream task needs to consume multiple result
-            // partitions (e.g., A and B) in specific order (cannot consume B before finishing
-            // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
-            // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
-            // B from being consumed and releasing the buffers.
-            //
-            // The imperfect solution here is to fail all the subpartitionReaders (A), which
-            // consequently fail all the downstream tasks, unregister their other
-            // subpartitionReaders (B) and release the read buffers.
-            throw new TimeoutException(
-                    String.format(
-                            "Buffer request timeout, this means there is a fierce contention of"
-                                    + " the batch shuffle read memory, please increase '%s'.",
-                            TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
-        }
-        return new ArrayDeque<>();
+        // This is a safe net against potential deadlocks.
+        //
+        // A deadlock can happen when the downstream task needs to consume multiple result
+        // partitions (e.g., A and B) in specific order (cannot consume B before finishing
+        // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
+        // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
+        // B from being consumed and releasing the buffers.
+        //
+        // The imperfect solution here is to fail all the subpartitionReaders (A), which
+        // consequently fail all the downstream tasks, unregister their other
+        // subpartitionReaders (B) and release the read buffers.
+        throw new TimeoutException(
+                String.format(
+                        "Buffer request timeout, this means there is a fierce contention of"
+                                + " the batch shuffle read memory, please increase '%s'.",
+                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
     }
 
     private void mayTriggerReading() {
@@ -289,7 +286,17 @@ private void mayTriggerReading() {
                         <= maxRequestedBuffers
                 && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
             isRunning = true;
-            ioExecutor.execute(this);
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            run();
+                        } catch (Throwable throwable) {
+                            // handle un-expected exception as unhandledExceptionHandler is not
+                            // worked for ScheduledExecutorService.
+                            FatalExitExceptionHandler.INSTANCE.uncaughtException(
+                                    Thread.currentThread(), throwable);
+                        }
+                    });
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 62c76b9f1ecb0..0d1613e14017e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -39,7 +39,6 @@
 import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -48,6 +47,7 @@
 
 import static org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SortMergeResultPartitionReadScheduler}. */
@@ -260,17 +260,23 @@ void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
     @Test
     void testRequestBufferTimeout() throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
-        List<MemorySegment> buffers = bufferPool.requestBuffers();
+        // avoid auto trigger reading.
+        ManuallyTriggeredScheduledExecutorService executorService =
+                new ManuallyTriggeredScheduledExecutorService();
         SortMergeResultPartitionReadScheduler readScheduler =
                 new SortMergeResultPartitionReadScheduler(
-                        bufferPool, executor, this, bufferRequestTimeout);
+                        bufferPool, executorService, this, bufferRequestTimeout);
+        readScheduler.createSubpartitionReader(
+                new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        // request and use all buffers of buffer pool.
+        readScheduler.run();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
 
         long startTimestamp = System.nanoTime();
         assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
         long requestDuration = System.nanoTime() - startTimestamp;
 
         assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
-        bufferPool.recycle(buffers);
         readScheduler.release();
     }
 
@@ -282,16 +288,16 @@ void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
         SortMergeResultPartitionReadScheduler readScheduler =
                 new SortMergeResultPartitionReadScheduler(
                         bufferPool, executor, this, bufferRequestTimeout);
-        SortMergeSubpartitionReader subpartitionReader =
-                new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
 
         long startTimestamp = System.nanoTime();
-        Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers();
+        Queue<MemorySegment> allocatedBuffers = new ArrayDeque<>();
+
+        assertThatCode(() -> allocatedBuffers.addAll(readScheduler.allocateBuffers()))
+                .doesNotThrowAnyException();
         long requestDuration = System.nanoTime() - startTimestamp;
 
         assertThat(allocatedBuffers).hasSize(3);
         assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
-        assertThat(subpartitionReader.getFailureCause()).isNull();
 
         bufferPool.recycle(allocatedBuffers);
         bufferPool.destroy();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index f2b6591884c99..bb6b1de3fa74c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -222,10 +222,6 @@ void testScheduleReadersOrdered() throws Exception {
     void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
 
-        // request all buffer first.
-        bufferPool.requestBuffers();
-        assertThat(bufferPool.getAvailableBuffers()).isZero();
-
         fileDataManager =
                 new HsFileDataManager(
                         bufferPool,
@@ -244,11 +240,23 @@ void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
         CompletableFuture<Throwable> cause = new CompletableFuture<>();
         reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
         reader.setFailConsumer((cause::complete));
+        reader.setReadBuffersConsumer(
+                (requestedBuffers, ignore) -> {
+                    assertThat(requestedBuffers).hasSize(bufferPool.getNumTotalBuffers());
+                    // discard all buffers so that they cannot be recycled.
+                    requestedBuffers.clear();
+                });
         factory.allReaders.add(reader);
 
+        // register a new consumer, this will trigger io scheduler run a round.
         fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation);
 
+        // first round run will allocate and use all buffers.
         ioExecutor.trigger();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        // second round run will trigger timeout.
+        fileDataManager.run();
 
         assertThat(prepareForSchedulingFinished).isCompleted();
         assertThat(cause).isCompleted();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index 0ad1541f69742..f4104e7b869cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -45,6 +45,7 @@
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,8 +62,9 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
@@ -100,7 +102,17 @@ void before() {
                 new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
         globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
         readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
-        readIOExecutor = Executors.newScheduledThreadPool(numThreads);
+        readIOExecutor =
+                new ScheduledThreadPoolExecutor(
+                        numThreads,
+                        new ExecutorThreadFactory("test-io-scheduler-thread"),
+                        (ignored, executor) -> {
+                            if (executor.isShutdown()) {
+                                // ignore rejected as shutdown.
+                            } else {
+                                throw new RejectedExecutionException();
+                            }
+                        });
     }
 
     @AfterEach
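
Background on the `ioExecutor.execute(...)` wrapping in both `mayTriggerReading()` hunks above: `ScheduledThreadPoolExecutor.execute(Runnable)` is equivalent to `schedule(command, 0, anyUnit)`, so the task runs inside a future that captures any thrown exception. The worker thread does not die and its uncaught exception handler (including one installed through a thread factory) is never invoked, which is why the patch catches `Throwable` inside the submitted lambda and hands it to `FatalExitExceptionHandler.INSTANCE`. The following standalone sketch (hypothetical class name, plain JDK only, not Flink code) illustrates the swallowing behavior and the wrapping pattern:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorExceptionDemo {

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Thread.setDefaultUncaughtExceptionHandler(
                (thread, error) -> System.err.println("uncaught handler saw: " + error));

        // Swallowed: the exception is stored in the task's future, which nobody reads, so the
        // default uncaught exception handler registered above is never called.
        executor.execute(() -> { throw new RuntimeException("silently swallowed"); });

        // The pattern used by the patch: catch Throwable inside the task and route it to an
        // explicit handler (the patch uses FatalExitExceptionHandler.INSTANCE.uncaughtException).
        executor.execute(
                () -> {
                    try {
                        throw new RuntimeException("routed to a handler");
                    } catch (Throwable t) {
                        System.err.println(
                                "explicit handler saw: " + t + " on " + Thread.currentThread().getName());
                    }
                });

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}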