From a27db1e4f10e98f5cf3a2dfd48eb9b288f6c1d40 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Tue, 7 Mar 2023 00:26:58 +0800 Subject: [PATCH 1/3] [hotfix][test] Remove meaningless reader in SortMergeResultPartitionReadSchedulerTest#testRequestTimeoutIsRefreshedAndSuccess. (cherry picked from commit a3068310ba90ad851f660d4efb43ddf364218cdb) --- .../SortMergeResultPartitionReadSchedulerTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java index 62c76b9f1ecb0..546f29dc2a508 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java @@ -48,6 +48,7 @@ import static org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link SortMergeResultPartitionReadScheduler}. 
*/ @@ -282,16 +283,16 @@ void testRequestTimeoutIsRefreshedAndSuccess() throws Exception { SortMergeResultPartitionReadScheduler readScheduler = new SortMergeResultPartitionReadScheduler( bufferPool, executor, this, bufferRequestTimeout); - SortMergeSubpartitionReader subpartitionReader = - new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader); long startTimestamp = System.nanoTime(); - Queue allocatedBuffers = readScheduler.allocateBuffers(); + Queue allocatedBuffers = new ArrayDeque<>(); + + assertThatCode(() -> allocatedBuffers.addAll(readScheduler.allocateBuffers())) + .doesNotThrowAnyException(); long requestDuration = System.nanoTime() - startTimestamp; assertThat(allocatedBuffers).hasSize(3); assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2); - assertThat(subpartitionReader.getFailureCause()).isNull(); bufferPool.recycle(allocatedBuffers); bufferPool.destroy(); From 900cec9c700180b9f8668ea1b328bbf7ccf0cd5f Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Mon, 6 Mar 2023 22:29:43 +0800 Subject: [PATCH 2/3] [hotfix][runtime] IO Scheduler should handle un-expected exceptions. 
(cherry picked from commit faf663a6298c602d93c351f9971eed8aa8dabb93) --- .../SortMergeResultPartitionReadScheduler.java | 12 +++++++++++- .../partition/hybrid/HsFileDataManager.java | 12 +++++++++++- .../partition/hybrid/HsResultPartitionTest.java | 16 ++++++++++++++-- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java index 1a5bbae4b547f..0f38aca30629a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java @@ -427,7 +427,17 @@ private void mayTriggerReading() { && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) { isRunning = true; - ioExecutor.execute(this); + ioExecutor.execute( + () -> { + try { + run(); + } catch (Throwable throwable) { + // Handle unexpected exceptions here, because the uncaught exception handler + // does not work for ScheduledExecutorService. 
+ FatalExitExceptionHandler.INSTANCE.uncaughtException( + Thread.currentThread(), throwable); + } + }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index dc898dacfb83e..14ac2c84b935e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -289,7 +289,17 @@ private void mayTriggerReading() { <= maxRequestedBuffers && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) { isRunning = true; - ioExecutor.execute(this); + ioExecutor.execute( + () -> { + try { + run(); + } catch (Throwable throwable) { + // Handle unexpected exceptions here, because the uncaught exception handler + // does not work for ScheduledExecutorService. + FatalExitExceptionHandler.INSTANCE.uncaughtException( + Thread.currentThread(), throwable); + } + }); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java index 0ad1541f69742..f4104e7b869cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -61,8 +62,9 @@ import java.util.List; import java.util.Queue; 
import java.util.Random; -import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -100,7 +102,17 @@ void before() { new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing"); globalPool = new NetworkBufferPool(totalBuffers, bufferSize); readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize); - readIOExecutor = Executors.newScheduledThreadPool(numThreads); + readIOExecutor = + new ScheduledThreadPoolExecutor( + numThreads, + new ExecutorThreadFactory("test-io-scheduler-thread"), + (ignored, executor) -> { + if (executor.isShutdown()) { + // ignore rejected as shutdown. + } else { + throw new RejectedExecutionException(); + } + }); } @AfterEach From 544736e59d4717fe7f3f0bda47531e70d15d9c13 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Mon, 6 Mar 2023 22:40:53 +0800 Subject: [PATCH 3/3] [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0. This will cause deadlock as read buffer was occupied by a reader with low consumption priority. 
This closes #22112 (cherry picked from commit 5ad2ae2c24ade2655981f609298978d26329466f) --- ...SortMergeResultPartitionReadScheduler.java | 24 ++++++++----- .../partition/hybrid/HsFileDataManager.java | 35 +++++++++---------- ...MergeResultPartitionReadSchedulerTest.java | 13 ++++--- .../hybrid/HsFileDataManagerTest.java | 16 ++++++--- 4 files changed, 53 insertions(+), 35 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java index 0f38aca30629a..72ef41d381f7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java @@ -221,14 +221,22 @@ Queue allocateBuffers() throws Exception { } while (System.nanoTime() < timeoutTime || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime())); - if (numRequestedBuffers <= 0) { - throw new TimeoutException( - String.format( - "Buffer request timeout, this means there is a fierce contention of" - + " the batch shuffle read memory, please increase '%s'.", - TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key())); - } - return new ArrayDeque<>(); + // This is a safety net against potential deadlocks. + // + // A deadlock can happen when the downstream task needs to consume multiple result + // partitions (e.g., A and B) in a specific order (cannot consume B before finishing + // consuming A). Since the reading buffer pool is shared across the TM, if B happens to + // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks + // B from being consumed and releasing the buffers. 
+ // + // The imperfect solution here is to fail all the subpartitionReaders (A), which + // consequently fail all the downstream tasks, unregister their other + // subpartitionReaders (B) and release the read buffers. + throw new TimeoutException( + String.format( + "Buffer request timeout, this means there is a fierce contention of" + + " the batch shuffle read memory, please increase '%s'.", + TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key())); } private long getBufferRequestTimeoutTime() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index 14ac2c84b935e..5e595f42d3e8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -260,25 +260,22 @@ private Queue allocateBuffers() throws Exception { } while (System.nanoTime() < timeoutTime || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime())); - if (numRequestedBuffers <= 0) { - // This is a safe net against potential deadlocks. - // - // A deadlock can happen when the downstream task needs to consume multiple result - // partitions (e.g., A and B) in specific order (cannot consume B before finishing - // consuming A). Since the reading buffer pool is shared across the TM, if B happens to - // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks - // B from being consumed and releasing the buffers. - // - // The imperfect solution here is to fail all the subpartitionReaders (A), which - // consequently fail all the downstream tasks, unregister their other - // subpartitionReaders (B) and release the read buffers. 
- throw new TimeoutException( - String.format( - "Buffer request timeout, this means there is a fierce contention of" - + " the batch shuffle read memory, please increase '%s'.", - TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key())); - } - return new ArrayDeque<>(); + // This is a safety net against potential deadlocks. + // + // A deadlock can happen when the downstream task needs to consume multiple result + // partitions (e.g., A and B) in a specific order (cannot consume B before finishing + // consuming A). Since the reading buffer pool is shared across the TM, if B happens to + // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks + // B from being consumed and releasing the buffers. + // + // The imperfect solution here is to fail all the subpartitionReaders (A), which + // consequently fail all the downstream tasks, unregister their other + // subpartitionReaders (B) and release the read buffers. + throw new TimeoutException( + String.format( + "Buffer request timeout, this means there is a fierce contention of" + + " the batch shuffle read memory, please increase '%s'.", + TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key())); } private void mayTriggerReading() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java index 546f29dc2a508..0d1613e14017e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java @@ -39,7 +39,6 @@ import java.time.Duration; import java.util.ArrayDeque; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import java.util.Random; import 
java.util.concurrent.ExecutorService; @@ -261,17 +260,23 @@ void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception { @Test void testRequestBufferTimeout() throws Exception { Duration bufferRequestTimeout = Duration.ofSeconds(3); - List buffers = bufferPool.requestBuffers(); + // avoid auto trigger reading. + ManuallyTriggeredScheduledExecutorService executorService = + new ManuallyTriggeredScheduledExecutorService(); SortMergeResultPartitionReadScheduler readScheduler = new SortMergeResultPartitionReadScheduler( - bufferPool, executor, this, bufferRequestTimeout); + bufferPool, executorService, this, bufferRequestTimeout); + readScheduler.createSubpartitionReader( + new NoOpBufferAvailablityListener(), 0, partitionedFile); + // request and use all buffers of buffer pool. + readScheduler.run(); + assertThat(bufferPool.getAvailableBuffers()).isZero(); long startTimestamp = System.nanoTime(); assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class); long requestDuration = System.nanoTime() - startTimestamp; assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue(); - bufferPool.recycle(buffers); readScheduler.release(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java index f2b6591884c99..bb6b1de3fa74c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java @@ -222,10 +222,6 @@ void testScheduleReadersOrdered() throws Exception { void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception { Duration bufferRequestTimeout = Duration.ofSeconds(3); - // request all buffer first. 
- bufferPool.requestBuffers(); - assertThat(bufferPool.getAvailableBuffers()).isZero(); - fileDataManager = new HsFileDataManager( bufferPool, @@ -244,11 +240,23 @@ void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception { CompletableFuture cause = new CompletableFuture<>(); reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null)); reader.setFailConsumer((cause::complete)); + reader.setReadBuffersConsumer( + (requestedBuffers, ignore) -> { + assertThat(requestedBuffers).hasSize(bufferPool.getNumTotalBuffers()); + // discard all buffers so that they cannot be recycled. + requestedBuffers.clear(); + }); factory.allReaders.add(reader); + // register a new consumer, this will trigger io scheduler run a round. fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); + // first round run will allocate and use all buffers. ioExecutor.trigger(); + assertThat(bufferPool.getAvailableBuffers()).isZero(); + + // second round run will trigger timeout. + fileDataManager.run(); assertThat(prepareForSchedulingFinished).isCompleted(); assertThat(cause).isCompleted();