Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1263,11 +1263,15 @@ def get_estimator_state(self):
raise NotImplementedError(type(self))

def current_watermark(self):
# type: () -> timestamp.Timestamp

"""Return estimated output_watermark. This function must return
monotonically increasing watermarks."""
raise NotImplementedError(type(self))

def observe_timestamp(self, timestamp):
# type: (timestamp.Timestamp) -> None

"""Update tracking watermark with latest output timestamp.

Args:
Expand Down
29 changes: 14 additions & 15 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
if TYPE_CHECKING:
from apache_beam.transforms import sideinputs
from apache_beam.transforms.core import TimerSpec
from apache_beam.iobase import RestrictionTracker
from apache_beam.iobase import WatermarkEstimator


class NameContext(object):
Expand Down Expand Up @@ -296,6 +298,7 @@ def get_restriction_provider(self):
return self.process_method.restriction_provider

def get_watermark_estimator_provider(self):
# type: () -> WatermarkEstimatorProvider
return self.process_method.watermark_estimator_provider

def _validate(self):
Expand Down Expand Up @@ -333,15 +336,14 @@ def is_splittable_dofn(self):
return self.get_restriction_provider() is not None

def get_restriction_coder(self):
# type: () -> Optional[TupleCoder]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slight refactor here will avoid introducing a new mypy error:

  def get_restriction_coder(self):
    # type: () -> Optional[TupleCoder]
    """Get coder for a restriction when processing an SDF. """
    if self.is_splittable_dofn():
      return TupleCoder([
          (self.get_restriction_provider().restriction_coder()),
          (self.get_watermark_estimator_provider().estimator_state_coder())
      ])
    else:
      return None

This avoids having to declare the restriction_coder variable as Optional[TupleCoder].

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I think the return type should still be Optional[TupleCoder] given that it also returns None.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. my comment about avoiding the declaration of Optional[TupleCoder] refers to the variable (which my edit does away with), not the the return type.


"""Get coder for a restriction when processing an SDF. """
if self.is_splittable_dofn():
restriction_coder = TupleCoder([
return TupleCoder([
(self.get_restriction_provider().restriction_coder()),
(self.get_watermark_estimator_provider().estimator_state_coder())
])
else:
restriction_coder = None
return restriction_coder

def is_stateful_dofn(self):
# type: () -> bool
Expand Down Expand Up @@ -437,11 +439,11 @@ def create_invoker(
def invoke_process(self,
windowed_value, # type: WindowedValue
restriction_tracker=None, # type: Optional[RestrictionTracker]
watermark_estimator=None,
watermark_estimator=None, # type: Optional[WatermarkEstimator]
additional_args=None,
additional_kwargs=None
):
# type: (...) -> Optional[SplitResultType]
# type: (...) -> Optional[SplitResultResidual]

"""Invokes the DoFn.process() function.

Expand Down Expand Up @@ -524,7 +526,7 @@ def __init__(self,
def invoke_process(self,
windowed_value, # type: WindowedValue
restriction_tracker=None, # type: Optional[RestrictionTracker]
watermark_estimator=None,
watermark_estimator=None, # type: Optional[WatermarkEstimator]
additional_args=None,
additional_kwargs=None
):
Expand Down Expand Up @@ -557,8 +559,8 @@ def __init__(self,
signature.is_stateful_dofn())
self.user_state_context = user_state_context
self.is_splittable = signature.is_splittable_dofn()
self.threadsafe_restriction_tracker = None
self.threadsafe_watermark_estimator = None
self.threadsafe_restriction_tracker = None # type: Optional[ThreadsafeRestrictionTracker]
self.threadsafe_watermark_estimator = None # type: Optional[ThreadsafeWatermarkEstimator]
self.current_windowed_value = None # type: Optional[WindowedValue]
self.bundle_finalizer_param = bundle_finalizer_param
self.is_key_param_required = False
Expand Down Expand Up @@ -640,12 +642,12 @@ def __init__(self, placeholder):

def invoke_process(self,
windowed_value, # type: WindowedValue
restriction_tracker=None,
watermark_estimator=None,
restriction_tracker=None, # type: Optional[RestrictionTracker]
watermark_estimator=None, # type: Optional[WatermarkEstimator]
additional_args=None,
additional_kwargs=None
):
# type: (...) -> Optional[SplitResultType]
# type: (...) -> Optional[SplitResultResidual]
if not additional_args:
additional_args = []
if not additional_kwargs:
Expand Down Expand Up @@ -790,9 +792,6 @@ def _invoke_process_per_window(self,

if self.is_splittable:
assert self.threadsafe_restriction_tracker is not None
# TODO: Consider calling check_done right after SDF.Process() finishing.
# In order to do this, we need to know that current invoking dofn is
# ProcessSizedElementAndRestriction.
self.threadsafe_restriction_tracker.check_done()
deferred_status = self.threadsafe_restriction_tracker.deferred_status()
if deferred_status:
Expand Down
17 changes: 8 additions & 9 deletions sdks/python/apache_beam/runners/sdf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
from apache_beam.utils.windowed_value import WindowedValue

if TYPE_CHECKING:
from apache_beam.io.iobase import RestrictionProgress
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our linters do not enforce alphanumeric order for modules inside the TYPE_CHECKING block, but we should still be diligent about it. Can you move this up one, please.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks for mentioning that!

from apache_beam.io.iobase import RestrictionTracker
from apache_beam.io.iobase import WatermarkEstimator

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -113,6 +115,7 @@ def check_done(self):
return self._restriction_tracker.check_done()

def current_progress(self):
# type: () -> RestrictionProgress
with self._lock:
return self._restriction_tracker.current_progress()

Expand Down Expand Up @@ -158,6 +161,7 @@ class RestrictionTrackerView(object):
restriction_tracker.
"""
def __init__(self, threadsafe_restriction_tracker):
# type: (ThreadsafeRestrictionTracker) -> None
if not isinstance(threadsafe_restriction_tracker,
ThreadsafeRestrictionTracker):
raise ValueError(
Expand All @@ -180,6 +184,7 @@ class ThreadsafeWatermarkEstimator(object):
mechanism to guarantee multi-thread safety.
"""
def __init__(self, watermark_estimator):
# type: (WatermarkEstimator) -> None
from apache_beam.io.iobase import WatermarkEstimator
if not isinstance(watermark_estimator, WatermarkEstimator):
raise ValueError('Initializing Threadsafe requires a WatermarkEstimator')
Expand All @@ -200,19 +205,13 @@ def get_estimator_state(self):
with self._lock:
return self._watermark_estimator.get_estimator_state()

def current_watermark_with_lock(self):
# The caller should hold the lock before entering this function.
if not self._lock.locked():
raise RuntimeError(
'Expected lock to be held to guarantee thread-safe '
'access.')
return self._watermark_estimator.current_watermark()

def current_watermark(self):
# type: () -> Timestamp
with self._lock:
return self.current_watermark_with_lock()
return self._watermark_estimator.current_watermark()

def observe_timestamp(self, timestamp):
# type: (Timestamp) -> None
if not isinstance(timestamp, Timestamp):
raise ValueError(
'Input of observe_timestamp should be a Timestamp '
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ def reset(self):
class _WatermarkEstimatorParam(_DoFnParam):
"""WatermarkEstomator DoFn parameter."""
def __init__(self, watermark_estimator_provider):
# type: (WatermarkEstimatorProvider) -> None
if not isinstance(watermark_estimator_provider, WatermarkEstimatorProvider):
raise ValueError(
'DoFn._WatermarkEstimatorParam expected'
Expand Down