From 1f9e43a60666364085c0567b2a5cdc4e260f4e3e Mon Sep 17 00:00:00 2001 From: Jinzhong Li Date: Fri, 10 May 2024 12:53:49 +0800 Subject: [PATCH] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed --- .../AsyncExecutionController.java | 2 +- .../asyncprocessing/StateExecutor.java | 7 +- .../runtime/state/AsyncKeyedStateBackend.java | 3 + .../AsyncExecutionControllerTest.java | 86 ++++++++++++++----- .../state/forst/ForStKeyedStateBackend.java | 44 +++++++++- 5 files changed, 116 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 693ad8753f1d5..d06888ab6c40d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -90,7 +90,7 @@ public class AsyncExecutionController implements StateRequestHandler { private final StateFutureFactory stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. */ - final StateExecutor stateExecutor; + private final StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ RecordContext currentContext; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java index caf0f504d948b..bbc506e1b1c72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java @@ -22,7 +22,12 @@ import java.util.concurrent.CompletableFuture; -/** Executor for executing batch {@link StateRequest}s. */ +/** + * Executor for executing batch {@link StateRequest}s. + * + *

Notice that the owner who create the {@code StateExecutor} is responsible for shutting down it + * when it is no longer in use. + */ @Internal public interface StateExecutor { /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index 70cdfbef767c9..3f49984bd7e25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -58,6 +58,9 @@ public interface AsyncKeyedStateBackend extends Disposable, Closeable { * Creates a {@code StateExecutor} which supports to execute a batch of state requests * asynchronously. * + *

Notice that the {@code AsyncKeyedStateBackend} is responsible for shutting down the + * StateExecutors created by itself when they are no longer in use. + * * @return a {@code StateExecutor} which supports to execute a batch of state requests * asynchronously. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index c40edbf3940ee..de2bbc7329447 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; @@ -39,6 +40,7 @@ import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; @@ -73,7 +75,9 @@ void setup( long timeout, int maxInFlight, MailboxExecutor mailboxExecutor, - AsyncFrameworkExceptionHandler exceptionHandler) { + AsyncFrameworkExceptionHandler exceptionHandler, + CloseableRegistry closeableRegistry) + throws IOException { StateExecutor stateExecutor = new TestStateExecutor(); ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("test-value-state", BasicTypeInfo.INT_TYPE_INFO); @@ -88,6 +92,8 @@ void setup( } catch (Exception e) { throw new RuntimeException(e); } + closeableRegistry.registerCloseable(asyncKeyedStateBackend); + closeableRegistry.registerCloseable(asyncKeyedStateBackend::dispose); aec = new AsyncExecutionController<>( mailboxExecutor, @@ -109,13 +115,15 @@ void setup( } @Test - void testBasicRun() { + void testBasicRun() throws IOException { + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( 100, 10000L, 1000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); // ============================ element1 ============================ String record1 = "key1-r1"; String key1 = "key1"; @@ -220,16 +228,20 @@ void testBasicRun() { assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); assertThat(output.get()).isEqualTo(1); assertThat(recordContext4.getReferenceCount()).isEqualTo(0); + + resourceRegistry.close(); } @Test - void testRecordsRunInOrder() { + void testRecordsRunInOrder() throws IOException { + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( 100, 10000L, 1000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); // Record1 and record3 have the same key, record2 has a different key. // Record2 should be processed before record3. @@ -284,18 +296,22 @@ void testRecordsRunInOrder() { assertThat(output.get()).isEqualTo(2); assertThat(recordContext3.getReferenceCount()).isEqualTo(0); assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); + + resourceRegistry.close(); } @Test - void testInFlightRecordControl() { + void testInFlightRecordControl() throws IOException { int batchSize = 5; int maxInFlight = 10; + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( batchSize, 10000L, maxInFlight, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); // For records with different keys, the in-flight records is controlled by batch size. for (int round = 0; round < 10; round++) { for (int i = 0; i < batchSize; i++) { @@ -334,16 +350,20 @@ void testInFlightRecordControl() { assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight); } + + resourceRegistry.close(); } @Test - public void testSyncPoint() { + public void testSyncPoint() throws IOException { + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( 1000, 10000L, 6000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); AtomicInteger counter = new AtomicInteger(0); // Test the sync point processing without a key occupied. @@ -384,18 +404,22 @@ public void testSyncPoint() { assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); recordContext2.release(); + + resourceRegistry.close(); } @Test - void testBufferTimeout() { + void testBufferTimeout() throws IOException { int batchSize = 5; int timeout = 1000; + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( batchSize, timeout, 1000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); ManuallyTriggeredScheduledExecutorService scheduledExecutor = new ManuallyTriggeredScheduledExecutorService(); aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor; @@ -457,18 +481,22 @@ void testBufferTimeout() { assertThat(scheduledFuture.isDone()).isTrue(); assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2); assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1); + + resourceRegistry.close(); } @Test - void testBufferTimeoutSkip() { + void testBufferTimeoutSkip() throws IOException { int batchSize = 3; int timeout = 1000; + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( batchSize, timeout, 1000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); ManuallyTriggeredScheduledExecutorService scheduledExecutor = new ManuallyTriggeredScheduledExecutorService(); aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor; @@ -526,14 +554,17 @@ void testBufferTimeoutSkip() { assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2); assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1); assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue(); + + resourceRegistry.close(); } @Test - void testUserCodeException() { + void testUserCodeException() throws IOException { TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler(); TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(false); - setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler); + CloseableRegistry resourceRegistry = new CloseableRegistry(); + setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry); Runnable userCode = () -> { valueState @@ -562,14 +593,17 @@ void testUserCodeException() { .isEqualTo("Artificial exception in user code"); assertThat(exceptionHandler.exception).isNull(); assertThat(exceptionHandler.message).isNull(); + + resourceRegistry.close(); } @Test - void testFrameworkException() { + void testFrameworkException() throws IOException { TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler(); TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(true); - setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler); + CloseableRegistry resourceRegistry = new CloseableRegistry(); + setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry); Runnable userCode = () -> valueState.asyncValue().thenAccept(val -> {}); String record = "record"; String key = "key"; @@ -588,16 +622,20 @@ void testFrameworkException() { .isEqualTo("java.lang.RuntimeException: Fail to execute."); assertThat(exceptionHandler.message) .isEqualTo("Caught exception when submitting StateFuture's callback."); + + resourceRegistry.close(); } @Test - void testEpochManager() { + void testEpochManager() throws Exception { + CloseableRegistry resourceRegistry = new CloseableRegistry(); setup( 1000, 10000, 6000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); AtomicInteger output = new AtomicInteger(0); Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet()); @@ -622,10 +660,13 @@ void testEpochManager() { assertThat(output.get()).isEqualTo(3); // SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving. assertThat(epoch1.ongoingRecordCount).isEqualTo(0); + + resourceRegistry.close(); } @Test - void testMixEpochMode() { + void testMixEpochMode() throws Exception { + CloseableRegistry resourceRegistry = new CloseableRegistry(); // epoch1(parallel mode) -> epoch2(parallel mode) -> epoch3(serial mode), // when epoch2 close, epoch1 is still in-flight. // when epoch3 close, all in-flight records should drain, epoch1 and epoch2 should finish. @@ -634,7 +675,8 @@ void testMixEpochMode() { 10000, 6000, new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler()); + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); AtomicInteger output = new AtomicInteger(0); Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet()); @@ -678,6 +720,8 @@ void testMixEpochMode() { assertThat(epoch2.ongoingRecordCount).isEqualTo(0); assertThat(epoch3.ongoingRecordCount).isEqualTo(0); assertThat(output.get()).isEqualTo(6); + + resourceRegistry.close(); } /** Simulate the underlying state that is actually used to execute the request. */ diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 292cbd835fca7..5192536a8dc7d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -38,10 +39,13 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.HashSet; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; @@ -91,6 +95,17 @@ public class ForStKeyedStateBackend implements AsyncKeyedStateBackend { /** Handler to handle state request. */ private StateRequestHandler stateRequestHandler; + /** Lock guarding the {@code managedStateExecutors} and {@code disposed}. */ + private final Object lock = new Object(); + + /** The StateExecutors which are managed by this ForStKeyedStateBackend. */ + @GuardedBy("lock") + private final Set managedStateExecutors; + + /** The flag indicating whether ForStKeyedStateBackend is closed. */ + @GuardedBy("lock") + private boolean closed = false; + // mark whether this backend is already disposed and prevent duplicate disposing private boolean disposed = false; @@ -113,6 +128,7 @@ public ForStKeyedStateBackend( this.columnFamilyOptionsFactory = columnFamilyOptionsFactory; this.defaultColumnFamily = defaultColumnFamilyHandle; this.nativeMetricMonitor = nativeMetricMonitor; + this.managedStateExecutors = new HashSet<>(1); } @Override @@ -147,8 +163,17 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { - // TODO: Make io parallelism configurable - return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); + synchronized (lock) { + if (closed) { + throw new FlinkRuntimeException( + "Attempt to create StateExecutor after ForStKeyedStateBackend is disposed."); + } + // TODO: Make io parallelism configurable + StateExecutor stateExecutor = + new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); + managedStateExecutors.add(stateExecutor); + return stateExecutor; + } } /** Should only be called by one thread, and only after all accesses to the DB happened. */ @@ -157,6 +182,11 @@ public void dispose() { if (this.disposed) { return; } + synchronized (lock) { + if (!closed) { + IOUtils.closeQuietly(this); + } + } // IMPORTANT: null reference to signal potential async checkpoint workers that the db was // disposed, as @@ -207,6 +237,14 @@ URI getRemoteBasePath() { @Override public void close() throws IOException { - // do nothing currently, native resources will be release in dispose method + synchronized (lock) { + if (closed) { + return; + } + closed = true; + for (StateExecutor executor : managedStateExecutors) { + executor.shutdown(); + } + } } }