From 96aa09cea0151c8bfef9ef0fc8a23b3beb8f60cf Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Tue, 30 Mar 2021 14:14:01 -0500 Subject: [PATCH] [FLINK-22052][python] Add new state backends to PyFlink --- flink-python/pyflink/datastream/__init__.py | 2 + .../pyflink/datastream/state_backend.py | 338 +++++++++++++++++- .../stream_execution_environment.py | 2 +- .../datastream/tests/test_state_backend.py | 82 ++++- 4 files changed, 405 insertions(+), 19 deletions(-) diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py index 3bd3fe6600b7f1..0bdfe2021979b0 100644 --- a/flink-python/pyflink/datastream/__init__.py +++ b/flink-python/pyflink/datastream/__init__.py @@ -99,6 +99,8 @@ 'SourceFunction', 'StateBackend', 'MapFunction', + 'HashMapStateBackend', + 'EmbeddedRocksDBStateBackend', 'MemoryStateBackend', 'FsStateBackend', 'RocksDBStateBackend', diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py index f3c74b968fc748..b5095ad01075cd 100644 --- a/flink-python/pyflink/datastream/state_backend.py +++ b/flink-python/pyflink/datastream/state_backend.py @@ -27,6 +27,8 @@ __all__ = [ 'StateBackend', + 'HashMapStateBackend', + 'EmbeddedRocksDBStateBackend', 'MemoryStateBackend', 'FsStateBackend', 'RocksDBStateBackend', @@ -39,6 +41,9 @@ def _from_j_state_backend(j_state_backend): return None gateway = get_gateway() JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend + JHashMapStateBackend = gateway.jvm.org.apache.flink.runtime.state.hashmap.HashMapStateBackend + JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.\ + EmbeddedRocksDBStateBackend JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory.MemoryStateBackend JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem.FsStateBackend JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBStateBackend @@ -47,7 +52,11 @@ def _from_j_state_backend(j_state_backend): if not get_java_class(JStateBackend).isAssignableFrom(j_clz): raise TypeError("The input %s is not an instance of StateBackend." % j_state_backend) - if get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()): + if get_java_class(JHashMapStateBackend).isAssignableFrom(j_state_backend.getClass()): + return HashMapStateBackend(j_hashmap_state_backend=j_state_backend.getClass()) + elif get_java_class(JEmbeddedRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()): + return EmbeddedRocksDBStateBackend(j_embedded_rocks_db_state_backend=j_state_backend) + elif get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()): return MemoryStateBackend(j_memory_state_backend=j_state_backend) elif get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()): return FsStateBackend(j_fs_state_backend=j_state_backend) @@ -59,34 +68,28 @@ def _from_j_state_backend(j_state_backend): class StateBackend(object, metaclass=ABCMeta): """ - A **State Backend** defines how the state of a streaming application is stored and - checkpointed. Different State Backends store their state in different fashions, and use - different data structures to hold the state of a running application. + A **State Backend** defines how the state of a streaming application is stored locally within + the cluster. Different state backends store their state in different fashions, and use different + data structures to hold the state of running applications. - For example, the :class:`MemoryStateBackend` keeps working state in the memory of the - TaskManager and stores checkpoints in the memory of the JobManager. The backend is - lightweight and without additional dependencies, but not highly available and supports only - small state. + For example, the :class:`HashMapStateBackend` keeps working state in the memory of the + TaskManager. The backend is lightweight and without additional dependencies. - The :class:`FsStateBackend` keeps working state in the memory of the TaskManager and stores - state checkpoints in a filesystem(typically a replicated highly-available filesystem, + The :class:`EmbeddedRocksDBStateBackend` keeps working state in the memory of the TaskManager + and stores state checkpoints in a filesystem(typically a replicated highly-available filesystem, like `HDFS `_, `Ceph `_, `S3 `_, `GCS `_, etc). - The :class:`RocksDBStateBackend` stores working state in `RocksDB `_, - and checkpoints the state by default to a filesystem (similar to the :class:`FsStateBackend`). + The :class:`EmbeddedRocksDBStateBackend` stores working state in an embedded + `RocksDB `_, instance and is able to scale working state to many + terrabytes in size, only limited by available disk space across all task amangers. **Raw Bytes Storage and Backends** The :class:`StateBackend` creates services for *raw bytes storage* and for *keyed state* and *operator state*. - The *raw bytes storage* (through the `org.apache.flink.runtime.state.CheckpointStreamFactory`) - is the fundamental service that simply stores bytes in a fault tolerant fashion. This service - is used by the JobManager to store checkpoint and recovery metadata and is typically also used - by the keyed- and operator state backends to store checkpointed state. - The `org.apache.flink.runtime.state.AbstractKeyedStateBackend and `org.apache.flink.runtime.state.OperatorStateBackend` created by this state backend define how to hold the working state for keys and operators. They also define how to checkpoint that @@ -116,8 +119,280 @@ def __init__(self, j_state_backend): self._j_state_backend = j_state_backend +class HashMapStateBackend(StateBackend): + """ + This state backend holds the working state in the memory (JVM heap) of the TaskManagers + and checkpoints based on the configured CheckpointStorage. + + **State Size Considerations** + + Working state is kept on the TaskManager heap. If a TaskManager executes multiple + tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) + then the aggregate state of all tasks needs to fit into that TaskManager's memory. + + **Configuration** + + As for all state backends, this backend can either be configured within the application (by + creating the backend with the respective constructor parameters and setting it on the execution + environment) or by specifying it in the Flink configuration. + + If the state backend was specified in the application, it may pick up additional configuration + parameters from the Flink configuration. For example, if the backend if configured in the + application without a default savepoint directory, it will pick up a default savepoint + directory specified in the Flink configuration of the running job/cluster. That behavior is + implemented via the :func:`configure` method. + """ + + def __init__(self, j_hashmap_state_backend=None): + """ + Creates a new MemoryStateBackend, setting optionally the paths to persist checkpoint + metadata and savepoints to, as well as configuring state thresholds and asynchronous + operations. + + WARNING: Increasing the size of this value beyond the default value + (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care. + The checkpointed state needs to be send to the JobManager via limited size RPC messages, + and there and the JobManager needs to be able to hold all aggregated state in its memory. + + Example: + :: + >>> state_backend = HashMapStateBackend() + + :param j_hashmap_state_backend: For internal use, please keep none. + """ + if j_hashmap_state_backend is None: + gateway = get_gateway() + JHashMapStateBackend = gateway.jvm.org.apache.flink.runtime.state.hashmap\ + .HashMapStateBackend + + j_hashmap_state_backend = JHashMapStateBackend() + + self._j_hashmap_state_backend = j_hashmap_state_backend + super(HashMapStateBackend, self).__init__(j_hashmap_state_backend) + + def __str__(self): + return self._j_hashmap_state_backend.toString() + + +class EmbeddedRocksDBStateBackend(StateBackend): + """ + A State Backend that stores its state in an embedded ``RocksDB`` instance. This state backend + can store very large state that exceeds memory and spills to local disk. + + All key/value state (including windows) is stored in the key/value index of RocksDB. + For persistence against loss of machines, please configure a CheckpointStorage instance + for the Job. + + The behavior of the RocksDB instances can be parametrized by setting RocksDB Options + using the methods :func:`set_predefined_options` and :func:`set_options`. + """ + + def __init__(self, + enable_incremental_checkpointing=None, + j_embedded_rocks_db_state_backend=None): + """ + Creates a new :class:`EmbeddedRocksDBStateBackend` for storing local state. + + Example: + :: + + >>> state_backend = EmbeddedRocksDBStateBackend() + + :param enable_incremental_checkpointing: True if incremental checkpointing is enabled. + :param j_embedded_rocks_db_state_backend: For internal use, please keep none. + """ + if j_embedded_rocks_db_state_backend is None: + gateway = get_gateway() + JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean + JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state \ + .EmbeddedRocksDBStateBackend + + if enable_incremental_checkpointing not in (None, True, False): + raise TypeError("Unsupported input for 'enable_incremental_checkpointing': %s, " + "the value of the parameter should be None or" + "True or False.") + + if enable_incremental_checkpointing is None: + j_enable_incremental_checkpointing = JTernaryBoolean.UNDEFINED + elif enable_incremental_checkpointing is True: + j_enable_incremental_checkpointing = JTernaryBoolean.TRUE + else: + j_enable_incremental_checkpointing = JTernaryBoolean.FALSE + + j_embedded_rocks_db_state_backend = \ + JEmbeddedRocksDBStateBackend(j_enable_incremental_checkpointing) + + self._j_embedded_rocks_db_state_backend = j_embedded_rocks_db_state_backend + super(EmbeddedRocksDBStateBackend, self).__init__(j_embedded_rocks_db_state_backend) + + def set_db_storage_paths(self, *paths: str): + """ + Sets the directories in which the local RocksDB database puts its files (like SST and + metadata files). These directories do not need to be persistent, they can be ephemeral, + meaning that they are lost on a machine failure, because state in RocksDB is persisted + in checkpoints. + + If nothing is configured, these directories default to the TaskManager's local + temporary file directories. + + Each distinct state will be stored in one path, but when the state backend creates + multiple states, they will store their files on different paths. + + Passing ``None`` to this function restores the default behavior, where the configured + temp directories will be used. + + :param paths: The paths across which the local RocksDB database files will be spread. this + parameter is optional. + """ + if len(paths) < 1: + self._j_embedded_rocks_db_state_backend.setDbStoragePath(None) + else: + gateway = get_gateway() + j_path_array = gateway.new_array(gateway.jvm.String, len(paths)) + for i in range(0, len(paths)): + j_path_array[i] = paths[i] + self._j_embedded_rocks_db_state_backend.setDbStoragePaths(j_path_array) + + def get_db_storage_paths(self) -> List[str]: + """ + Gets the configured local DB storage paths, or null, if none were configured. + + Under these directories on the TaskManager, RocksDB stores its SST files and + metadata files. These directories do not need to be persistent, they can be ephermeral, + meaning that they are lost on a machine failure, because state in RocksDB is persisted + in checkpoints. + + If nothing is configured, these directories default to the TaskManager's local + temporary file directories. + + :return: The list of configured local DB storage paths. + """ + return list(self._j_embedded_rocks_db_state_backend.getDbStoragePaths()) + + def is_incremental_checkpoints_enabled(self) -> bool: + """ + Gets whether incremental checkpoints are enabled for this state backend. + + :return: True if incremental checkpoints are enabled, false otherwise. + """ + return self._j_embedded_rocks_db_state_backend.isIncrementalCheckpointsEnabled() + + def set_predefined_options(self, options: 'PredefinedOptions'): + """ + Sets the predefined options for RocksDB. + + If user-configured options within ``RocksDBConfigurableOptions`` is set (through + flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + then the options from the factory are applied on top of the here specified + predefined options and customized options. + + Example: + :: + + >>> state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED) + + :param options: The options to set (must not be null), see :class:`PredefinedOptions`. + """ + self._j_embedded_rocks_db_state_backend\ + .setPredefinedOptions(options._to_j_predefined_options()) + + def get_predefined_options(self) -> 'PredefinedOptions': + """ + Gets the current predefined options for RocksDB. + The default options (if nothing was set via :func:`setPredefinedOptions`) + are :data:`PredefinedOptions.DEFAULT`. + + If user-configured options within ``RocksDBConfigurableOptions`` is set (through + flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + then the options from the factory are applied on top of the predefined and customized + options. + + .. seealso:: :func:`set_predefined_options` + + :return: Current predefined options. + """ + j_predefined_options = self._j_embedded_rocks_db_state_backend.getPredefinedOptions() + return PredefinedOptions._from_j_predefined_options(j_predefined_options) + + def set_options(self, options_factory_class_name: str): + """ + Sets ``org.rocksdb.Options`` for the RocksDB instances. + Because the options are not serializable and hold native code references, + they must be specified through a factory. + + The options created by the factory here are applied on top of the pre-defined + options profile selected via :func:`set_predefined_options`. + If the pre-defined options profile is the default (:data:`PredefinedOptions.DEFAULT`), + then the factory fully controls the RocksDB options. + + :param options_factory_class_name: The fully-qualified class name of the options + factory in Java that lazily creates the RocksDB options. + The options factory must have a default constructor. + """ + gateway = get_gateway() + JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory + j_options_factory_clz = load_java_class(options_factory_class_name) + if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz): + raise ValueError("The input class does not implement RocksDBOptionsFactory.") + self._j_embedded_rocks_db_state_backend\ + .setRocksDBOptions(j_options_factory_clz.newInstance()) + + def get_options(self) -> Optional[str]: + """ + Gets the fully-qualified class name of the options factory in Java that lazily creates + the RocksDB options. + + :return: The fully-qualified class name of the options factory in Java. + """ + j_options_factory = self._j_embedded_rocks_db_state_backend.getRocksDBOptions() + if j_options_factory is not None: + return j_options_factory.getClass().getName() + else: + return None + + def get_number_of_transfering_threads(self) -> int: + """ + Gets the number of threads used to transfer files while snapshotting/restoring. + + :return: The number of threads used to transfer files while snapshotting/restoring. + """ + return self._j_embedded_rocks_db_state_backend.getNumberOfTransferingThreads() + + def set_number_of_transfering_threads(self, number_of_transfering_threads: int): + """ + Sets the number of threads used to transfer files while snapshotting/restoring. + + :param number_of_transfering_threads: The number of threads used to transfer files while + snapshotting/restoring. + """ + self._j_embedded_rocks_db_state_backend\ + .setNumberOfTransferingThreads(number_of_transfering_threads) + + def __str__(self): + return self._j_embedded_rocks_db_state_backend.toString() + + class MemoryStateBackend(StateBackend): """ + **IMPORTANT** `MemoryStateBackend` is deprecated in favor of `HashMapStateBackend` and + `JobManagerCheckpointStorage`. This change does not affect the runtime characteristics of your + Jobs and is simply an API change to help better communicate the ways Flink separates local state + storage from fault tolerance. Jobs can be upgraded without loss of state. If configuring + your state backend via the `StreamExecutionEnvironment` please make the following changes. + + :: + + >> env.set_state_backend(HashMapStateBackend()) + >> env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage()) + + If you are configuring your state backend via the `flink-conf.yaml` please make the following + changes. + + ``` + state.backend: hashmap + state.checkpoint-storage: jobmanager + ``` + This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), but the checkpoints will be persisted to a file system for high-availability setups and @@ -251,6 +526,21 @@ def __str__(self): class FsStateBackend(StateBackend): """ + **IMPORTANT** `FsStateBackend is deprecated in favor of `HashMapStateBackend` and + `FileSystemCheckpointStorage`. This change does not affect the runtime characteristics + of your Jobs and is simply an API change to help better communicate the ways Flink separates + local state storage from fault tolerance. Jobs can be upgraded without loss of state. If + configuring your state backend via the `StreamExecutionEnvironment` please make the following + changes. + + :: + + >> env.set_state_backend(HashMapStateBackend()) + >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints") + + If you are configuring your state backend via the `flink-conf.yaml` please set your state + backend type to `hashmap`. + This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The state backend checkpoints state as files to a file system (hence the backend's name). @@ -419,6 +709,20 @@ def get_write_buffer_size(self) -> int: class RocksDBStateBackend(StateBackend): """ + **IMPORTANT** `RocksDBStateBackend` is deprecated in favor of `EmbeddedRocksDBStateBackend` + and `FileSystemCheckpointStorage`. This change does not affect the runtime characteristics of + your Jobs and is simply an API change to help better communicate the ways Flink separates + local state storage from fault tolerance. Jobs can be upgraded without loss of state. If + configuring your state backend via the `StreamExecutionEnvironment` please make the following + changes. + + :: + + >> env.set_state_backend(EmbeddedRocksDBStateBackend()) + >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints") + + If you are configuring your state backend via the `flink-conf.yaml` no changes are required. + A State Backend that stores its state in ``RocksDB``. This state backend can store very large state that exceeds memory and spills to disk. diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index 55f9706eafca1b..97fc330d78446a 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -293,7 +293,7 @@ def set_state_backend(self, state_backend: StateBackend) -> 'StreamExecutionEnvi Example: :: - >>> env.set_state_backend(RocksDBStateBackend("file://var/checkpoints/")) + >>> env.set_state_backend(EmbeddedRocksDBStateBackend()) :param state_backend: The :class:`StateBackend`. :return: This object. diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py b/flink-python/pyflink/datastream/tests/test_state_backend.py index e9ae8ec3127b14..be6942fb9a8b43 100644 --- a/flink-python/pyflink/datastream/tests/test_state_backend.py +++ b/flink-python/pyflink/datastream/tests/test_state_backend.py @@ -17,7 +17,8 @@ ################################################################################ from pyflink.datastream.state_backend import (_from_j_state_backend, CustomStateBackend, MemoryStateBackend, FsStateBackend, - RocksDBStateBackend, PredefinedOptions) + RocksDBStateBackend, PredefinedOptions, + EmbeddedRocksDBStateBackend) from pyflink.java_gateway import get_gateway from pyflink.pyflink_gateway_server import on_windows from pyflink.testing.test_case_utils import PyFlinkTestCase @@ -96,6 +97,85 @@ def test_get_checkpoint_path(self): self.assertEqual(state_backend.get_checkpoint_path(), "file://var/checkpoints") +class EmbeddedRocksDBStateBackendTests(PyFlinkTestCase): + + def test_create_rocks_db_state_backend(self): + + self.assertIsNotNone(EmbeddedRocksDBStateBackend()) + + self.assertIsNotNone(EmbeddedRocksDBStateBackend(True)) + + self.assertIsNotNone(EmbeddedRocksDBStateBackend(False)) + + def test_get_set_db_storage_paths(self): + if on_windows(): + storage_path = ["file:/C:/var/db_storage_dir1/", + "file:/C:/var/db_storage_dir2/", + "file:/C:/var/db_storage_dir3/"] + expected = ["C:\\var\\db_storage_dir1", + "C:\\var\\db_storage_dir2", + "C:\\var\\db_storage_dir3"] + else: + storage_path = ["file://var/db_storage_dir1/", + "file://var/db_storage_dir2/", + "file://var/db_storage_dir3/"] + expected = ["/db_storage_dir1", + "/db_storage_dir2", + "/db_storage_dir3"] + + state_backend = EmbeddedRocksDBStateBackend() + state_backend.set_db_storage_paths(*storage_path) + self.assertEqual(state_backend.get_db_storage_paths(), expected) + + def test_get_set_predefined_options(self): + + state_backend = EmbeddedRocksDBStateBackend() + + self.assertEqual(state_backend.get_predefined_options(), PredefinedOptions.DEFAULT) + + state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) + + self.assertEqual(state_backend.get_predefined_options(), + PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) + + state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED) + + self.assertEqual(state_backend.get_predefined_options(), + PredefinedOptions.SPINNING_DISK_OPTIMIZED) + + state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED) + + self.assertEqual(state_backend.get_predefined_options(), + PredefinedOptions.FLASH_SSD_OPTIMIZED) + + state_backend.set_predefined_options(PredefinedOptions.DEFAULT) + + self.assertEqual(state_backend.get_predefined_options(), PredefinedOptions.DEFAULT) + + def test_get_set_options(self): + + state_backend = EmbeddedRocksDBStateBackend() + + self.assertIsNone(state_backend.get_options()) + + state_backend.set_options( + "org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory") + + self.assertEqual(state_backend.get_options(), + "org.apache.flink.contrib.streaming.state." + "DefaultConfigurableOptionsFactory") + + def test_get_set_number_of_transfering_threads(self): + + state_backend = EmbeddedRocksDBStateBackend() + + self.assertEqual(state_backend.get_number_of_transfering_threads(), 1) + + state_backend.set_number_of_transfering_threads(4) + + self.assertEqual(state_backend.get_number_of_transfering_threads(), 4) + + class RocksDBStateBackendTests(PyFlinkTestCase): def test_create_rocks_db_state_backend(self):