From b66c90c0479c115633a2d110c1fd8233efaaf5f8 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Tue, 3 Dec 2024 22:02:30 +0800 Subject: [PATCH 1/2] [FLINK-36838][state] Join background threads when ForSt state backend quit --- .../flink/state/forst/ForStResourceContainer.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index b807e4d6c4b9b..377496d40c860 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -39,6 +39,7 @@ import org.forstdb.FlinkEnv; import org.forstdb.IndexType; import org.forstdb.PlainTableConfig; +import org.forstdb.Priority; import org.forstdb.ReadOptions; import org.forstdb.Statistics; import org.forstdb.TableFormatConfig; @@ -76,6 +77,8 @@ public final class ForStResourceContainer implements AutoCloseable { // the filename length limit is 255 on most operating systems private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); + @Nullable private FlinkEnv flinkEnv = null; + @Nullable private final Path remoteBasePath; @Nullable private final Path remoteForStPath; @@ -187,7 +190,8 @@ public DBOptions getDbOptions() { // configured, // fallback to local directory currently temporarily. if (remoteForStPath != null) { - opt.setEnv(new FlinkEnv(remoteForStPath.toString(), forstFileSystem)); + flinkEnv = new FlinkEnv(remoteForStPath.toString(), forstFileSystem); + opt.setEnv(flinkEnv); } return opt; @@ -408,6 +412,15 @@ public void close() throws Exception { sharedResources.close(); } cleanRelocatedDbLogs(); + if (flinkEnv != null) { + // There is something wrong with the FlinkEnv, the background threads won't quit during + // the disposal of DB. We explicit shrink the thread pool here until the ForSt repo + // fixes that. + flinkEnv.setBackgroundThreads(0, Priority.LOW); + flinkEnv.setBackgroundThreads(0, Priority.HIGH); + flinkEnv.close(); + flinkEnv = null; + } } /** From e0c75823b7f34c58e2ded5da99a0663bede0f0ab Mon Sep 17 00:00:00 2001 From: Zakelly Date: Tue, 3 Dec 2024 22:03:20 +0800 Subject: [PATCH 2/2] [hotfix] Use `FlinkEnv` in tests for ForStStateBackend --- ...tStateBackendTestV2.java => ForStStateBackendV2Test.java} | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) rename flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/{ForStStateBackendTestV2.java => ForStStateBackendV2Test.java} (95%) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java similarity index 95% rename from flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java rename to flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java index 413f81532ff89..ff77675d05bf0 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTestV2.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java @@ -43,7 +43,7 @@ /** Tests for the async keyed state backend part of {@link ForStStateBackend}. */ @ExtendWith(ParameterizedTestExtension.class) -class ForStStateBackendTestV2 extends StateBackendTestV2Base { +class ForStStateBackendV2Test extends StateBackendTestV2Base { @TempDir private static java.nio.file.Path tempFolder; @TempDir private static java.nio.file.Path tempFolderForForStLocal; @@ -94,7 +94,6 @@ protected ConfigurableStateBackend getStateBackend() throws Exception { if (hasRemoteDir) { config.set(REMOTE_DIRECTORY, tempFolderForForstRemote.toString()); } - backend.configure(config, Thread.currentThread().getContextClassLoader()); - return new ForStStateBackend(); + return backend.configure(config, Thread.currentThread().getContextClassLoader()); } }