Skip to content

Commit

Permalink
[FLINK-22052][python] Add new checkpoint storage classes to PyFlink
Browse files Browse the repository at this point in the history
  • Loading branch information
sjwiesman committed Apr 1, 2021
1 parent bcf55e0 commit b8d1add
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 1 deletion.
10 changes: 9 additions & 1 deletion flink-python/pyflink/datastream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@
SinkFunction)
from pyflink.datastream.state_backend import (StateBackend, MemoryStateBackend, FsStateBackend,
RocksDBStateBackend, CustomStateBackend,
PredefinedOptions)
PredefinedOptions, HashMapStateBackend,
EmbeddedRocksDBStateBackend)
from pyflink.datastream.checkpoint_storage import (CheckpointStorage, JobManagerCheckpointStorage,
FileSystemCheckpointStorage,
CustomCheckpointStorage)
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.datastream.time_characteristic import TimeCharacteristic
from pyflink.datastream.time_domain import TimeDomain
Expand Down Expand Up @@ -108,6 +112,10 @@
'RocksDBStateBackend',
'CustomStateBackend',
'PredefinedOptions',
'CheckpointStorage',
'JobManagerCheckpointStorage',
'FileSystemCheckpointStorage',
'CustomCheckpointStorage',
'ExternalizedCheckpointCleanup',
'TimeCharacteristic',
'TimeDomain',
Expand Down
39 changes: 39 additions & 0 deletions flink-python/pyflink/datastream/checkpoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from enum import Enum
from typing import Optional

from pyflink.datastream.checkpoint_storage import CheckpointStorage, _from_j_checkpoint_storage
from pyflink.datastream.checkpointing_mode import CheckpointingMode
from pyflink.java_gateway import get_gateway

Expand Down Expand Up @@ -330,6 +331,44 @@ def disable_unaligned_checkpoints(self) -> 'CheckpointConfig':
self.enable_unaligned_checkpoints(False)
return self

def set_checkpoint_storage(self, storage: CheckpointStorage) -> 'CheckpointConfig':
"""
Checkpoint storage defines how stat backends checkpoint their state for fault
tolerance in streaming applications. Various implementations store their checkpoints
in different fashions and have different requirements and availability guarantees.
For example, `JobManagerCheckpointStorage` stores checkpoints in the memory of the
JobManager. It is lightweight and without additional dependencies but is not highly
available and only supports small state sizes. This checkpoint storage policy is convenient
for local testing and development.
The `FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
HDFS, NFS Drivs, S3, and GCS, this storage policy supports large state size, in the
magnitude of many terabytes while providing a highly available foundation for stateful
applications. This checkpoint storage policy is recommended for most production deployments.
"""
self._j_checkpoint_config.setCheckpointStorage(storage._j_checkpoint_storage)
return self

def set_checkpoint_storage_dir(self, checkpoint_path: str) -> 'CheckpointConfig':
"""
Configures the application to write out checkpoint snapshots to the configured directory.
See `FileSystemCheckpointStorage` for more details on checkpointing to a file system.
"""
self._j_checkpoint_config.setCheckpointStorage(checkpoint_path)
return self

def get_checkpoint_storage(self) -> Optional[CheckpointStorage]:
"""
The checkpoint storage that has been configured for the Job, or None if
none has been set.
"""
j_storage = self._j_checkpoint_config.getCheckpointStorage()
if j_storage is None:
return None
else:
return _from_j_checkpoint_storage(j_storage)


class ExternalizedCheckpointCleanup(Enum):
"""
Expand Down
Loading

0 comments on commit b8d1add

Please sign in to comment.