From ed75795e97800177cb67141ab838632d5ec55bb5 Mon Sep 17 00:00:00 2001 From: Jinzhong Li Date: Wed, 28 Feb 2024 12:19:36 +0800 Subject: [PATCH] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery --- .../checkpointing_configuration.html | 6 +++ .../common_state_backends_section.html | 6 +++ .../state_recovery_configuration.html | 2 +- .../configuration/CheckpointingOptions.java | 24 +++++++++++- .../configuration/StateRecoveryOptions.java | 16 ++++---- .../fs/DuplicatingStateChangeFsUploader.java | 10 ++--- .../changelog/fs/FsStateChangelogStorage.java | 2 +- .../changelog/fs/FsStateChangelogWriter.java | 2 +- .../fs/StateChangeUploadScheduler.java | 2 +- .../runtime/SavepointTaskStateManager.java | 2 +- ...efaultSlotPoolServiceSchedulerFactory.java | 1 - .../state/ChangelogTaskLocalStateStore.java | 2 +- .../CheckpointStreamWithResultProvider.java | 2 +- .../runtime/state/LocalRecoveryConfig.java | 38 +++++++++++++++++-- ...va => LocalSnapshotDirectoryProvider.java} | 14 +++---- ...> LocalSnapshotDirectoryProviderImpl.java} | 10 ++--- .../TaskExecutorLocalStateStoresManager.java | 21 ++++++---- .../state/TaskLocalStateStoreImpl.java | 14 ++++++- .../state/heap/HeapSnapshotStrategy.java | 2 +- .../taskexecutor/TaskManagerServices.java | 1 + .../TaskManagerServicesConfiguration.java | 14 ++++++- .../ChangelogTaskLocalStateStoreTest.java | 15 ++++---- ...heckpointStreamWithResultProviderTest.java | 8 ++-- ...calSnapshotDirectoryProviderImplTest.java} | 10 ++--- ...skExecutorLocalStateStoresManagerTest.java | 2 +- .../state/TaskLocalStateStoreImplTest.java | 7 ++-- .../state/TaskStateManagerImplTest.java | 7 ++-- .../state/TestLocalRecoveryConfig.java | 6 +-- .../changelog/ChangelogStateDiscardTest.java | 2 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 8 ++-- .../tasks/LocalStateForwardingTest.java | 9 +++-- .../runtime/tasks/StreamTaskTestHarness.java | 6 +-- .../benchmark/StateBackendBenchmarkUtils.java | 4 +- .../StreamOperatorSnapshotRestoreTest.java | 13 ++++--- 34 files changed, 194 insertions(+), 94 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProvider.java => LocalSnapshotDirectoryProvider.java} (83%) rename flink-runtime/src/main/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProviderImpl.java => LocalSnapshotDirectoryProviderImpl.java} (93%) rename flink-runtime/src/test/java/org/apache/flink/runtime/state/{LocalRecoveryDirectoryProviderImplTest.java => LocalSnapshotDirectoryProviderImplTest.java} (93%) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index 0cd3dd53755..c87a9c33803 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -8,6 +8,12 @@ + +
execution.checkpointing.local-backup.enabled
+ false + Boolean + This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend). +
state.backend.incremental
false diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index a16650bf412..ab664f51bfb 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -32,6 +32,12 @@ String The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). + +
execution.state-recovery.from-local
+ false + Boolean + This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)." +
state.backend.incremental
false diff --git a/docs/layouts/shortcodes/generated/state_recovery_configuration.html b/docs/layouts/shortcodes/generated/state_recovery_configuration.html index 2a1d914e2fd..2df071ea4d5 100644 --- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html +++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html @@ -18,7 +18,7 @@
execution.state-recovery.from-local
false Boolean - This option configures local recovery for the state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)." + This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."
execution.state-recovery.ignore-unclaimed-state
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index a5a00e813a0..5626da3d467 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -179,7 +179,8 @@ public class CheckpointingOptions { *

Local recovery currently only covers keyed state backends (including both the * EmbeddedRocksDBStateBackend and the HashMapStateBackend). * - * @Deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} instead. + * @deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} and {@link + * CheckpointingOptions#LOCAL_BACKUP_ENABLED} instead. */ @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS) @Documentation.ExcludeFromDocumentation("Hidden for deprecated") @@ -309,4 +310,25 @@ public class CheckpointingOptions { + "The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.", FS_SMALL_FILE_THRESHOLD.key())) .withDeprecatedKeys("state.backend.fs.write-buffer-size"); + + /** + * This option configures local backup for the state backend, which indicates whether to make + * backup checkpoint on local disk. If not configured, fallback to {@link + * StateRecoveryOptions#LOCAL_RECOVERY}. By default, local backup is deactivated. Local backup + * currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend + * and the HashMapStateBackend). + */ + public static final ConfigOption LOCAL_BACKUP_ENABLED = + ConfigOptions.key("execution.checkpointing.local-backup.enabled") + .booleanType() + .defaultValue(StateRecoveryOptions.LOCAL_RECOVERY.defaultValue()) + .withFallbackKeys(StateRecoveryOptions.LOCAL_RECOVERY.key()) + .withDeprecatedKeys(LOCAL_RECOVERY.key()) + .withDescription( + "This option configures local backup for the state backend, " + + "which indicates whether to make backup checkpoint on local disk. " + + "If not configured, fallback to " + + StateRecoveryOptions.LOCAL_RECOVERY.key() + + ". By default, local backup is deactivated. Local backup currently only " + + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."); } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java index cfd585b89f6..c19329c5d92 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java @@ -99,19 +99,21 @@ public class StateRecoveryOptions { .build()); /** - * This option configures local recovery for the state backend. By default, local recovery is - * deactivated. + * This option configures local recovery for the state backend, which indicates whether to + * recovery from local snapshot. By default, local recovery is deactivated. * *

Local recovery currently only covers keyed state backends (including both the * EmbeddedRocksDBStateBackend and the HashMapStateBackend). - * */ + @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS) public static final ConfigOption LOCAL_RECOVERY = ConfigOptions.key("execution.state-recovery.from-local") .booleanType() .defaultValue(false) - .withDeprecatedKeys(CheckpointingOptions.LOCAL_RECOVERY.key()) - .withDescription(" This option configures local recovery for the state backend. " - + "By default, local recovery is deactivated. Local recovery currently only " - + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\""); + .withDeprecatedKeys("state.backend.local-recovery") + .withDescription( + "This option configures local recovery for the state backend, " + + "which indicates whether to recovery from local snapshot." + + "By default, local recovery is deactivated. Local recovery currently only " + + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\""); } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java index 50db749b973..babe8fa8d28 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java @@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -68,7 +68,7 @@ public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploa private final Path basePath; private final FileSystem fileSystem; - private final LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider; + private final LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider; private final JobID jobID; public DuplicatingStateChangeFsUploader( @@ -79,12 +79,12 @@ public DuplicatingStateChangeFsUploader( int bufferSize, ChangelogStorageMetricGroup metrics, TaskChangelogRegistry changelogRegistry, - LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) { + LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider) { super(compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new); this.basePath = new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR)); this.fileSystem = fileSystem; - this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider; + this.localSnapshotDirectoryProvider = localSnapshotDirectoryProvider; this.jobID = jobID; } @@ -96,7 +96,7 @@ public OutputStreamWithPos prepareStream() throws IOException { FSDataOutputStream primaryStream = fileSystem.create(path, WriteMode.NO_OVERWRITE); Path localPath = new Path( - getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID), + getLocalTaskOwnedDirectory(localSnapshotDirectoryProvider, jobID), fileName); FSDataOutputStream secondaryStream = localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE); diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java index 1d4668cb8e8..66d6409c806 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java @@ -143,7 +143,7 @@ public FsStateChangelogStorage( this.changelogRegistry = changelogRegistry; this.uploader = uploader; this.localRecoveryConfig = localRecoveryConfig; - if (localRecoveryConfig.isLocalRecoveryEnabled()) { + if (localRecoveryConfig.isLocalBackupEnabled()) { this.localChangelogRegistry = new LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor()); } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index f0a4b21646e..9f0c74964b2 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -421,7 +421,7 @@ private SnapshotResult buildSnapshotResult( size, incrementalSize, FsStateChangelogStorageFactory.IDENTIFIER); - if (localRecoveryConfig.isLocalRecoveryEnabled()) { + if (localRecoveryConfig.isLocalBackupEnabled()) { size = 0; List> localTuples = new ArrayList<>(); for (UploadResult uploadResult : results.values()) { diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java index 19f09a151e6..45386f842b2 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java @@ -97,7 +97,7 @@ static StateChangeUploadScheduler fromConfig( checkArgument(bytes <= Integer.MAX_VALUE); int bufferSize = (int) bytes; StateChangeUploader store = - localRecoveryConfig.isLocalRecoveryEnabled() + localRecoveryConfig.isLocalBackupEnabled() ? new DuplicatingStateChangeFsUploader( jobID, basePath, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java index 6ce30d61243..1fb84df416d 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java @@ -89,7 +89,7 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera @Nonnull @Override public LocalRecoveryConfig createLocalRecoveryConfig() { - return new LocalRecoveryConfig(null); + return LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 8e85563fdf9..1472f247a10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RpcOptions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java index 23b2b6fd42a..7be6d9ead80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java @@ -101,7 +101,7 @@ private void updateReference(long checkpointId, TaskStateSnapshot localState) { } public static Path getLocalTaskOwnedDirectory( - LocalRecoveryDirectoryProvider provider, JobID jobID) { + LocalSnapshotDirectoryProvider provider, JobID jobID) { File outDir = provider.selectAllocationBaseDirectory( (jobID.hashCode() & Integer.MAX_VALUE) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java index 3378ccb7713..a4bc3c27f54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java @@ -164,7 +164,7 @@ static CheckpointStreamWithResultProvider createDuplicatingStream( @Nonnegative long checkpointId, @Nonnull CheckpointedStateScope checkpointedStateScope, @Nonnull CheckpointStreamFactory primaryStreamFactory, - @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) + @Nonnull LocalSnapshotDirectoryProvider secondaryStreamDirProvider) throws IOException { CheckpointStateOutputStream primaryOut = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java index 5a378b60ae2..e4b833540d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.state; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Optional; @@ -30,18 +33,40 @@ */ public class LocalRecoveryConfig { + public static final LocalRecoveryConfig BACKUP_AND_RECOVERY_DISABLED = + new LocalRecoveryConfig(false, false, null); + + /** Whether to recover from the local snapshot. */ + private final boolean localRecoveryEnabled; + + /** Whether to do backup checkpoint on local disk. */ + private final boolean localBackupEnabled; + /** Encapsulates the root directories and the subtask-specific path. */ - @Nullable private final LocalRecoveryDirectoryProvider localStateDirectories; + @Nullable private final LocalSnapshotDirectoryProvider localStateDirectories; - public LocalRecoveryConfig(@Nullable LocalRecoveryDirectoryProvider directoryProvider) { + public LocalRecoveryConfig( + boolean localRecoveryEnabled, + boolean localBackupEnabled, + @Nullable LocalSnapshotDirectoryProvider directoryProvider) { + this.localRecoveryEnabled = localRecoveryEnabled; + this.localBackupEnabled = localBackupEnabled; this.localStateDirectories = directoryProvider; } public boolean isLocalRecoveryEnabled() { - return localStateDirectories != null; + return localRecoveryEnabled; + } + + public boolean isLocalBackupEnabled() { + return localBackupEnabled; + } + + public boolean isLocalRecoveryOrLocalBackupEnabled() { + return localRecoveryEnabled || localBackupEnabled; } - public Optional getLocalStateDirectoryProvider() { + public Optional getLocalStateDirectoryProvider() { return Optional.ofNullable(localStateDirectories); } @@ -55,4 +80,9 @@ public static Supplier localRecoveryNotEnabled() { new IllegalStateException( "Getting a LocalRecoveryDirectoryProvider is only supported with the local recovery enabled. This is a bug and should be reported."); } + + public static LocalRecoveryConfig backupAndRecoveryEnabled( + @Nonnull LocalSnapshotDirectoryProvider directoryProvider) { + return new LocalRecoveryConfig(true, true, Preconditions.checkNotNull(directoryProvider)); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java similarity index 83% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java index 85d2ab00161..0ab95bcf00b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java @@ -22,12 +22,12 @@ import java.io.Serializable; /** - * Provides directories for local recovery. It offers access to the allocation base directories - * (i.e. the root directories for all local state that is created under the same allocation id) and - * the subtask-specific paths, which contain the local state for one subtask. Access by checkpoint - * id rotates over all root directory indexes, in case that there is more than one. Selection - * methods are provided to pick the directory under a certain index. Directory structures are of the - * following shape: + * Provides directories for local backup or local recovery. It offers access to the allocation base + * directories (i.e. the root directories for all local state that is created under the same + * allocation id) and the subtask-specific paths, which contain the local state for one subtask. + * Access by checkpoint id rotates over all root directory indexes, in case that there is more than + * one. Selection methods are provided to pick the directory under a certain index. Directory + * structures are of the following shape: * *

* @@ -48,7 +48,7 @@ * *

*/ -public interface LocalRecoveryDirectoryProvider extends Serializable { +public interface LocalSnapshotDirectoryProvider extends Serializable { /** * Returns the local state allocation base directory for given checkpoint id w.r.t. our rotation diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java similarity index 93% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java index c2ee005806f..128e16758f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java @@ -33,15 +33,15 @@ import java.nio.file.Paths; import java.util.Arrays; -/** Implementation of {@link LocalRecoveryDirectoryProvider}. */ -public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirectoryProvider { +/** Implementation of {@link LocalSnapshotDirectoryProvider}. */ +public class LocalSnapshotDirectoryProviderImpl implements LocalSnapshotDirectoryProvider { /** Serial version. */ private static final long serialVersionUID = 1L; /** Logger for this class. */ private static final Logger LOG = - LoggerFactory.getLogger(LocalRecoveryDirectoryProviderImpl.class); + LoggerFactory.getLogger(LocalSnapshotDirectoryProviderImpl.class); /** All available root directories that this can potentially deliver. */ @Nonnull private final File[] allocationBaseDirs; @@ -55,7 +55,7 @@ public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirector /** Index of the owning subtask. */ @Nonnegative private final int subtaskIndex; - public LocalRecoveryDirectoryProviderImpl( + public LocalSnapshotDirectoryProviderImpl( File allocationBaseDir, @Nonnull JobID jobID, @Nonnull JobVertexID jobVertexID, @@ -63,7 +63,7 @@ public LocalRecoveryDirectoryProviderImpl( this(new File[] {allocationBaseDir}, jobID, jobVertexID, subtaskIndex); } - public LocalRecoveryDirectoryProviderImpl( + public LocalSnapshotDirectoryProviderImpl( @Nonnull File[] allocationBaseDirs, @Nonnull JobID jobID, @Nonnull JobVertexID jobVertexID, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index b58b7fbde93..efb0f54f4ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -67,9 +67,12 @@ public class TaskExecutorLocalStateStoresManager { private final Map> taskStateStoresByAllocationID; - /** The configured mode for local recovery on this task manager. */ + /** Whether to recover from the local snapshot. */ private final boolean localRecoveryEnabled; + /** Whether to do backup checkpoint on local disk. */ + private final boolean localBackupEnabled; + /** This is the root directory for all local state of this task manager / executor. */ private final Reference localStateRootDirectories; @@ -86,6 +89,7 @@ public class TaskExecutorLocalStateStoresManager { public TaskExecutorLocalStateStoresManager( boolean localRecoveryEnabled, + boolean localBackupEnabled, @Nonnull Reference localStateRootDirectories, @Nonnull Executor discardExecutor) throws IOException { @@ -97,6 +101,7 @@ public TaskExecutorLocalStateStoresManager( this.taskStateStoresByAllocationID = new HashMap<>(); this.localRecoveryEnabled = localRecoveryEnabled; + this.localBackupEnabled = localBackupEnabled; this.localStateRootDirectories = localStateRootDirectories; this.discardExecutor = discardExecutor; this.lock = new Object(); @@ -157,17 +162,17 @@ public TaskLocalStateStore localStateStoreForSubtask( if (taskLocalStateStore == null) { - LocalRecoveryDirectoryProviderImpl directoryProvider = null; - if (localRecoveryEnabled) { + LocalSnapshotDirectoryProviderImpl directoryProvider = null; + if (localRecoveryEnabled || localBackupEnabled) { // create the allocation base dirs, one inside each root dir. File[] allocationBaseDirectories = allocationBaseDirectories(allocationID); directoryProvider = - new LocalRecoveryDirectoryProviderImpl( + new LocalSnapshotDirectoryProviderImpl( allocationBaseDirectories, jobId, jobVertexID, subtaskIndex); } - LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(directoryProvider); + new LocalRecoveryConfig( + localRecoveryEnabled, localBackupEnabled, directoryProvider); boolean changelogEnabled = jobConfiguration @@ -176,7 +181,7 @@ public TaskLocalStateStore localStateStoreForSubtask( clusterConfiguration.get( StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)); - if (localRecoveryConfig.isLocalRecoveryEnabled() && changelogEnabled) { + if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled() && changelogEnabled) { taskLocalStateStore = new ChangelogTaskLocalStateStore( jobId, @@ -185,7 +190,7 @@ public TaskLocalStateStore localStateStoreForSubtask( subtaskIndex, localRecoveryConfig, discardExecutor); - } else if (localRecoveryConfig.isLocalRecoveryEnabled()) { + } else if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled()) { taskLocalStateStore = new TaskLocalStateStoreImpl( jobId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index cf6a25545f8..a3c3b35fba6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -203,7 +203,7 @@ private void createFolderOrFail(File checkpointDirectory) { } } - protected LocalRecoveryDirectoryProvider getLocalRecoveryDirectoryProvider() { + protected LocalSnapshotDirectoryProvider getLocalRecoveryDirectoryProvider() { return localRecoveryConfig .getLocalStateDirectoryProvider() .orElseThrow(() -> new IllegalStateException("Local recovery must be enabled.")); @@ -219,6 +219,18 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) { snapshot = loadTaskStateSnapshot(checkpointID); } + // Even if local recovery is disabled, it is still necessary to load the TaskStateSnapshot + // so that it can be managed by the TaskLocalStateStore. + if (!localRecoveryConfig.isLocalRecoveryEnabled()) { + LOG.debug( + "Local recovery is disabled for checkpoint {} in subtask ({} - {} - {})", + checkpointID, + jobID, + jobVertexID, + subtaskIndex); + return null; + } + if (snapshot != null) { LOG.info( "Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java index a403cfce5d2..69da532a6f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java @@ -119,7 +119,7 @@ public SnapshotResultSupplier asyncSnapshot( final SupplierWithException checkpointStreamSupplier = - localRecoveryConfig.isLocalRecoveryEnabled() + localRecoveryConfig.isLocalBackupEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ? () -> createDuplicatingStream( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 5009ef733c6..6398107ea48 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -389,6 +389,7 @@ public static TaskManagerServices fromConfiguration( final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( taskManagerServicesConfiguration.isLocalRecoveryEnabled(), + taskManagerServicesConfiguration.isLocalBackupEnabled(), taskManagerServicesConfiguration.getLocalRecoveryStateDirectories(), ioExecutor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index a6577c26b93..962d0e0605d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -83,6 +83,8 @@ public class TaskManagerServicesConfiguration { private final boolean localRecoveryEnabled; + private final boolean localBackupEnabled; + private final RetryingRegistrationConfiguration retryingRegistrationConfiguration; private Optional