From a8460bc0873c8a6169c496331040e2b9082c9ba2 Mon Sep 17 00:00:00 2001 From: AlexYinHan Date: Fri, 26 Sep 2025 11:56:15 +0800 Subject: [PATCH] [FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend --- .../state/forst/ForStKeyedStateBackend.java | 14 +- .../forst/ForStKeyedStateBackendBuilder.java | 15 +- .../flink/state/forst/ForStPathContainer.java | 147 ++++++++++++++++++ .../state/forst/ForStResourceContainer.java | 120 +++++--------- .../flink/state/forst/ForStStateBackend.java | 52 ++++--- .../DataTransferStrategyBuilder.java | 31 +++- .../datatransfer/ForStStateDataTransfer.java | 10 +- .../ForStIncrementalRestoreOperation.java | 15 +- .../sync/ForStSyncKeyedStateBackend.java | 11 +- .../ForStSyncKeyedStateBackendBuilder.java | 4 +- .../flink/state/forst/ForStExtension.java | 9 +- .../forst/ForStResourceContainerTest.java | 10 +- .../forst/ForStStateBackendConfigTest.java | 27 +++- .../state/forst/ForStStateBackendTest.java | 6 +- .../DataTransferStrategyTest.java | 78 +++++++++- .../ForStStateDataTransferTest.java | 22 ++- 16 files changed, 409 insertions(+), 162 deletions(-) create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 635d29b8c4194..1efde10dfdf4c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -596,18 +596,14 @@ public void dispose() { IOUtils.closeQuietly(db); LOG.info( - "Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.", - optionsContainer.getLocalBasePath(), - optionsContainer.getRemoteBasePath()); + "Closed ForSt State Backend. Cleaning up ForSt: {}.", + optionsContainer.getPathContainer()); try { optionsContainer.clearDirectories(); } catch (Exception ex) { LOG.warn( - "Could not delete ForSt local working directory {}, remote working directory {}.", - optionsContainer.getLocalBasePath(), - optionsContainer.getRemoteBasePath(), - ex); + "Could not delete ForSt: {}.", optionsContainer.getPathContainer(), ex); } IOUtils.closeQuietly(optionsContainer); @@ -624,12 +620,12 @@ public boolean isSafeToReuseKVState() { @VisibleForTesting Path getLocalBasePath() { - return optionsContainer.getLocalBasePath(); + return optionsContainer.getPathContainer().getLocalBasePath(); } @VisibleForTesting Path getRemoteBasePath() { - return optionsContainer.getRemoteBasePath(); + return optionsContainer.getPathContainer().getRemoteBasePath(); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 31473eda4c4cf..d9b2bc3dd58c4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -302,11 +302,7 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { // deletion in file mapping manager. optionsContainer.forceClearRemoteDirectories(); } catch (Exception ex) { - logger.warn( - "Failed to delete ForSt local base path {}, remote base path {}.", - optionsContainer.getLocalBasePath(), - optionsContainer.getRemoteBasePath(), - ex); + logger.warn("Failed to delete ForSt: {}.", optionsContainer.getPathContainer(), ex); } IOUtils.closeQuietly(optionsContainer); IOUtils.closeQuietly(snapshotStrategy); @@ -322,9 +318,8 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { InternalKeyContext keyContext = new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups); logger.info( - "Finished building ForSt keyed state-backend at local base path: {}, remote base path: {}.", - optionsContainer.getLocalBasePath(), - optionsContainer.getRemoteBasePath()); + "Finished building ForSt keyed state-backend at {}", + optionsContainer.getPathContainer()); return new ForStKeyedStateBackend<>( backendUID, executionConfig, @@ -360,8 +355,8 @@ private ForStRestoreOperation getForStRestoreOperation( // working dir. We will implement this in ForStDB later, but before that, we achieved this // by setting the dbPath to "/" when the dfs directory existed. Path instanceForStPath = - optionsContainer.getRemoteForStPath() == null - ? optionsContainer.getLocalForStPath() + optionsContainer.getPathContainer().getRemoteForStPath() == null + ? optionsContainer.getPathContainer().getLocalForStPath() : new Path("/db"); if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java new file mode 100644 index 0000000000000..0535d40ba764f --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.core.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** Container for ForSt paths. */ +public class ForStPathContainer { + + private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class); + public static final String DB_DIR_STRING = "db"; + + @Nullable final Path localJobPath; + @Nullable private final Path localBasePath; + @Nullable private final Path localForStPath; + + @Nullable private final Path remoteJobPath; + @Nullable private final Path remoteBasePath; + @Nullable private final Path remoteForStPath; + + public static ForStPathContainer empty() { + return of(null, null, null, null); + } + + public static ForStPathContainer ofLocal( + @Nullable Path localJobPath, @Nullable Path localBasePath) { + return new ForStPathContainer(localJobPath, localBasePath, null, null); + } + + public static ForStPathContainer of( + @Nullable Path localJobPath, + @Nullable Path localBasePath, + @Nullable Path remoteJobPath, + @Nullable Path remoteBasePath) { + return new ForStPathContainer(localJobPath, localBasePath, remoteJobPath, remoteBasePath); + } + + public ForStPathContainer( + @Nullable Path localJobPath, + @Nullable Path localBasePath, + @Nullable Path remoteJobPath, + @Nullable Path remoteBasePath) { + this.localJobPath = localJobPath; + this.localBasePath = localBasePath; + this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null; + + this.remoteJobPath = remoteJobPath; + this.remoteBasePath = remoteBasePath; + this.remoteForStPath = + remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null; + + LOG.info( + "ForStPathContainer: localJobPath: {}, localBasePath: {}, localForStPath:{}, remoteJobPath: {}, remoteBasePath: {}, remoteForStPath: {}", + localJobPath, + localBasePath, + localForStPath, + remoteJobPath, + remoteBasePath, + remoteForStPath); + } + + public @Nullable Path getLocalJobPath() { + return localJobPath; + } + + public @Nullable Path getLocalBasePath() { + return localBasePath; + } + + public @Nullable Path getLocalForStPath() { + return localForStPath; + } + + public @Nullable Path getRemoteJobPath() { + return remoteJobPath; + } + + public @Nullable Path getRemoteBasePath() { + return remoteBasePath; + } + + public @Nullable Path getRemoteForStPath() { + return remoteForStPath; + } + + public Path getJobPath() { + if (remoteJobPath != null) { + return remoteJobPath; + } else { + return localJobPath; + } + } + + public Path getBasePath() { + if (remoteBasePath != null) { + return remoteBasePath; + } else { + return localBasePath; + } + } + + public Path getDbPath() { + if (remoteForStPath != null) { + return remoteForStPath; + } else { + return localForStPath; + } + } + + @Override + public String toString() { + return "ForStPathContainer(localJobPath = [" + + localJobPath + + "] localBasePath = [" + + localBasePath + + "] localForStPath = [" + + localForStPath + + "] remoteJobPath = [" + + remoteJobPath + + "] remoteBasePath = [" + + remoteBasePath + + "] remoteForStPath = [" + + remoteForStPath + + "])"; + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index d4c5a7cc831fc..32a0855733274 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -73,8 +73,6 @@ public final class ForStResourceContainer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class); - public static final String DB_DIR_STRING = "db"; - private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG"; // the filename length limit is 255 on most operating systems @@ -82,16 +80,10 @@ public final class ForStResourceContainer implements AutoCloseable { // and the db data dir's absolute path will be used as the log file name's prefix. private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); - @Nullable private final Path remoteBasePath; - - @Nullable private final Path remoteForStPath; + private final ForStPathContainer pathContainer; private boolean remotePathNewlyCreated; - @Nullable private final Path localBasePath; - - @Nullable private final Path localForStPath; - @Nullable private Path cacheBasePath; private final long cacheCapacity; @@ -130,8 +122,7 @@ public ForStResourceContainer() { new Configuration(), null, null, - null, - null, + ForStPathContainer.empty(), RecoveryClaimMode.DEFAULT, null, null, @@ -144,8 +135,7 @@ public ForStResourceContainer(@Nullable ForStOptionsFactory optionsFactory) { new Configuration(), optionsFactory, null, - null, - null, + ForStPathContainer.empty(), RecoveryClaimMode.DEFAULT, null, null, @@ -160,8 +150,7 @@ public ForStResourceContainer( new Configuration(), optionsFactory, sharedResources, - null, - null, + ForStPathContainer.empty(), RecoveryClaimMode.DEFAULT, null, null, @@ -172,8 +161,7 @@ public ForStResourceContainer( ReadableConfig configuration, @Nullable ForStOptionsFactory optionsFactory, @Nullable OpaqueMemoryResource sharedResources, - @Nullable Path localBasePath, - @Nullable Path remoteBasePath, + ForStPathContainer pathContainer, RecoveryClaimMode claimMode, @Nullable CheckpointStorageAccess checkpointStorageAccess, MetricGroup metricGroup, @@ -183,11 +171,7 @@ public ForStResourceContainer( this.optionsFactory = optionsFactory; this.sharedResources = sharedResources; - this.localBasePath = localBasePath; - this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null; - this.remoteBasePath = remoteBasePath; - this.remoteForStPath = - remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null; + this.pathContainer = pathContainer; this.enableStatistics = enableStatistics; this.handlesToClose = new ArrayList<>(); @@ -228,10 +212,10 @@ public DBOptions getDbOptions() { // TODO: Fallback to checkpoint directory when checkpoint feature is ready if not // configured, // fallback to local directory currently temporarily. - if (remoteForStPath != null) { + if (pathContainer.getRemoteForStPath() != null) { FlinkEnv flinkEnv = new FlinkEnv( - remoteBasePath.toString(), + pathContainer.getRemoteBasePath().toString(), new StringifiedForStFileSystem(forStFileSystem)); opt.setEnv(flinkEnv); handlesToClose.add(flinkEnv); @@ -312,40 +296,16 @@ public ReadOptions getReadOptions() { return opt; } - @Nullable - public Path getLocalBasePath() { - return localBasePath; - } - - @Nullable - public Path getLocalForStPath() { - return localForStPath; - } - - @Nullable - public Path getRemoteBasePath() { - return remoteBasePath; - } - - @Nullable - public Path getRemoteForStPath() { - return remoteForStPath; + public ForStPathContainer getPathContainer() { + return pathContainer; } public Path getBasePath() { - if (remoteBasePath != null) { - return remoteBasePath; - } else { - return localBasePath; - } + return pathContainer.getBasePath(); } public Path getDbPath() { - if (remoteForStPath != null) { - return remoteForStPath; - } else { - return localForStPath; - } + return pathContainer.getDbPath(); } public boolean isCoordinatorInline() { @@ -370,28 +330,31 @@ public int getWriteIoParallelism() { * @throws Exception if any unexpected behaviors. */ public void prepareDirectories() throws Exception { - if (remoteBasePath != null && remoteForStPath != null) { - remotePathNewlyCreated = prepareDirectories(remoteBasePath, remoteForStPath); + if (pathContainer.getRemoteBasePath() != null + && pathContainer.getRemoteForStPath() != null) { + remotePathNewlyCreated = + prepareDirectories( + pathContainer.getRemoteBasePath(), pathContainer.getRemoteForStPath()); } - if (localBasePath != null && localForStPath != null) { - prepareDirectories( - new Path(localBasePath.getPath()), new Path(localForStPath.getPath())); + if (pathContainer.getLocalBasePath() != null && pathContainer.getLocalForStPath() != null) { + prepareDirectories(pathContainer.getLocalBasePath(), pathContainer.getLocalForStPath()); } - if (remoteForStPath != null && localForStPath != null) { - if (cacheBasePath == null && localBasePath != null) { - cacheBasePath = new Path(localBasePath.getPath(), "cache"); + if (pathContainer.getRemoteForStPath() != null + && pathContainer.getLocalForStPath() != null) { + if (cacheBasePath == null && pathContainer.getLocalBasePath() != null) { + cacheBasePath = new Path(pathContainer.getLocalBasePath().getPath(), "cache"); LOG.info( "Cache base path is not configured, set to local base path: {}", cacheBasePath); } forStFileSystem = ForStFlinkFileSystem.get( - remoteForStPath.toUri(), - localForStPath, + pathContainer.getRemoteForStPath().toUri(), + pathContainer.getLocalForStPath(), ForStFlinkFileSystem.getFileBasedCache( configuration, cacheBasePath, - remoteForStPath, + pathContainer.getRemoteForStPath(), cacheCapacity, cacheReservedSize, metricGroup)); @@ -432,17 +395,17 @@ private static boolean prepareDirectories(Path basePath, Path dbPath) throws IOE * @throws Exception if any unexpected behaviors. */ public void clearDirectories() throws Exception { - if (remoteBasePath != null) { - forStFileSystem.delete(remoteBasePath, true); + if (pathContainer.getRemoteBasePath() != null) { + forStFileSystem.delete(pathContainer.getRemoteBasePath(), true); } - if (localBasePath != null) { - clearDirectories(localBasePath); + if (pathContainer.getLocalBasePath() != null) { + clearDirectories(pathContainer.getLocalBasePath()); } } public void forceClearRemoteDirectories() throws Exception { - if (remoteBasePath != null && remotePathNewlyCreated) { - clearDirectories(remoteBasePath); + if (pathContainer.getRemoteBasePath() != null && remotePathNewlyCreated) { + clearDirectories(pathContainer.getRemoteBasePath()); } } @@ -529,9 +492,10 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions) String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR); if (logDir == null || logDir.isEmpty()) { // only relocate db log dir in local mode - if (remoteForStPath == null - && localForStPath != null - && localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { + if (pathContainer.getRemoteForStPath() == null + && pathContainer.getLocalForStPath() != null + && pathContainer.getLocalForStPath().getPath().length() + <= INSTANCE_PATH_LENGTH_LIMIT) { relocateDefaultDbLogDir(currentOptions); } } else { @@ -647,9 +611,10 @@ private void setLocalForStPathAsLogDir(DBOptions dbOptions) { // log dir, which results in "/" being used as the log directory. This often has permission // issues, so the db log dir is temporarily set explicitly here. // TODO: remove this method after ForSt deal log dir well - if (localForStPath != null) { - this.relocatedDbLogBaseDir = java.nio.file.Path.of(localForStPath.toUri().toString()); - dbOptions.setDbLogDir(localForStPath.getPath()); + if (pathContainer.getLocalForStPath() != null) { + this.relocatedDbLogBaseDir = + java.nio.file.Path.of(pathContainer.getLocalForStPath().toUri().toString()); + dbOptions.setDbLogDir(pathContainer.getLocalForStPath().getPath()); } } @@ -666,10 +631,11 @@ private File resolveFileLocation(String logFilePath) { /** Clean all relocated ForSt logs. */ private void cleanRelocatedDbLogs() { - if (localForStPath != null && relocatedDbLogBaseDir != null) { + if (pathContainer.getLocalForStPath() != null && relocatedDbLogBaseDir != null) { LOG.info("Cleaning up relocated ForSt logs: {}.", relocatedDbLogBaseDir); - String relocatedDbLogPrefix = resolveRelocatedDbLogPrefix(localForStPath.getPath()); + String relocatedDbLogPrefix = + resolveRelocatedDbLogPrefix(pathContainer.getLocalForStPath().getPath()); try { Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir)) .filter( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 328ff9eb7094f..e8f51c0c63168 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -418,7 +417,8 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( lazyInitializeForJob(env, fileCompatibleIdentifier); - Tuple2 localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env); + ForStPathContainer pathContainer = + createForStPathContainer(fileCompatibleIdentifier, env, false); final OpaqueMemoryResource sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured( @@ -433,8 +433,7 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( final ForStResourceContainer resourceContainer = createOptionsAndResourceContainer( sharedResources, - localAndRemoteBasePath.f0, - localAndRemoteBasePath.f1, + pathContainer, env.getCheckpointStorageAccess(), parameters.getMetricGroup(), nativeMetricOptions.isStatisticsEnabled()); @@ -490,7 +489,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( lazyInitializeForJob(env, fileCompatibleIdentifier); - Tuple2 localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env); + ForStPathContainer pathContainer = + createForStPathContainer(fileCompatibleIdentifier, env, forceSyncLocal); LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig(); @@ -508,8 +508,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( final ForStResourceContainer resourceContainer = createOptionsAndResourceContainer( sharedResources, - localAndRemoteBasePath.f0, - forceSyncLocal ? null : localAndRemoteBasePath.f1, + pathContainer, env.getCheckpointStorageAccess(), parameters.getMetricGroup(), nativeMetricOptions.isStatisticsEnabled()); @@ -795,25 +794,32 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon return configuration; } - Tuple2 getForStBasePath(String operatorIdentifier, Environment env) { + ForStPathContainer createForStPathContainer( + String operatorIdentifier, Environment env, boolean forceLocal) { String opChildPath = String.format( "op_%s_attempt_%s", operatorIdentifier, env.getTaskInfo().getAttemptNumber()); - Path localBasePath = - new Path( - new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath) - .getAbsolutePath()); + File localJobFile = new File(getNextStoragePath(), jobId.toHexString()); + Path localJobPath = new Path(localJobFile.getPath()); + Path localBasePath = new Path(new File(localJobFile, opChildPath).getAbsolutePath()); + + if (forceLocal) { + return ForStPathContainer.ofLocal(localJobPath, localBasePath); + } + + Path remoteJobPath = null; Path remoteBasePath = null; if (remoteForStDirectory != null) { - remoteBasePath = - new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath); + remoteJobPath = new Path(remoteForStDirectory, jobId.toHexString()); + remoteBasePath = new Path(remoteJobPath, opChildPath); } else if (remoteShareWithCheckpoint) { if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) { - Path sharedStateDirectory = - ((FsCheckpointStorageAccess) env.getCheckpointStorageAccess()) - .getSharedStateDirectory(); + FsCheckpointStorageAccess fsCheckpointStorageAccess = + (FsCheckpointStorageAccess) env.getCheckpointStorageAccess(); + remoteJobPath = fsCheckpointStorageAccess.getCheckpointsDirectory(); + Path sharedStateDirectory = fsCheckpointStorageAccess.getSharedStateDirectory(); remoteBasePath = new Path(sharedStateDirectory, opChildPath); LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath); } else { @@ -821,19 +827,20 @@ Tuple2 getForStBasePath(String operatorIdentifier, Environment env) "Remote ForSt directory can't be set, because checkpoint directory isn't on file system."); } } - return Tuple2.of(localBasePath, remoteBasePath); + + return ForStPathContainer.of(localJobPath, localBasePath, remoteJobPath, remoteBasePath); } @VisibleForTesting ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) { - return createOptionsAndResourceContainer(null, localBasePath, null, null, null, false); + return createOptionsAndResourceContainer( + null, ForStPathContainer.ofLocal(null, localBasePath), null, null, false); } @VisibleForTesting private ForStResourceContainer createOptionsAndResourceContainer( @Nullable OpaqueMemoryResource sharedResources, - @Nullable Path localBasePath, - @Nullable Path remoteBasePath, + ForStPathContainer pathContainer, @Nullable CheckpointStorageAccess checkpointStorageAccess, @Nullable MetricGroup metricGroup, boolean enableStatistics) { @@ -842,8 +849,7 @@ private ForStResourceContainer createOptionsAndResourceContainer( configurableOptions != null ? configurableOptions : new Configuration(), forStOptionsFactory, sharedResources, - localBasePath, - remoteBasePath, + pathContainer, recoveryClaimMode, checkpointStorageAccess, metricGroup, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java index 68d54b177997d..0c7df037996f2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation; +import org.apache.flink.state.forst.ForStPathContainer; import org.apache.flink.state.forst.StateHandleTransferSpec; import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; @@ -122,23 +123,28 @@ private static boolean isDbPathUnderCheckpointPathForSnapshot( public static DataTransferStrategy buildForRestore( @Nullable ForStFlinkFileSystem forStFlinkFileSystem, + ForStPathContainer forStPathContainer, Collection specs, RecoveryClaimMode recoveryClaimMode) { DataTransferStrategy strategy; FileSystem cpSharedFs = getSharedStateFileSystem(specs); + boolean isDbUnderSameJobPathFromRestore = + isDbUnderSameJobPathFromRestore(forStPathContainer, specs); if (forStFlinkFileSystem == null || cpSharedFs == null || !forStFlinkFileSystem.getUri().equals(cpSharedFs.getUri()) - || recoveryClaimMode == RecoveryClaimMode.NO_CLAIM) { + || (!isDbUnderSameJobPathFromRestore + && recoveryClaimMode == RecoveryClaimMode.NO_CLAIM)) { strategy = forStFlinkFileSystem == null ? new CopyDataTransferStrategy() : new CopyDataTransferStrategy(forStFlinkFileSystem); LOG.info( - "Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}", + "Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, isDbUnderSameJobPathFromRestore:{}, recoveryClaimMode:{}", strategy, forStFlinkFileSystem, cpSharedFs, + isDbUnderSameJobPathFromRestore, recoveryClaimMode); return strategy; } @@ -168,4 +174,25 @@ public static DataTransferStrategy buildForRestore( } return null; } + + // Verify if the job path matches the restored path. A match indicates that the job is being + // restored from a failover. + private static boolean isDbUnderSameJobPathFromRestore( + ForStPathContainer forStPathContainer, Collection specs) { + String jobPathStr = forStPathContainer.getJobPath().getPath(); + for (StateHandleTransferSpec spec : specs) { + IncrementalRemoteKeyedStateHandle stateHandle = spec.getStateHandle(); + for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : + stateHandle.getSharedState()) { + StreamStateHandle handle = handleAndLocalPath.getHandle(); + if (handle instanceof FileStateHandle) { + Path dbRemotePath = ((FileStateHandle) handle).getFilePath(); + if (!dbRemotePath.getPath().startsWith(jobPathStr)) { + return false; + } + } + } + } + return true; + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java index 4dfc9c0a91fcd..0305f8cd02e9a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.state.forst.ForStKeyedStateBackend; +import org.apache.flink.state.forst.ForStPathContainer; import org.apache.flink.state.forst.StateHandleTransferSpec; import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; import org.apache.flink.util.ExceptionUtils; @@ -231,6 +232,7 @@ private FileSystem getDbFileSystem() { * @throws Exception If anything about the transfer goes wrong. */ public void transferAllStateDataToDirectory( + ForStPathContainer forStPathContainer, Collection transferSpecs, CloseableRegistry closeableRegistry, RecoveryClaimMode recoveryClaimMode) @@ -244,7 +246,10 @@ public void transferAllStateDataToDirectory( try { List> futures = transferAllStateDataToDirectoryAsync( - transferSpecs, internalCloser, recoveryClaimMode) + forStPathContainer, + transferSpecs, + internalCloser, + recoveryClaimMode) .collect(Collectors.toList()); // Wait until either all futures completed successfully or one failed exceptionally. @@ -276,12 +281,13 @@ public void transferAllStateDataToDirectory( /** Asynchronously runs the specified transfer requests on executorService. */ private Stream> transferAllStateDataToDirectoryAsync( + ForStPathContainer forStPathContainer, Collection transferSpecs, CloseableRegistry closeableRegistry, RecoveryClaimMode recoveryClaimMode) { DataTransferStrategy strategy = DataTransferStrategyBuilder.buildForRestore( - forStFs, transferSpecs, recoveryClaimMode); + forStFs, forStPathContainer, transferSpecs, recoveryClaimMode); return transferSpecs.stream() .flatMap( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java index 699da08a53dbe..f205fe21b9f0d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java @@ -45,6 +45,7 @@ import org.apache.flink.state.forst.ForStIncrementalCheckpointUtils; import org.apache.flink.state.forst.ForStNativeMetricOptions; import org.apache.flink.state.forst.ForStOperationUtils; +import org.apache.flink.state.forst.ForStPathContainer; import org.apache.flink.state.forst.ForStResourceContainer; import org.apache.flink.state.forst.StateHandleTransferSpec; import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer; @@ -93,7 +94,6 @@ import static org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange; import static org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial; import static org.apache.flink.state.forst.ForStOperationUtils.createColumnFamilyOptions; -import static org.apache.flink.state.forst.ForStResourceContainer.DB_DIR_STRING; /** Encapsulates the process of restoring a ForSt instance from an incremental snapshot. */ public class ForStIncrementalRestoreOperation implements ForStRestoreOperation { @@ -217,7 +217,7 @@ public ForStRestoreResult restore() throws Exception { toTransferSpecs.add( new StateHandleTransferSpec( restoreStateHandles.get(bestStateHandleForInit), - new Path(forstBasePath, DB_DIR_STRING))); + new Path(forstBasePath, ForStPathContainer.DB_DIR_STRING))); } for (int i = 0; i < restoreStateHandles.size(); i++) { if (i != bestStateHandleForInit) { @@ -268,7 +268,10 @@ private void transferAllStateHandles(List specs) throws ForStStateDataTransfer.DEFAULT_THREAD_NUM, optionsContainer.getFileSystem())) { transfer.transferAllStateDataToDirectory( - specs, cancelStreamRegistry, recoveryClaimMode); + optionsContainer.getPathContainer(), + specs, + cancelStreamRegistry, + recoveryClaimMode); } } @@ -697,7 +700,7 @@ public void mergeStateHandlesWithClipAndIngest( StateHandleTransferSpec baseSpec = new StateHandleTransferSpec( restoreStateHandles.get(bestStateHandleForInit), - new Path(forstBasePath, DB_DIR_STRING)); + new Path(forstBasePath, ForStPathContainer.DB_DIR_STRING)); transferAllStateHandles(Collections.singletonList(baseSpec)); mergeStateHandlesWithCopyFromTemporaryInstance( baseSpec, @@ -751,7 +754,7 @@ public void exportColumnFamilies( String uuid = UUID.randomUUID().toString(); String subPathStr = - optionsContainer.getRemoteBasePath() != null + optionsContainer.getPathContainer().getRemoteBasePath() != null ? exportBasePath.getName() + "/" + uuid : exportBasePath.toString() + "/" + uuid; ExportImportFilesMetaData exportedColumnFamilyMetaData = @@ -953,7 +956,7 @@ private RestoredDBInstance restoreTempDBInstance(StateHandleTransferSpec stateHa new ArrayList<>(stateMetaInfoSnapshots.size() + 1); String dbName = - optionsContainer.getRemoteBasePath() != null + optionsContainer.getPathContainer().getRemoteBasePath() != null ? "/" + stateHandleSpec.getTransferDestination().getName() : stateHandleSpec.getTransferDestination().toString(); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java index 2a2705cf8c019..de929111d82d4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java @@ -488,18 +488,13 @@ public void dispose() { columnFamilyOptions.forEach(IOUtils::closeQuietly); LOG.info( - "Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.", - optionsContainer.getLocalBasePath(), - optionsContainer.getRemoteBasePath()); + "Closed ForSt State Backend. Cleaning up ForSt: {}.", + optionsContainer.getPathContainer()); try { optionsContainer.clearDirectories(); } catch (Exception ex) { - LOG.warn( - "Could not delete ForSt local working directory {}, remote working directory {}.", - optionsContainer.getLocalBasePath(), - optionsContainer.getRemoteBasePath(), - ex); + LOG.warn("Could not delete ForSt: {}.", optionsContainer.getPathContainer(), ex); } IOUtils.closeQuietly(optionsContainer); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index 07de516c71d97..0e83b22a0ced2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -506,8 +506,8 @@ private ForStRestoreOperation getForStDBRestoreOperation( // working dir. We will implement this in ForStDB later, but before that, we achieved this // by setting the dbPath to "/" when the dfs directory existed. Path instanceForStPath = - optionsContainer.getRemoteForStPath() == null - ? optionsContainer.getLocalForStPath() + optionsContainer.getPathContainer().getRemoteForStPath() == null + ? optionsContainer.getPathContainer().getLocalForStPath() : new Path("/db"); if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java index 6e6c01280d451..85b303b086ff2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java @@ -164,13 +164,14 @@ public void before() throws Exception { resourceGuard = new ResourceGuard(); File localWorkingDir = TempDirUtils.newFolder(rocksFolder.toPath(), "local-working-dir"); + Path localJobPath = new Path(localWorkingDir.getAbsolutePath()); + Path localBasePath = new Path(localJobPath, "base"); this.resourceContainer = new ForStResourceContainer( new Configuration(), optionsFactory, null, - new Path(localWorkingDir.getAbsolutePath()), - null, + ForStPathContainer.ofLocal(localJobPath, localBasePath), null, null, null, @@ -191,8 +192,8 @@ public void before() throws Exception { // by setting the dbPath to "/" when the dfs directory existed. // TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir Path instanceForStPath = - resourceContainer.getRemoteForStPath() == null - ? resourceContainer.getLocalForStPath() + resourceContainer.getPathContainer().getRemoteForStPath() == null + ? resourceContainer.getPathContainer().getLocalForStPath() : new Path("/"); this.columnFamilyHandles = new ArrayList<>(1); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java index fd9ae8ea11177..63d7d1f643bbc 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java @@ -308,15 +308,17 @@ public ColumnFamilyOptions createColumnOptions( @Test public void testDirectoryResources() throws Exception { - Path localBasePath = new Path(TMP_FOLDER.newFolder().getPath()); - Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath()); + Path localJobPath = new Path(TMP_FOLDER.newFolder().getPath()); + Path localBasePath = new Path(localJobPath, "base"); + Path remoteJobPath = new Path(TMP_FOLDER.newFolder().getPath()); + Path remoteBasePath = new Path(remoteJobPath, "base"); try (final ForStResourceContainer optionsContainer = new ForStResourceContainer( new Configuration(), null, null, - localBasePath, - remoteBasePath, + ForStPathContainer.of( + localJobPath, localBasePath, remoteJobPath, remoteBasePath), null, new FsCheckpointStorageAccess( new Path(TMP_FOLDER.newFolder().getPath()), diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java index 78b2a0bd79d16..50f3ac0954bd0 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java @@ -530,7 +530,14 @@ public void testConfigurableOptionsFromConfig() throws Exception { try (ForStResourceContainer optionsContainer = new ForStResourceContainer( - configuration, null, null, null, null, null, null, null, false)) { + configuration, + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(-1, dbOptions.maxOpenFiles()); @@ -615,7 +622,14 @@ public void testConfigurableOptions() throws Exception { configuration.set(ForStConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL); try (final ForStResourceContainer optionsContainer = new ForStResourceContainer( - configuration, null, null, null, null, null, null, null, false)) { + configuration, + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -624,7 +638,14 @@ public void testConfigurableOptions() throws Exception { try (final ForStResourceContainer optionsContainer = new ForStResourceContainer( - new Configuration(), null, null, null, null, null, null, null, false)) { + new Configuration(), + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java index 7c4e58e1ce6ee..b3d8bc8e364d9 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java @@ -130,7 +130,8 @@ void testCreateKeyedStateBackend() throws Exception { ForStStateBackend backend = new ForStStateBackend(); ForStSyncKeyedStateBackend keyedStateBackend1 = (ForStSyncKeyedStateBackend) createKeyedBackend(IntSerializer.INSTANCE); - assertThat(keyedStateBackend1.getOptionsContainer().getRemoteBasePath()).isNull(); + assertThat(keyedStateBackend1.getOptionsContainer().getPathContainer().getRemoteBasePath()) + .isNull(); Configuration config = new Configuration(); config.set(ForStOptions.SYNC_ENFORCE_LOCAL, false); backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); @@ -151,6 +152,7 @@ void testCreateKeyedStateBackend() throws Exception { Collections.emptyList(), new CloseableRegistry(), 1.0d)); - assertThat(keyedStateBackend2.getOptionsContainer().getRemoteBasePath()).isNotNull(); + assertThat(keyedStateBackend2.getOptionsContainer().getPathContainer().getRemoteBasePath()) + .isNotNull(); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java index 73f3f46df4fb6..deddefe883516 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; +import org.apache.flink.state.forst.ForStPathContainer; import org.apache.flink.state.forst.StateHandleTransferSpec; import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider; @@ -471,11 +472,14 @@ void testSnapshotStrategyAsExpected( void testRestoreStrategyAsExpected( @Nullable ForStFlinkFileSystem forStFlinkFileSystem, + String sourceDirectoryStr, + String desDirStr, RecoveryClaimMode recoveryClaimMode, Class expected) { List sharedStateHandleList = new ArrayList<>(); sharedStateHandleList.add( - HandleAndLocalPath.of(new FileStateHandle(new Path("1.sst"), 0), "1.sst")); + HandleAndLocalPath.of( + new FileStateHandle(new Path(sourceDirectoryStr + "/1.sst"), 0), "1.sst")); IncrementalRemoteKeyedStateHandle stateHandle = new IncrementalRemoteKeyedStateHandle( UUID.randomUUID(), @@ -484,12 +488,15 @@ void testRestoreStrategyAsExpected( sharedStateHandleList, Collections.emptyList(), new FileStateHandle(new Path("meta"), 0)); + Path destJobDir = new Path(desDirStr); + Path destBaseDir = new Path(destJobDir, "base"); assertThat( DataTransferStrategyBuilder.buildForRestore( forStFlinkFileSystem, + ForStPathContainer.of(null, null, destJobDir, destBaseDir), Collections.singletonList( new StateHandleTransferSpec( - stateHandle, new Path("dst"))), + stateHandle, destBaseDir)), recoveryClaimMode) .getClass()) .isEqualTo(expected); @@ -524,19 +531,76 @@ void testBuildingStrategyAsExpected() throws IOException { CopyDataTransferStrategy.class); testRestoreStrategyAsExpected( - forStFlinkFileSystem, RecoveryClaimMode.CLAIM, ReusableDataTransferStrategy.class); + forStFlinkFileSystem, + "/src-dir", + "/dst-dir", + RecoveryClaimMode.CLAIM, + ReusableDataTransferStrategy.class); + + testRestoreStrategyAsExpected( + forStFlinkFileSystem, + "/src-dir", + "/dst-dir", + RecoveryClaimMode.NO_CLAIM, + CopyDataTransferStrategy.class); + + testRestoreStrategyAsExpected( + forStFlinkFileSystem, + "/src-dir", + "/dst-dir", + RecoveryClaimMode.LEGACY, + ReusableDataTransferStrategy.class); + + testRestoreStrategyAsExpected( + null, + "/src-dir", + "/dst-dir", + RecoveryClaimMode.CLAIM, + CopyDataTransferStrategy.class); + + testRestoreStrategyAsExpected( + null, + "/src-dir", + "/dst-dir", + RecoveryClaimMode.NO_CLAIM, + CopyDataTransferStrategy.class); + // Restoring from the same directory indicates a failover scenario, allowing us to reuse the + // files if we are in a disaggregated setup. testRestoreStrategyAsExpected( - forStFlinkFileSystem, RecoveryClaimMode.NO_CLAIM, CopyDataTransferStrategy.class); + forStFlinkFileSystem, + "/same-dir", + "/same-dir", + RecoveryClaimMode.CLAIM, + ReusableDataTransferStrategy.class); testRestoreStrategyAsExpected( - forStFlinkFileSystem, RecoveryClaimMode.LEGACY, ReusableDataTransferStrategy.class); + forStFlinkFileSystem, + "/same-dir", + "/same-dir", + RecoveryClaimMode.NO_CLAIM, + ReusableDataTransferStrategy.class); testRestoreStrategyAsExpected( - null, RecoveryClaimMode.CLAIM, CopyDataTransferStrategy.class); + forStFlinkFileSystem, + "/same-dir", + "/same-dir", + RecoveryClaimMode.LEGACY, + ReusableDataTransferStrategy.class); testRestoreStrategyAsExpected( - null, RecoveryClaimMode.NO_CLAIM, CopyDataTransferStrategy.class); + null, + "/same-dir", + "/same-dir", + RecoveryClaimMode.CLAIM, + CopyDataTransferStrategy.class); + + testRestoreStrategyAsExpected( + null, + "/same-dir", + "/same-dir", + RecoveryClaimMode.NO_CLAIM, + CopyDataTransferStrategy.class); } @TestTemplate diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java index bfa90404adf08..3b161ad138441 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.TestStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.state.forst.ForStPathContainer; import org.apache.flink.state.forst.StateHandleTransferSpec; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.IOUtils; @@ -71,6 +72,14 @@ class ForStStateDataTransferTest extends TestLogger { @TempDir private java.nio.file.Path temporaryFolder; + private ForStPathContainer createPathContainer() throws IOException { + Path localJobPath = Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder)); + Path localBasePath = new Path(localJobPath, "base"); + Path remoteJobPath = Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder)); + Path remoteBasePath = new Path(remoteJobPath, "base"); + return ForStPathContainer.of(localJobPath, localBasePath, remoteJobPath, remoteBasePath); + } + /** Test that the exception arose in the thread pool will rethrow to the main thread. */ @Test void testMultiThreadTransferThreadPoolExceptionRethrow() throws IOException { @@ -465,11 +474,13 @@ public void testMultiThreadRestoreThreadPoolExceptionRethrow() { stateHandle); try (ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(5)) { + ForStPathContainer pathContainer = createPathContainer(); stateTransfer.transferAllStateDataToDirectory( + pathContainer, Collections.singletonList( new StateHandleTransferSpec( incrementalKeyedStateHandle, - Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder)))), + pathContainer.getRemoteForStPath())), new CloseableRegistry(), RecoveryClaimMode.DEFAULT); fail(); @@ -494,8 +505,12 @@ public void testMultiThreadRestoreCorrectly() throws Exception { } try (ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(4)) { + ForStPathContainer pathContainer = createPathContainer(); stateTransfer.transferAllStateDataToDirectory( - transferRequests, new CloseableRegistry(), RecoveryClaimMode.DEFAULT); + pathContainer, + transferRequests, + new CloseableRegistry(), + RecoveryClaimMode.DEFAULT); } for (int i = 0; i < numRemoteHandles; ++i) { @@ -540,8 +555,9 @@ public void testMultiThreadCleanupOnFailure() throws Exception { CloseableRegistry closeableRegistry = new CloseableRegistry(); try (ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(5)) { + ForStPathContainer pathContainer = createPathContainer(); stateTransfer.transferAllStateDataToDirectory( - transferRequests, closeableRegistry, RecoveryClaimMode.DEFAULT); + pathContainer, transferRequests, closeableRegistry, RecoveryClaimMode.DEFAULT); fail("Exception is expected"); } catch (IOException ignore) { }