@@ -221,14 +221,22 @@ Queue<MemorySegment> allocateBuffers() throws Exception {
} while (System.nanoTime() < timeoutTime
|| System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));

if (numRequestedBuffers <= 0) {
throw new TimeoutException(
String.format(
"Buffer request timeout, this means there is a fierce contention of"
+ " the batch shuffle read memory, please increase '%s'.",
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
}
return new ArrayDeque<>();
// This is a safety net against potential deadlocks.
//
// A deadlock can happen when the downstream task needs to consume multiple result
// partitions (e.g., A and B) in a specific order (it cannot consume B before finishing
// consuming A). Since the reading buffer pool is shared across the TM, if B happens to
// take all the buffers, A cannot be consumed due to the lack of buffers, which in turn
// blocks B from being consumed and releasing the buffers.
//
// The imperfect solution here is to fail all the subpartitionReaders (A), which
// consequently fails all the downstream tasks, unregisters their other
// subpartitionReaders (B), and releases the read buffers.
throw new TimeoutException(
String.format(
"Buffer request timed out. This means there is fierce contention for"
+ " the batch shuffle read memory; please increase '%s'.",
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
}

private long getBufferRequestTimeoutTime() {
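Reviewer note on the retry loop above: the while condition first checks a cached deadline and re-reads it only once that deadline has expired. Assuming getBufferRequestTimeoutTime() derives the deadline from the pool's last successful buffer operation, any concurrent buffer recycling pushes the deadline forward, so the requester only times out after a full quiet period with no progress. A minimal, self-contained sketch of the idiom (all names here are illustrative, not Flink's):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Minimal sketch of a retry loop whose deadline is refreshed by concurrent progress. */
public class RefreshingTimeoutDemo {

    // Stand-in for the pool's last-buffer-operation timestamp; a real recycler
    // would update this on every successful buffer request or recycle.
    static final AtomicLong lastBufferOperation = new AtomicLong(System.nanoTime());

    static long bufferRequestDeadline() {
        return lastBufferOperation.get() + TimeUnit.MILLISECONDS.toNanos(100);
    }

    public static void main(String[] args) {
        long timeoutTime = bufferRequestDeadline();
        long attempts = 0;
        do {
            attempts++; // stand-in for a failed bufferPool.requestBuffers()
        } while (System.nanoTime() < timeoutTime
                // The deadline is re-read only after the cached one expires; any
                // buffer activity since then pushes it forward, so the caller
                // keeps waiting instead of timing out spuriously.
                || System.nanoTime() < (timeoutTime = bufferRequestDeadline()));
        System.out.println("timed out after " + attempts + " attempts");
    }
}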
@@ -427,7 +435,17 @@ private void mayTriggerReading() {
&& numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
&& numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
isRunning = true;
ioExecutor.execute(this);
ioExecutor.execute(
() -> {
try {
run();
} catch (Throwable throwable) {
// Handle unexpected exceptions here, because the uncaught exception
// handler does not take effect for tasks run in a ScheduledExecutorService.
FatalExitExceptionHandler.INSTANCE.uncaughtException(
Thread.currentThread(), throwable);
}
});
}
}
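The try/catch wrapper above is not redundant: a ScheduledExecutorService wraps even plain execute() calls in a future task internally, so a thrown exception is captured as the task's outcome and never reaches the thread's uncaught exception handler. A minimal sketch demonstrating the behavior (class name is illustrative):

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorSwallowsExceptionsDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread.setDefaultUncaughtExceptionHandler(
                (thread, error) -> System.err.println("uncaught: " + error));

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // execute() delegates to schedule(command, 0, ...), so the runnable is
        // wrapped in a ScheduledFutureTask; the exception is captured by that
        // future and the handler above is never invoked -- nothing is printed.
        executor.execute(() -> {
            throw new RuntimeException("silently swallowed");
        });

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}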

@@ -260,25 +260,22 @@ private Queue<MemorySegment> allocateBuffers() throws Exception {
} while (System.nanoTime() < timeoutTime
|| System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));

if (numRequestedBuffers <= 0) {
// This is a safe net against potential deadlocks.
//
// A deadlock can happen when the downstream task needs to consume multiple result
// partitions (e.g., A and B) in specific order (cannot consume B before finishing
// consuming A). Since the reading buffer pool is shared across the TM, if B happens to
// take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
// B from being consumed and releasing the buffers.
//
// The imperfect solution here is to fail all the subpartitionReaders (A), which
// consequently fail all the downstream tasks, unregister their other
// subpartitionReaders (B) and release the read buffers.
throw new TimeoutException(
String.format(
"Buffer request timeout, this means there is a fierce contention of"
+ " the batch shuffle read memory, please increase '%s'.",
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
}
return new ArrayDeque<>();
// This is a safety net against potential deadlocks.
//
// A deadlock can happen when the downstream task needs to consume multiple result
// partitions (e.g., A and B) in a specific order (it cannot consume B before finishing
// consuming A). Since the reading buffer pool is shared across the TM, if B happens to
// take all the buffers, A cannot be consumed due to the lack of buffers, which in turn
// blocks B from being consumed and releasing the buffers.
//
// The imperfect solution here is to fail all the subpartitionReaders (A), which
// consequently fails all the downstream tasks, unregisters their other
// subpartitionReaders (B), and releases the read buffers.
throw new TimeoutException(
String.format(
"Buffer request timed out. This means there is fierce contention for"
+ " the batch shuffle read memory; please increase '%s'.",
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
}

private void mayTriggerReading() {
@@ -289,7 +286,17 @@
<= maxRequestedBuffers
&& numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
isRunning = true;
ioExecutor.execute(this);
ioExecutor.execute(
() -> {
try {
run();
} catch (Throwable throwable) {
// Handle unexpected exceptions here, because the uncaught exception
// handler does not take effect for tasks run in a ScheduledExecutorService.
FatalExitExceptionHandler.INSTANCE.uncaughtException(
Thread.currentThread(), throwable);
}
});
}
}
}
@@ -39,7 +39,6 @@
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@@ -48,6 +47,7 @@

import static org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link SortMergeResultPartitionReadScheduler}. */
@@ -260,17 +260,23 @@ void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
@Test
void testRequestBufferTimeout() throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);
List<MemorySegment> buffers = bufferPool.requestBuffers();
// Avoid automatically triggering reading.
ManuallyTriggeredScheduledExecutorService executorService =
new ManuallyTriggeredScheduledExecutorService();
SortMergeResultPartitionReadScheduler readScheduler =
new SortMergeResultPartitionReadScheduler(
bufferPool, executor, this, bufferRequestTimeout);
bufferPool, executorService, this, bufferRequestTimeout);
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0, partitionedFile);
// Request and use all the buffers of the buffer pool.
readScheduler.run();

assertThat(bufferPool.getAvailableBuffers()).isZero();
long startTimestamp = System.nanoTime();
assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
long requestDuration = System.nanoTime() - startTimestamp;
assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos());

bufferPool.recycle(buffers);
readScheduler.release();
}
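For context: ManuallyTriggeredScheduledExecutorService queues submitted tasks until the test explicitly triggers them, which lets the test above first exhaust the buffer pool with a direct readScheduler.run() and only then exercise the timeout path deterministically. A stripped-down sketch of the idea (not Flink's actual implementation):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Stripped-down sketch of a manually triggered executor for tests. */
class ManualExecutor implements Executor {
    private final Queue<Runnable> queuedTasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        // Defer instead of running: the test decides when work happens.
        queuedTasks.add(task);
    }

    /** Runs exactly one queued task, if any; returns whether one ran. */
    boolean trigger() {
        Runnable task = queuedTasks.poll();
        if (task == null) {
            return false;
        }
        task.run();
        return true;
    }
}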

@@ -282,16 +288,16 @@ void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
SortMergeResultPartitionReadScheduler readScheduler =
new SortMergeResultPartitionReadScheduler(
bufferPool, executor, this, bufferRequestTimeout);
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);

long startTimestamp = System.nanoTime();
Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers();
Queue<MemorySegment> allocatedBuffers = new ArrayDeque<>();

assertThatCode(() -> allocatedBuffers.addAll(readScheduler.allocateBuffers()))
.doesNotThrowAnyException();
long requestDuration = System.nanoTime() - startTimestamp;

assertThat(allocatedBuffers).hasSize(3);
assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
assertThat(subpartitionReader.getFailureCause()).isNull();

bufferPool.recycle(allocatedBuffers);
bufferPool.destroy();
@@ -222,10 +222,6 @@ void testScheduleReadersOrdered() throws Exception {
void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);

// request all buffer first.
bufferPool.requestBuffers();
assertThat(bufferPool.getAvailableBuffers()).isZero();

fileDataManager =
new HsFileDataManager(
bufferPool,
@@ -244,11 +240,23 @@ void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
CompletableFuture<Throwable> cause = new CompletableFuture<>();
reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
reader.setFailConsumer((cause::complete));
reader.setReadBuffersConsumer(
(requestedBuffers, ignore) -> {
assertThat(requestedBuffers).hasSize(bufferPool.getNumTotalBuffers());
// discard all buffers so that they cannot be recycled.
requestedBuffers.clear();
});
factory.allReaders.add(reader);

// Register a new consumer; this triggers the IO scheduler to run one round.
fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation);

// The first round allocates and uses all the buffers.
ioExecutor.trigger();
assertThat(bufferPool.getAvailableBuffers()).isZero();

// The second round triggers the request timeout.
fileDataManager.run();

assertThat(prepareForSchedulingFinished).isCompleted();
assertThat(cause).isCompleted();
@@ -45,6 +45,7 @@
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -61,8 +62,9 @@
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

@@ -100,7 +102,17 @@ void before() {
new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
readIOExecutor = Executors.newScheduledThreadPool(numThreads);
readIOExecutor =
new ScheduledThreadPoolExecutor(
numThreads,
new ExecutorThreadFactory("test-io-scheduler-thread"),
(ignored, executor) -> {
if (executor.isShutdown()) {
// Ignore rejections caused by the executor having been shut down.
} else {
throw new RejectedExecutionException();
}
});
}

@AfterEach
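Background on the custom rejected-execution handler above: after shutdown(), a ScheduledThreadPoolExecutor rejects every newly submitted task, which can race with test teardown; the handler keeps that benign case silent while still failing loudly otherwise. A self-contained sketch of the same policy (class name is illustrative):

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class IgnoreRejectionAfterShutdownDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(
                        1,
                        Thread::new,
                        (task, pool) -> {
                            // Rejections during shutdown are an expected race
                            // in teardown; anything else is a real error.
                            if (!pool.isShutdown()) {
                                throw new RejectedExecutionException(
                                        "rejected while executor is running");
                            }
                        });

        executor.shutdown();
        // Without the handler this call would throw RejectedExecutionException.
        executor.execute(() -> System.out.println("never runs"));
        System.out.println("post-shutdown rejection was ignored");
    }
}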