Skip to content

Commit

Permalink
[FLINK-19467][runtime / state backends] Implement HashMapStateBackend…
Browse files Browse the repository at this point in the history
… and EmbeddedRocksDBStateBackend
  • Loading branch information
sjwiesman committed Feb 25, 2021
1 parent 16f62c5 commit 0a76dab
Show file tree
Hide file tree
Showing 42 changed files with 1,950 additions and 744 deletions.
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

/** A collection of all configuration options that relate to checkpoints and savepoints. */
public class CheckpointingOptions {
Expand All @@ -27,20 +29,72 @@ public class CheckpointingOptions {
// general checkpoint and state backend options
// ------------------------------------------------------------------------

/** The state backend to be used to store and checkpoint state. */
/**
* The checkpoint storage used to store operator state locally within the cluster during
* execution.
*
* <p>The implementation can be specified either via their shortcut name, or via the class name
* of a {@code StateBackendFactory}. If a StateBackendFactory class name is specified, the
* factory is instantiated (via its zero-argument constructor) and its {@code
* StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
*
* <p>Recognized shortcut names are 'hashmap' and 'rocksdb'.
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 1)
public static final ConfigOption<String> STATE_BACKEND =
ConfigOptions.key("state.backend")
.stringType()
.noDefaultValue()
.withDescription("The state backend to be used to store state.");
.withDescription(
Description.builder()
.text("The state backend to be used to store state.")
.linebreak()
.text(
"The implementation can be specified either via their shortcut "
+ " name, or via the class name of a %s. "
+ "If a factory is specified it is instantiated via its "
+ "zero argument constructor and its %s "
+ "method is called.",
TextElement.code("StateBackendFactory"),
TextElement.code(
"StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)"))
.linebreak()
.text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
.build());

/** The checkpoint storage used to checkpoint state. */
/**
* The checkpoint storage used to checkpoint state for recovery.
*
* <p>The implementation can be specified either via their shortcut name, or via the class name
* of a {@code CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
* the factory is instantiated (via its zero-argument constructor) and its {@code
* CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
*
* <p>Recognized shortcut names are 'jobmanager' and 'filesystem'.
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
public static final ConfigOption<String> CHECKPOINT_STORAGE =
ConfigOptions.key("state.checkpoint-storage")
.stringType()
.noDefaultValue()
.withDescription("The checkpoint storage to be used to checkpoint state.");
.withDescription(
Description.builder()
.text(
"The checkpoint storage implementation to be used to checkpoint state.")
.linebreak()
.text(
"The implementation can be specified either via their shortcut "
+ " name, or via the class name of a %s. "
+ "If a factory is specified it is instantiated via its "
+ "zero argument constructor and its %s "
+ " method is called.",
TextElement.code("CheckpointStorageFactory"),
TextElement.code(
"CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)"))
.linebreak()
.text(
"Recognized shortcut names are 'jobmanager' and 'filesystem'.")
.build());

/** The maximum number of completed checkpoints to retain. */
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
Expand Down Expand Up @@ -143,6 +197,7 @@ public class CheckpointingOptions {
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
public static final ConfigOption<String> CHECKPOINTS_DIRECTORY =
ConfigOptions.key("state.checkpoints.dir")
.stringType()
.noDefaultValue()
.withDeprecatedKeys("state.backend.fs.checkpointdir")
.withDescription(
Expand All @@ -156,7 +211,7 @@ public class CheckpointingOptions {
*/
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD =
ConfigOptions.key("state.snapshot.fs.memory-threshold")
ConfigOptions.key("state.storage.fs.memory-threshold")
.memoryType()
.defaultValue(MemorySize.parse("20kb"))
.withDescription(
Expand All @@ -169,7 +224,7 @@ public class CheckpointingOptions {
*/
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
public static final ConfigOption<Integer> FS_WRITE_BUFFER_SIZE =
ConfigOptions.key("state.snapshot.fs.write-buffer-size")
ConfigOptions.key("state.storage.fs.write-buffer-size")
.intType()
.defaultValue(4 * 1024)
.withDescription(
Expand Down
Expand Up @@ -34,7 +34,8 @@
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;

Expand Down Expand Up @@ -313,9 +314,9 @@ public static StateBackend loadStateBackend(
StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);

if (backend == null && logger != null) {
logger.info(
logger.debug(
"No state backend configured, attempting to dispose savepoint "
+ "with default backend (file system based)");
+ "with configured checkpoint storage");
}
} catch (Throwable t) {
// catches exceptions and errors (like linking errors)
Expand All @@ -326,9 +327,10 @@ public static StateBackend loadStateBackend(
}

if (backend == null) {
// We use the memory state backend by default. The MemoryStateBackend is actually
// FileSystem-based for metadata
backend = new MemoryStateBackend();
// We use the hashmap state backend by default. This will
// force the checkpoint storage loader to load
// the configured storage backend.
backend = new HashMapStateBackend();
}
return backend;
}
Expand All @@ -355,6 +357,12 @@ public static CheckpointStorage loadCheckpointStorage(
}
}

if (checkpointStorage == null) {
// We use the jobmanager checkpoint storage by default.
// The JobManagerCheckpointStorage is actually
// FileSystem-based for metadata
return new JobManagerCheckpointStorage();
}
return checkpointStorage;
}

Expand Down
Expand Up @@ -32,35 +32,22 @@
import java.util.Collection;

/**
* A <b>State Backend</b> defines how the state of a streaming application is stored and
* checkpointed. Different State Backends store their state in different fashions, and use different
* A <b>State Backend</b> defines how the state of a streaming application is stored locally within
* the cluster. Different State Backends store their state in different fashions, and use different
* data structures to hold the state of a running application.
*
* <p>For example, the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend memory state
* backend} keeps working state in the memory of the TaskManager and stores checkpoints in the
* memory of the JobManager. The backend is lightweight and without additional dependencies, but not
* highly available and supports only small state.
* <p>For example, the {@link org.apache.flink.runtime.state.hashmap.HashMapStateBackend hashmap
* state backend} keeps working state in the memory of the TaskManager. The backend is lightweight
* and without additional dependencies.
*
* <p>The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file system state backend}
* keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem
* (typically a replicated highly-available filesystem, like <a
* href="https://hadoop.apache.org/">HDFS</a>, <a href="https://ceph.com/">Ceph</a>, <a
* href="https://aws.amazon.com/documentation/s3/">S3</a>, <a
* href="https://cloud.google.com/storage/">GCS</a>, etc).
*
* <p>The {@code RocksDBStateBackend} stores working state in <a
* href="http://rocksdb.org/">RocksDB</a>, and checkpoints the state by default to a filesystem
* (similar to the {@code FsStateBackend}).
* <p>The {@code EmbeddedRocksDBStateBackend} stores working state in an embedded <a
* href="http://rocksdb.org/">RocksDB</a> and is able to scale working state to many terabytes in
* size, only limited by available disk space across all task managers.
*
* <h2>Raw Bytes Storage and Backends</h2>
*
* The {@code StateBackend} creates services for <i>raw bytes storage</i> and for <i>keyed state</i>
* and <i>operator state</i>.
*
* <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
* service that simply stores bytes in a fault tolerant fashion. This service is used by the
* JobManager to store checkpoint and recovery metadata and is typically also used by the keyed- and
* operator state backends to store checkpointed state.
* <p>The {@code StateBackend} creates services for for <i>keyed state</i> and <i>operator
* state</i>.
*
* <p>The {@link CheckpointableKeyedStateBackend} and {@link OperatorStateBackend} created by this
* state backend define how to hold the working state for keys and operators. They also define how
Expand All @@ -71,7 +58,7 @@
*
* <h2>Serializability</h2>
*
* State Backends need to be {@link java.io.Serializable serializable}, because they distributed
* <p>State Backends need to be {@link java.io.Serializable serializable}, because they distributed
* across parallel processes (for distributed execution) together with the streaming application
* code.
*
Expand All @@ -83,8 +70,8 @@
*
* <h2>Thread Safety</h2>
*
* State backend implementations have to be thread-safe. Multiple threads may be creating streams
* and keyed-/operator state backends concurrently.
* <p>State backend implementations have to be thread-safe. Multiple threads may be creating
* keyed-/operator state backends concurrently.
*/
@PublicEvolving
public interface StateBackend extends java.io.Serializable {
Expand Down
Expand Up @@ -22,9 +22,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.util.DynamicCodeLoadingException;
Expand All @@ -47,16 +46,19 @@ public class StateBackendLoader {
// Configuration shortcut names
// ------------------------------------------------------------------------

/** The shortcut configuration name of the HashMap state backend. */
public static final String HASHMAP_STATE_BACKEND_NAME = "hashmap";

/**
* The shortcut configuration name for the MemoryState backend that checkpoints to the
* JobManager
* JobManager.
*/
public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
@Deprecated public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";

/** The shortcut configuration name for the FileSystem State backend */
public static final String FS_STATE_BACKEND_NAME = "filesystem";
/** The shortcut configuration name for the FileSystem State backend. */
@Deprecated public static final String FS_STATE_BACKEND_NAME = "filesystem";

/** The shortcut configuration name for the RocksDB State Backend */
/** The shortcut configuration name for the RocksDB State Backend. */
public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -104,34 +106,39 @@ public static StateBackend loadStateBackendFromConfig(

switch (backendName.toLowerCase()) {
case MEMORY_STATE_BACKEND_NAME:
MemoryStateBackend memBackend =
MemoryStateBackend backend =
new MemoryStateBackendFactory().createFromConfig(config, classLoader);

if (logger != null) {
Path memExternalized = memBackend.getCheckpointPath();
String extern =
memExternalized == null
? ""
: " (externalized to " + memExternalized + ')';
logger.info(
"State backend is set to heap memory (checkpoint to JobManager) {}",
extern);
logger.warn(
"MemoryStateBackend has been deprecated. Please use 'hashmap' state "
+ "backend instead with JobManagerCheckpointStorage for equivalent "
+ "functionality");

logger.info("State backend is set to job manager {}", backend);
}
return memBackend;

return backend;
case FS_STATE_BACKEND_NAME:
FsStateBackend fsBackend =
new FsStateBackendFactory().createFromConfig(config, classLoader);
if (logger != null) {
logger.info(
"State backend is set to heap memory (checkpoints to filesystem \"{}\")",
fsBackend.getCheckpointPath());
logger.warn(
"{} state backend has been deprecated. Please use 'hashmap' state "
+ "backend instead.",
backendName.toLowerCase());
}
// fall through and use the HashMapStateBackend instead which
// utilizes the same HeapKeyedStateBackend runtime implementation.
case HASHMAP_STATE_BACKEND_NAME:
HashMapStateBackend hashMapStateBackend =
new HashMapStateBackendFactory().createFromConfig(config, classLoader);
if (logger != null) {
logger.info("State backend is set to heap memory {}", hashMapStateBackend);
}
return fsBackend;
return hashMapStateBackend;

case ROCKSDB_STATE_BACKEND_NAME:
factoryClassName =
"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory";
// fall through to the 'default' case that uses reflection to load the backend
// that way we can keep RocksDB in a separate module

Expand Down Expand Up @@ -170,7 +177,7 @@ public static StateBackend loadStateBackendFromConfig(
* Checks if an application-defined state backend is given, and if not, loads the state backend
* from the configuration, from the parameter 'state.backend', as defined in {@link
* CheckpointingOptions#STATE_BACKEND}. If no state backend is configured, this instantiates the
* default state backend (the {@link MemoryStateBackend}).
* default state backend (the {@link HashMapStateBackend}).
*
* <p>If an application-defined state backend is found, and the state backend is a {@link
* ConfigurableStateBackend}, this methods calls {@link
Expand Down Expand Up @@ -230,10 +237,10 @@ public static StateBackend fromApplicationOrConfigOrDefault(
backend = fromConfig;
} else {
// (3) use the default
backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
backend = new HashMapStateBackendFactory().createFromConfig(config, classLoader);
if (logger != null) {
logger.info(
"No state backend has been configured, using default (Memory / JobManager) {}",
"No state backend has been configured, using default (HashMap) {}",
backend);
}
}
Expand Down Expand Up @@ -283,6 +290,6 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo

// ------------------------------------------------------------------------

/** This class is not meant to be instantiated */
/** This class is not meant to be instantiated. */
private StateBackendLoader() {}
}
Expand Up @@ -66,7 +66,12 @@
*
* <p>A completed checkpoint writes its metadata into a file '{@value
* AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
*
* @deprecated State backends should no longer implement {@link CheckpointStorage} functionality.
* Please inherit {@link AbstractStateBackend} instead. Custom checkpoint storage can be
* additionally implemented as a separate class.
*/
@Deprecated
@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend
implements CheckpointStorage {
Expand Down

0 comments on commit 0a76dab

Please sign in to comment.