Skip to content

Commit

Permalink
[FLINK-34484][state] Split 'state.backend.local-recovery' into two op…
Browse files Browse the repository at this point in the history
…tions for checkpointing and recovery
  • Loading branch information
ljz2051 authored and fredia committed Mar 14, 2024
1 parent 1d1dca6 commit ed75795
Show file tree
Hide file tree
Showing 34 changed files with 194 additions and 94 deletions.
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>execution.checkpointing.local-backup.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).</td>
</tr>
<tr>
<td><h5>state.backend.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Expand Up @@ -32,6 +32,12 @@
<td>String</td>
<td>The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).</td>
</tr>
<tr>
<td><h5>execution.state-recovery.from-local</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."</td>
</tr>
<tr>
<td><h5>state.backend.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Expand Up @@ -18,7 +18,7 @@
<td><h5>execution.state-recovery.from-local</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> This option configures local recovery for the state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."</td>
<td>This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."</td>
</tr>
<tr>
<td><h5>execution.state-recovery.ignore-unclaimed-state</h5></td>
Expand Down
Expand Up @@ -179,7 +179,8 @@ public class CheckpointingOptions {
* <p>Local recovery currently only covers keyed state backends (including both the
* EmbeddedRocksDBStateBackend and the HashMapStateBackend).
*
* @Deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} instead.
* @deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} and {@link
* CheckpointingOptions#LOCAL_BACKUP_ENABLED} instead.
*/
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
Expand Down Expand Up @@ -309,4 +310,25 @@ public class CheckpointingOptions {
+ "The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.",
FS_SMALL_FILE_THRESHOLD.key()))
.withDeprecatedKeys("state.backend.fs.write-buffer-size");

/**
* This option configures local backup for the state backend, which indicates whether to make
* backup checkpoint on local disk. If not configured, fallback to {@link
* StateRecoveryOptions#LOCAL_RECOVERY}. By default, local backup is deactivated. Local backup
* currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend
* and the HashMapStateBackend).
*/
public static final ConfigOption<Boolean> LOCAL_BACKUP_ENABLED =
ConfigOptions.key("execution.checkpointing.local-backup.enabled")
.booleanType()
.defaultValue(StateRecoveryOptions.LOCAL_RECOVERY.defaultValue())
.withFallbackKeys(StateRecoveryOptions.LOCAL_RECOVERY.key())
.withDeprecatedKeys(LOCAL_RECOVERY.key())
.withDescription(
"This option configures local backup for the state backend, "
+ "which indicates whether to make backup checkpoint on local disk. "
+ "If not configured, fallback to "
+ StateRecoveryOptions.LOCAL_RECOVERY.key()
+ ". By default, local backup is deactivated. Local backup currently only "
+ "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).");
}
Expand Up @@ -99,19 +99,21 @@ public class StateRecoveryOptions {
.build());

/**
* This option configures local recovery for the state backend. By default, local recovery is
* deactivated.
* This option configures local recovery for the state backend, which indicates whether to
* recovery from local snapshot. By default, local recovery is deactivated.
*
* <p>Local recovery currently only covers keyed state backends (including both the
* EmbeddedRocksDBStateBackend and the HashMapStateBackend).
*
*/
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<Boolean> LOCAL_RECOVERY =
ConfigOptions.key("execution.state-recovery.from-local")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys(CheckpointingOptions.LOCAL_RECOVERY.key())
.withDescription(" This option configures local recovery for the state backend. "
+ "By default, local recovery is deactivated. Local recovery currently only "
+ "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\"");
.withDeprecatedKeys("state.backend.local-recovery")
.withDescription(
"This option configures local recovery for the state backend, "
+ "which indicates whether to recovery from local snapshot."
+ "By default, local recovery is deactivated. Local recovery currently only "
+ "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\"");
}
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
Expand Down Expand Up @@ -68,7 +68,7 @@ public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploa

private final Path basePath;
private final FileSystem fileSystem;
private final LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;
private final LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider;
private final JobID jobID;

