Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.forstdb.FlinkEnv;
import org.forstdb.IndexType;
import org.forstdb.PlainTableConfig;
import org.forstdb.Priority;
import org.forstdb.ReadOptions;
import org.forstdb.Statistics;
import org.forstdb.TableFormatConfig;
Expand Down Expand Up @@ -76,6 +77,8 @@ public final class ForStResourceContainer implements AutoCloseable {
// the filename length limit is 255 on most operating systems
private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();

@Nullable private FlinkEnv flinkEnv = null;

@Nullable private final Path remoteBasePath;

@Nullable private final Path remoteForStPath;
Expand Down Expand Up @@ -187,7 +190,8 @@ public DBOptions getDbOptions() {
// configured,
// fallback to local directory currently temporarily.
if (remoteForStPath != null) {
opt.setEnv(new FlinkEnv(remoteForStPath.toString(), forstFileSystem));
flinkEnv = new FlinkEnv(remoteForStPath.toString(), forstFileSystem);
opt.setEnv(flinkEnv);
}

return opt;
Expand Down Expand Up @@ -408,6 +412,15 @@ public void close() throws Exception {
sharedResources.close();
}
cleanRelocatedDbLogs();
if (flinkEnv != null) {
// There is something wrong with the FlinkEnv, the background threads won't quit during
// the disposal of DB. We explicit shrink the thread pool here until the ForSt repo
// fixes that.
flinkEnv.setBackgroundThreads(0, Priority.LOW);
flinkEnv.setBackgroundThreads(0, Priority.HIGH);
flinkEnv.close();
flinkEnv = null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

/** Tests for the async keyed state backend part of {@link ForStStateBackend}. */
@ExtendWith(ParameterizedTestExtension.class)
class ForStStateBackendTestV2 extends StateBackendTestV2Base<ForStStateBackend> {
class ForStStateBackendV2Test extends StateBackendTestV2Base<ForStStateBackend> {

@TempDir private static java.nio.file.Path tempFolder;
@TempDir private static java.nio.file.Path tempFolderForForStLocal;
Expand Down Expand Up @@ -94,7 +94,6 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
if (hasRemoteDir) {
config.set(REMOTE_DIRECTORY, tempFolderForForstRemote.toString());
}
backend.configure(config, Thread.currentThread().getContextClassLoader());
return new ForStStateBackend();
return backend.configure(config, Thread.currentThread().getContextClassLoader());
}
}