Skip to content

Commit

Permalink
[FLINK-34516] Use new CheckpointingMode in flink-core in python
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Mar 14, 2024
1 parent b3355f4 commit ccba6d5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
4 changes: 2 additions & 2 deletions flink-python/pyflink/datastream/checkpoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_checkpointing_mode(self) -> CheckpointingMode:
:return: The :class:`CheckpointingMode`.
"""
return CheckpointingMode._from_j_checkpointing_mode(
self._j_checkpoint_config.getCheckpointingMode())
self._j_checkpoint_config.getConsistencyMode())

def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'CheckpointConfig':
"""
Expand All @@ -90,7 +90,7 @@ def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'Chec
:param checkpointing_mode: The :class:`CheckpointingMode`.
"""
self._j_checkpoint_config.setCheckpointingMode(
self._j_checkpoint_config.setConsistencyMode(
CheckpointingMode._to_j_checkpointing_mode(checkpointing_mode))
return self

Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/checkpointing_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ def _from_j_checkpointing_mode(j_checkpointing_mode) -> 'CheckpointingMode':
def _to_j_checkpointing_mode(self):
gateway = get_gateway()
JCheckpointingMode = \
gateway.jvm.org.apache.flink.streaming.api.CheckpointingMode
gateway.jvm.org.apache.flink.core.execution.CheckpointingMode
return getattr(JCheckpointingMode, self.name)
13 changes: 12 additions & 1 deletion flink-python/pyflink/datastream/stream_execution_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,18 @@ def get_checkpointing_mode(self) -> CheckpointingMode:
:return: The :class:`~pyflink.datastream.CheckpointingMode`.
"""
j_checkpointing_mode = self._j_stream_execution_environment.getCheckpointingMode()
j_checkpointing_mode = self._j_stream_execution_environment.getConsistencyMode()
return CheckpointingMode._from_j_checkpointing_mode(j_checkpointing_mode)

def get_checkpointing_consistency_mode(self) -> CheckpointingMode:
"""
Returns the checkpointing mode (exactly-once vs. at-least-once).
Shorthand for get_checkpoint_config().get_checkpointing_mode().
:return: The :class:`~pyflink.datastream.CheckpointingMode`.
"""
j_checkpointing_mode = self._j_stream_execution_environment.getConsistencyMode()
return CheckpointingMode._from_j_checkpointing_mode(j_checkpointing_mode)

def get_state_backend(self) -> StateBackend:
Expand Down

0 comments on commit ccba6d5

Please sign in to comment.