Skip to content

Commit

Permalink
[BEAM-8537] Provide WatermarkEstimator to track watermark (#10375)
Browse files Browse the repository at this point in the history
* Provide WatermarkEstimator to track watermark

* Merged conflicts and resolved conflicts

* Change if to if not None

* Changed deferred_watermark to deferred_timestamp

* Add NoOpWatermarkEstimator

* Plump NoOpWatermarkEstimatorProvider through bundle_processor.

* Changes to try_split in common.py

* fix lint

* Fix common.pxd

* Fix formatter

* Fix formatter again

* Clean up if branch for watermark_estimator

* Add default_provider() method to watermark_estimators

* Use separate locks for ThreadsafeWatermarkEstimator and ThreadsafeRestrictionTracker

* Fix formatter again

* Add TODO to ManualWatermarkEstimator

* Fix Frometter

* Fix lint

* More comments

* Minor performance update for critical piece of code.

Co-authored-by: Robert Bradshaw <robertwb@gmail.com>
  • Loading branch information
boyuanzz and robertwb committed Feb 21, 2020
1 parent fdaf5d8 commit c6f812f
Show file tree
Hide file tree
Showing 12 changed files with 563 additions and 200 deletions.
35 changes: 35 additions & 0 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
'RangeTracker',
'Read',
'RestrictionTracker',
'WatermarkEstimator',
'Sink',
'Write',
'Writer'
Expand Down Expand Up @@ -1243,6 +1244,40 @@ def try_claim(self, position):
raise NotImplementedError


class WatermarkEstimator(object):
"""A WatermarkEstimator which is used for estimating output_watermark based on
the timestamp of output records or manual modifications.
The base class provides common APIs that are called by the framework, which
are also accessible inside a DoFn.process() body. Derived watermark estimator
should implement all APIs listed below. Additional methods can be implemented
and will be available when invoked within a DoFn.
Internal state must not be updated asynchronously.
"""
def get_estimator_state(self):
"""Get current state of the WatermarkEstimator instance, which can be used
to recreate the WatermarkEstimator when processing the restriction. See
WatermarkEstimatorProvider.create_watermark_estimator.
"""
raise NotImplementedError(type(self))

def current_watermark(self):
"""Return estimated output_watermark. This function must return
monotonically increasing watermarks."""
raise NotImplementedError(type(self))

def observe_timestamp(self, timestamp):
"""Update tracking watermark with latest output timestamp.
Args:
timestamp: the `timestamp.Timestamp` of current output element.
This is called with the timestamp of every element output from the DoFn.
"""
raise NotImplementedError(type(self))


class RestrictionProgress(object):
"""Used to record the progress of a restriction.
Expand Down
150 changes: 150 additions & 0 deletions sdks/python/apache_beam/io/watermark_estimators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A collection of WatermarkEstimator implementations that SplittableDoFns
can use."""

# pytype: skip-file

from __future__ import absolute_import

from apache_beam.io.iobase import WatermarkEstimator
from apache_beam.transforms.core import WatermarkEstimatorProvider
from apache_beam.utils.timestamp import Timestamp


class MonotonicWatermarkEstimator(WatermarkEstimator):
"""A WatermarkEstimator which assumes that timestamps of all ouput records
are increasing monotonically.
"""
def __init__(self, timestamp):
"""For a new <element, restriction> pair, the initial value is None. When
resuming processing, the initial timestamp will be the last reported
watermark.
"""
self._watermark = timestamp

def observe_timestamp(self, timestamp):
if self._watermark is None:
self._watermark = timestamp
else:
# TODO(BEAM-9312): Consider making it configurable to deal with late
# timestamp.
if timestamp < self._watermark:
raise ValueError(
'A MonotonicWatermarkEstimator expects output '
'timestamp to be increasing monotonically.')
self._watermark = timestamp

def current_watermark(self):
return self._watermark

def get_estimator_state(self):
return self._watermark

@staticmethod
def default_provider():
"""Provide a default WatermarkEstimatorProvider for
MonotonicWatermarkEstimator.
"""
class DefaultMonotonicWatermarkEstimator(WatermarkEstimatorProvider):
def initial_estimator_state(self, element, restriction):
return None

def create_watermark_estimator(self, estimator_state):
return MonotonicWatermarkEstimator(estimator_state)

return DefaultMonotonicWatermarkEstimator()


class WalltimeWatermarkEstimator(WatermarkEstimator):
"""A WatermarkEstimator which uses processing time as the estimated watermark.
"""
def __init__(self, timestamp=None):
self._timestamp = timestamp or Timestamp.now()

def observe_timestamp(self, timestamp):
pass

def current_watermark(self):
self._timestamp = max(self._timestamp, Timestamp.now())
return self._timestamp

def get_estimator_state(self):
return self._timestamp

@staticmethod
def default_provider():
"""Provide a default WatermarkEstimatorProvider for
WalltimeWatermarkEstimator.
"""
class DefaultWalltimeWatermarkEstimator(WatermarkEstimatorProvider):
def initial_estimator_state(self, element, restriction):
return None

def create_watermark_estimator(self, estimator_state):
return WalltimeWatermarkEstimator(estimator_state)

return DefaultWalltimeWatermarkEstimator()


class ManualWatermarkEstimator(WatermarkEstimator):
"""A WatermarkEstimator which is controlled manually from within a DoFn.
The DoFn must invoke set_watermark to advance the watermark.
"""
def __init__(self, watermark):
self._watermark = watermark

def observe_timestamp(self, timestamp):
pass

def current_watermark(self):
return self._watermark

def get_estimator_state(self):
return self._watermark

def set_watermark(self, timestamp):
# Please call set_watermark after calling restriction_tracker.try_claim() to
# prevent advancing watermark early.
# TODO(BEAM-7473): It's possible that getting a slightly stale watermark
# when performing split.
if not isinstance(timestamp, Timestamp):
raise ValueError('set_watermark expects a Timestamp as input')
if self._watermark and self._watermark > timestamp:
raise ValueError(
'Watermark must be monotonically increasing.'
'Provided watermark %s is less than '
'current watermark %s',
timestamp,
self._watermark)
self._watermark = timestamp

@staticmethod
def default_provider():
"""Provide a default WatermarkEstimatorProvider for
WalltimeWatermarkEstimator.
"""
class DefaultManualWatermarkEstimatorProvider(WatermarkEstimatorProvider):
def initial_estimator_state(self, element, restriction):
return None

def create_watermark_estimator(self, estimator_state):
return ManualWatermarkEstimator(estimator_state)

return DefaultManualWatermarkEstimatorProvider()
106 changes: 106 additions & 0 deletions sdks/python/apache_beam/io/watermark_estimators_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for built-in WatermarkEstimators"""

# pytype: skip-file

from __future__ import absolute_import

import unittest

import mock

from apache_beam.io.iobase import WatermarkEstimator
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
from apache_beam.io.watermark_estimators import MonotonicWatermarkEstimator
from apache_beam.io.watermark_estimators import WalltimeWatermarkEstimator
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp


class MonotonicWatermarkEstimatorTest(unittest.TestCase):
def test_initialize_from_state(self):
timestamp = Timestamp(10)
watermark_estimator = MonotonicWatermarkEstimator(timestamp)
self.assertIsInstance(watermark_estimator, WatermarkEstimator)
self.assertEqual(watermark_estimator.get_estimator_state(), timestamp)

def test_observe_timestamp(self):
watermark_estimator = MonotonicWatermarkEstimator(Timestamp(10))
watermark_estimator.observe_timestamp(Timestamp(15))
self.assertEqual(watermark_estimator.current_watermark(), Timestamp(15))
watermark_estimator.observe_timestamp(Timestamp(20))
self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
watermark_estimator.observe_timestamp(Timestamp(20))
self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
with self.assertRaises(ValueError):
watermark_estimator.observe_timestamp(Timestamp(10))

def test_get_estimator_state(self):
watermark_estimator = MonotonicWatermarkEstimator(Timestamp(10))
watermark_estimator.observe_timestamp(Timestamp(15))
self.assertEqual(watermark_estimator.get_estimator_state(), Timestamp(15))


class WalltimeWatermarkEstimatorTest(unittest.TestCase):
@mock.patch('apache_beam.utils.timestamp.Timestamp.now')
def test_initialization(self, mock_timestamp):
now_time = Timestamp.now() - Duration(10)
mock_timestamp.side_effect = lambda: now_time
watermark_estimator = WalltimeWatermarkEstimator()
self.assertIsInstance(watermark_estimator, WatermarkEstimator)
self.assertEqual(watermark_estimator.get_estimator_state(), now_time)

def test_observe_timestamp(self):
now_time = Timestamp.now() + Duration(10)
watermark_estimator = WalltimeWatermarkEstimator(now_time)
watermark_estimator.observe_timestamp(Timestamp(10))
watermark_estimator.observe_timestamp(Timestamp(10))
self.assertEqual(watermark_estimator.current_watermark(), now_time)

def test_advance_watermark_with_incorrect_sys_clock(self):
initial_timestamp = Timestamp.now() + Duration(100)
watermark_estimator = WalltimeWatermarkEstimator(initial_timestamp)
self.assertEqual(watermark_estimator.current_watermark(), initial_timestamp)
self.assertEqual(
watermark_estimator.get_estimator_state(), initial_timestamp)


class ManualWatermarkEstimatorTest(unittest.TestCase):
def test_initialization(self):
watermark_estimator = ManualWatermarkEstimator(None)
self.assertIsNone(watermark_estimator.get_estimator_state())
self.assertIsNone(watermark_estimator.current_watermark())
watermark_estimator = ManualWatermarkEstimator(Timestamp(10))
self.assertEqual(watermark_estimator.get_estimator_state(), Timestamp(10))

def test_set_watermark(self):
watermark_estimator = ManualWatermarkEstimator(None)
self.assertIsNone(watermark_estimator.current_watermark())
watermark_estimator.observe_timestamp(Timestamp(10))
self.assertIsNone(watermark_estimator.current_watermark())
watermark_estimator.set_watermark(Timestamp(20))
self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
watermark_estimator.set_watermark(Timestamp(30))
self.assertEqual(watermark_estimator.current_watermark(), Timestamp(30))
with self.assertRaises(ValueError):
watermark_estimator.set_watermark(Timestamp(25))


if __name__ == '__main__':
unittest.main()
17 changes: 9 additions & 8 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ cdef class MethodWrapper(object):
cdef object key_arg_name
cdef object restriction_provider
cdef object restriction_provider_arg_name
cdef object watermark_estimator
cdef object watermark_estimator_arg_name
cdef object watermark_estimator_provider
cdef object watermark_estimator_provider_arg_name


cdef class DoFnSignature(object):
Expand All @@ -52,8 +52,8 @@ cdef class DoFnSignature(object):
cdef public MethodWrapper finish_bundle_method
cdef public MethodWrapper setup_lifecycle_method
cdef public MethodWrapper teardown_lifecycle_method
cdef public MethodWrapper create_watermark_estimator_method
cdef public MethodWrapper initial_restriction_method
cdef public MethodWrapper restriction_coder_method
cdef public MethodWrapper create_tracker_method
cdef public MethodWrapper split_method
cdef public object do_fn
Expand All @@ -69,12 +69,12 @@ cdef class DoFnInvoker(object):

cpdef invoke_process(self, WindowedValue windowed_value,
restriction_tracker=*,
watermark_estimator=*,
additional_args=*, additional_kwargs=*)
cpdef invoke_start_bundle(self)
cpdef invoke_finish_bundle(self)
cpdef invoke_split(self, element, restriction)
cpdef invoke_initial_restriction(self, element)
cpdef invoke_restriction_coder(self)
cpdef invoke_create_tracker(self, restriction)


Expand All @@ -93,8 +93,7 @@ cdef class PerWindowInvoker(DoFnInvoker):
cdef object process_method
cdef bint is_splittable
cdef object threadsafe_restriction_tracker
cdef object watermark_estimator
cdef object watermark_estimator_param
cdef object threadsafe_watermark_estimator
cdef WindowedValue current_windowed_value
cdef bint is_key_param_required

Expand All @@ -111,7 +110,8 @@ cdef class DoFnRunner:
cdef class OutputProcessor(object):
@cython.locals(windowed_value=WindowedValue,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results)
cpdef process_outputs(self, WindowedValue element, results,
watermark_estimator=*)


cdef class _OutputProcessor(OutputProcessor):
Expand All @@ -121,7 +121,8 @@ cdef class _OutputProcessor(OutputProcessor):
cdef DataflowDistributionCounter per_element_output_counter
@cython.locals(windowed_value=WindowedValue,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results)
cpdef process_outputs(self, WindowedValue element, results,
watermark_estimator=*)

cdef class DoFnContext(object):
cdef object label
Expand Down
Loading

0 comments on commit c6f812f

Please sign in to comment.