public DuplicatingStateChangeFsUploader(
Expand All @@ -79,12 +79,12 @@ public DuplicatingStateChangeFsUploader(
int bufferSize,
ChangelogStorageMetricGroup metrics,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) {
LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider) {
super(compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new);
this.basePath =
new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
this.fileSystem = fileSystem;
this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider;
this.localSnapshotDirectoryProvider = localSnapshotDirectoryProvider;
this.jobID = jobID;
}

Expand All @@ -96,7 +96,7 @@ public OutputStreamWithPos prepareStream() throws IOException {
FSDataOutputStream primaryStream = fileSystem.create(path, WriteMode.NO_OVERWRITE);
Path localPath =
new Path(
getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID),
getLocalTaskOwnedDirectory(localSnapshotDirectoryProvider, jobID),
fileName);
FSDataOutputStream secondaryStream =
localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE);
Expand Down
Expand Up @@ -143,7 +143,7 @@ public FsStateChangelogStorage(
this.changelogRegistry = changelogRegistry;
this.uploader = uploader;
this.localRecoveryConfig = localRecoveryConfig;
if (localRecoveryConfig.isLocalRecoveryEnabled()) {
if (localRecoveryConfig.isLocalBackupEnabled()) {
this.localChangelogRegistry =
new LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor());
}
Expand Down
Expand Up @@ -421,7 +421,7 @@ private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
size,
incrementalSize,
FsStateChangelogStorageFactory.IDENTIFIER);
if (localRecoveryConfig.isLocalRecoveryEnabled()) {
if (localRecoveryConfig.isLocalBackupEnabled()) {
size = 0;
List<Tuple2<StreamStateHandle, Long>> localTuples = new ArrayList<>();
for (UploadResult uploadResult : results.values()) {
Expand Down
Expand Up @@ -97,7 +97,7 @@ static StateChangeUploadScheduler fromConfig(
checkArgument(bytes <= Integer.MAX_VALUE);
int bufferSize = (int) bytes;
StateChangeUploader store =
localRecoveryConfig.isLocalRecoveryEnabled()
localRecoveryConfig.isLocalBackupEnabled()
? new DuplicatingStateChangeFsUploader(
jobID,
basePath,
Expand Down
Expand Up @@ -89,7 +89,7 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera
@Nonnull
@Override
public LocalRecoveryConfig createLocalRecoveryConfig() {
return new LocalRecoveryConfig(null);
return LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED;
}

@Override
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RpcOptions;
Expand Down
Expand Up @@ -101,7 +101,7 @@ private void updateReference(long checkpointId, TaskStateSnapshot localState) {
}

public static Path getLocalTaskOwnedDirectory(
LocalRecoveryDirectoryProvider provider, JobID jobID) {
LocalSnapshotDirectoryProvider provider, JobID jobID) {
File outDir =
provider.selectAllocationBaseDirectory(
(jobID.hashCode() & Integer.MAX_VALUE)
Expand Down
Expand Up @@ -164,7 +164,7 @@ static CheckpointStreamWithResultProvider createDuplicatingStream(
@Nonnegative long checkpointId,
@Nonnull CheckpointedStateScope checkpointedStateScope,
@Nonnull CheckpointStreamFactory primaryStreamFactory,
@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
@Nonnull LocalSnapshotDirectoryProvider secondaryStreamDirProvider)
throws IOException {

CheckpointStateOutputStream primaryOut =
Expand Down
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.state;

import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Optional;
Expand All @@ -30,18 +33,40 @@
*/
public class LocalRecoveryConfig {

public static final LocalRecoveryConfig BACKUP_AND_RECOVERY_DISABLED =
new LocalRecoveryConfig(false, false, null);

/** Whether to recover from the local snapshot. */
private final boolean localRecoveryEnabled;

/** Whether to do backup checkpoint on local disk. */
private final boolean localBackupEnabled;

/** Encapsulates the root directories and the subtask-specific path. */
@Nullable private final LocalRecoveryDirectoryProvider localStateDirectories;
@Nullable private final LocalSnapshotDirectoryProvider localStateDirectories;

public LocalRecoveryConfig(@Nullable LocalRecoveryDirectoryProvider directoryProvider) {
public LocalRecoveryConfig(
boolean localRecoveryEnabled,
boolean localBackupEnabled,
@Nullable LocalSnapshotDirectoryProvider directoryProvider) {
this.localRecoveryEnabled = localRecoveryEnabled;
this.localBackupEnabled = localBackupEnabled;
this.localStateDirectories = directoryProvider;
}

public boolean isLocalRecoveryEnabled() {
return localStateDirectories != null;
return localRecoveryEnabled;
}

public boolean isLocalBackupEnabled() {
return localBackupEnabled;
}

public boolean isLocalRecoveryOrLocalBackupEnabled() {
return localRecoveryEnabled || localBackupEnabled;
}

public Optional<LocalRecoveryDirectoryProvider> getLocalStateDirectoryProvider() {
public Optional<LocalSnapshotDirectoryProvider> getLocalStateDirectoryProvider() {
return Optional.ofNullable(localStateDirectories);
}

Expand All @@ -55,4 +80,9 @@ public static Supplier<IllegalStateException> localRecoveryNotEnabled() {
new IllegalStateException(
"Getting a LocalRecoveryDirectoryProvider is only supported with the local recovery enabled. This is a bug and should be reported.");
}

public static LocalRecoveryConfig backupAndRecoveryEnabled(
@Nonnull LocalSnapshotDirectoryProvider directoryProvider) {
return new LocalRecoveryConfig(true, true, Preconditions.checkNotNull(directoryProvider));
}
}
Expand Up @@ -22,12 +22,12 @@
import java.io.Serializable;

/**
* Provides directories for local recovery. It offers access to the allocation base directories
* (i.e. the root directories for all local state that is created under the same allocation id) and
* the subtask-specific paths, which contain the local state for one subtask. Access by checkpoint
* id rotates over all root directory indexes, in case that there is more than one. Selection
* methods are provided to pick the directory under a certain index. Directory structures are of the
* following shape:
* Provides directories for local backup or local recovery. It offers access to the allocation base
* directories (i.e. the root directories for all local state that is created under the same
* allocation id) and the subtask-specific paths, which contain the local state for one subtask.
* Access by checkpoint id rotates over all root directory indexes, in case that there is more than
* one. Selection methods are provided to pick the directory under a certain index. Directory
* structures are of the following shape:
*
* <p>
*
Expand All @@ -48,7 +48,7 @@
*
* <p>
*/
public interface LocalRecoveryDirectoryProvider extends Serializable {
public interface LocalSnapshotDirectoryProvider extends Serializable {

/**
* Returns the local state allocation base directory for given checkpoint id w.r.t. our rotation
Expand Down
Expand Up @@ -33,15 +33,15 @@
import java.nio.file.Paths;
import java.util.Arrays;

/** Implementation of {@link LocalRecoveryDirectoryProvider}. */
public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirectoryProvider {
/** Implementation of {@link LocalSnapshotDirectoryProvider}. */
public class LocalSnapshotDirectoryProviderImpl implements LocalSnapshotDirectoryProvider {

/** Serial version. */
private static final long serialVersionUID = 1L;

/** Logger for this class. */
private static final Logger LOG =
LoggerFactory.getLogger(LocalRecoveryDirectoryProviderImpl.class);
LoggerFactory.getLogger(LocalSnapshotDirectoryProviderImpl.class);

/** All available root directories that this can potentially deliver. */
@Nonnull private final File[] allocationBaseDirs;
Expand All @@ -55,15 +55,15 @@ public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirector
/** Index of the owning subtask. */
@Nonnegative private final int subtaskIndex;

public LocalRecoveryDirectoryProviderImpl(
public LocalSnapshotDirectoryProviderImpl(
File allocationBaseDir,
@Nonnull JobID jobID,
@Nonnull JobVertexID jobVertexID,
@Nonnegative int subtaskIndex) {
this(new File[] {allocationBaseDir}, jobID, jobVertexID, subtaskIndex);
}

public LocalRecoveryDirectoryProviderImpl(
public LocalSnapshotDirectoryProviderImpl(
@Nonnull File[] allocationBaseDirs,
@Nonnull JobID jobID,
@Nonnull JobVertexID jobVertexID,
Expand Down

0 comments on commit ed75795

Please sign in to comment.