[BEAM-1872] Add IdentityWindowFn for use in Reshuffle #4040
R: @robertwb
I'm wondering if IdentityWindowFn is the right thing to use for Reshuffle. Instead, it seems it'd be better to reify the windows into the data, letting the windows be single-timestamp windows, and then, after the Reshuffle, re-align the windows with what existed before (thinking about streaming in particular).
Is there a use case for this other than Reshuffle? While I generally favor breaking changes up, it might make more sense to review this in the context where it's used.
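To make the reify-then-restore idea above concrete, here is a minimal sketch in plain Python. It is not Beam code: `Windowed`, `reify`, `group_by_key`, and `restore` are hypothetical names invented for this illustration, and windows are modeled as simple `(start, end)` tuples. The point is only that timestamp and window can travel through a grouping step as ordinary data and be re-attached afterwards.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for a windowed element; NOT Beam's WindowedValue.
Windowed = namedtuple('Windowed', ['value', 'timestamp', 'window'])


def reify(element):
    """Moves the timestamp and window of a (key, value) element into its payload."""
    key, value = element.value
    return key, (value, element.timestamp, element.window)


def group_by_key(pairs):
    """Toy grouping step standing in for the shuffle."""
    grouped = defaultdict(list)
    for key, payload in pairs:
        grouped[key].append(payload)
    return grouped


def restore(grouped):
    """Re-attaches the reified timestamp and window to each element."""
    for key, payloads in grouped.items():
        for value, timestamp, window in payloads:
            yield Windowed((key, value), timestamp, window)


elements = [
    Windowed(('k1', 1), 5, (0, 10)),
    Windowed(('k2', 2), 12, (10, 20)),
    Windowed(('k1', 3), 17, (10, 20)),
]
restored = list(restore(group_by_key(reify(e) for e in elements)))
```

After the round trip, `restored` holds the same elements with their original timestamps and windows, possibly in a different order.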
    super(_WindowedValueMatcherDoFn, self).__init__()
    self.expected_elements = copy.copy(expected_elements)

  def process(self, element, timestamp=DoFn.TimestampParam,
This will not work if there are multiple bundles.
I would propose that WindowedValueMatcher instead be implemented on top of assert_that, where the windows and timestamps are reified. Similar to how reify_windows is used in https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window_test.py . If we want to make this a full util, we should probably provide a similar API to assert_that.
I believe this code does work with multiple bundles. It "Verifies that all processed elements are in the list of expected elements."
That being said, I could try to rewrite this to use assert_that.
I should clarify: its use in WindowedValueMatcher is not correct in the presence of multiple bundles. E.g., suppose the expected values were [a, b] and the actual values were [a, a], split into two bundles.
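The counterexample above can be sketched in plain Python. This is a simplified model, not the code under review: `PerBundleMatcher` is a hypothetical class, and it assumes each bundle is checked against its own fresh copy of the expected list (as would happen if the DoFn state is not shared across bundles). `BeamAssertException` here is a local stand-in for the exception named in this thread.

```python
class BeamAssertException(Exception):
    """Local stand-in for the exception type discussed in this thread."""


class PerBundleMatcher(object):
    """Hypothetical matcher that only sees the elements of one bundle."""

    def __init__(self, expected):
        self.expected = list(expected)

    def process_bundle(self, bundle):
        # Each bundle starts from a fresh copy of the expected elements.
        remaining = list(self.expected)
        for element in bundle:
            if element not in remaining:
                raise BeamAssertException('unexpected element: %r' % (element,))
            remaining.remove(element)


def check_per_bundle(expected, bundles):
    """Checks every bundle independently; returns True if none raises."""
    for bundle in bundles:
        PerBundleMatcher(expected).process_bundle(bundle)
    return True


def check_globally(expected, bundles):
    """Compares the full multiset of actual elements against expectations."""
    actual = [element for bundle in bundles for element in bundle]
    return sorted(actual) == sorted(expected)


bundles = [['a'], ['a']]  # actual values [a, a], split into two bundles
per_bundle_ok = check_per_bundle(['a', 'b'], bundles)
global_ok = check_globally(['a', 'b'], bundles)
```

Under these assumptions the per-bundle check accepts the input even though the overall contents are wrong, while a check over the whole materialized collection rejects it.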
PTAL. This is by no means complete, but I'd like to get some feedback from you Robert before I go further.
This implementation is based on https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
sdks/python/apache_beam/pvalue.py
Outdated
@@ -124,6 +124,11 @@ def windowing(self):
          self.producer.inputs)
    return self._windowing

  # TODO(ehudm): Make this internal.
  @windowing.setter
As discussed, remove this setter.
    super(IdentityWindowFn, self).__init__()
    if coder is None:
      raise ValueError('coder should not be None')
    self._coder = coder
Rename this window_coder?
@@ -423,3 +431,115 @@ def expand(self, pcoll):
          self._batch_size_estimator))
    else:
      return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))


class IdentityWindowFn(NonMergingWindowFn):
Make this private (with an underscore)?
  def get_window_coder(self):
    return self._coder

class TriggerForEveryElement(TriggerFn):
Just use AfterCount(1).
# TODO: are these typehints necessary?
@typehints.with_input_types(typehints.KV[K, V])
@typehints.with_output_types(typehints.KV[K, V])
class Reshuffle(PTransform):
As mentioned, rename this ReshufflePerKey, and add a Reshuffle that appends then strips a random key (e.g. random.getrandbits(32))
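As a rough illustration of the "append then strip a random key" suggestion, here is a plain-Python model. It is not the Beam implementation: `reshuffle_per_key` and `reshuffle` are hypothetical functions operating on in-memory lists, with a dictionary standing in for the group-by-key step. Only `random.getrandbits(32)` is taken from the comment above.

```python
import random
from collections import defaultdict


def reshuffle_per_key(pairs):
    """Toy per-key reshuffle: group (key, value) pairs by key, then re-expand."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    for key, values in grouped.items():
        for value in values:
            yield key, value


def reshuffle(elements):
    """Pairs each element with a random key, reshuffles per key, strips the key."""
    keyed = ((random.getrandbits(32), element) for element in elements)
    return [element for _, element in reshuffle_per_key(keyed)]


data = ['x', ('k', 'v'), 42]  # elements need not be key-value pairs
result = reshuffle(data)
```

Because the temporary key is added and removed inside `reshuffle`, the caller sees the same elements it passed in, in an unspecified order.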
    # TODO: is it safe to reapply this value?
    windowing_saved = pcoll.windowing
    # TODO: add .with_input_types, .with_output_types to PTransforms below?
It should be able to infer.
  """

  def expand(self, pcoll):
    class ExpandIterableDoFn(DoFn):
A DoFn with nothing but a process method can be more simply implemented via beam.Map or beam.FlatMap.
        )
        | 'GroupByKey' >> GroupByKey()
        | 'ExpandIterable' >> ParDo(ExpandIterableDoFn()))
    pcoll_intermediate.windowing = windowing_saved
Nit: I'd probably assign the windowing as the very last thing rather than on this intermediate.
    1. A matcher function taking as argument the actual value of a
       materialized PCollection. The matcher validates this actual value
       against expectations and raises BeamAssertException if they are not met.
    2. An instance of WindowedValueMatcher.
Rather than branching here and introducing a new WindowedValueMatcher, perhaps introduce a windowed_equal_to sibling to equal_to.
I thought of doing that, but how does assert_that know if matcher accepts values or TestWindowedValues?
@@ -46,6 +56,26 @@ class BeamAssertException(Exception):
  pass


# Used for reifying timestamps and windows for assert_that matchers.
I think we can get around the need for introducing this new class by pairing values with None internally on reification (or at least making it an internal implementation detail).
The idea was to use this class for user-implemented matchers (inheriting from WindowedValueMatcher). Am I wrong to assume that there are matchers defined outside this repo?
Ideally one should be able to use any callable, e.g. hamcrest matchers, rather than have to implement a windowed variant of each. I misspoke about having a windowed_equals_to; we should have an assert_that_windowed(pcoll, equal_to([WindowedValue(...), ...])), or assert_that(pcoll, equal_to([WindowedValue(...), ...]), reify_windows=True).
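A small plain-Python sketch of this design: the assertion helper reifies timestamps and windows when asked, so the same generic matcher callable works for both plain and windowed comparisons. Everything here is hypothetical and deliberately prefixed with `toy_` to avoid confusion with Beam's real `assert_that` and `equal_to`; `ReifiedValue` and `AssertionFailed` are likewise invented for the example.

```python
from collections import namedtuple

# Hypothetical record for a reified element; NOT a Beam class.
ReifiedValue = namedtuple('ReifiedValue', ['value', 'timestamp', 'window'])


class AssertionFailed(Exception):
    """Raised by the toy matcher when expectations are not met."""


def toy_equal_to(expected):
    """Returns a matcher callable that compares contents regardless of order."""
    def matcher(actual):
        if sorted(expected) != sorted(actual):
            raise AssertionFailed('expected %r, got %r' % (expected, actual))
    return matcher


def toy_assert_that(elements, matcher, reify_windows=False):
    """Applies matcher to a list of (value, timestamp, window) triples.

    With reify_windows=True the matcher sees ReifiedValue records;
    otherwise it sees only the bare values.
    """
    if reify_windows:
        actual = [ReifiedValue(v, t, w) for v, t, w in elements]
    else:
        actual = [v for v, _, _ in elements]
    matcher(actual)


elements = [('a', 1, (0, 10)), ('b', 11, (10, 20))]

# The same matcher factory serves both modes.
toy_assert_that(elements, toy_equal_to(['a', 'b']))
toy_assert_that(
    elements,
    toy_equal_to([ReifiedValue('a', 1, (0, 10)),
                  ReifiedValue('b', 11, (10, 20))]),
    reify_windows=True)
```

The matcher never needs to know whether windows were reified; it just receives whatever list the helper hands it, which is why any callable can be used.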
done.
robertwb wrote:
A DoFn with nothing but a process method can be more simply implemented via beam.Map or beam.FlatMap.
Done.
robertwb wrote:
Make this private (with an underscore)?
Done.
robertwb wrote:
Ideally one should be able to use any callable, e.g. hamcrest matchers, rather than have to implement a windowed variant of each. I misspoke about having a windowed_equals_to; we should have an assert_that_windowed(pcoll, equal_to([WindowedValue(...), ...])), or assert_that(pcoll, equal_to([WindowedValue(...), ...]), reify_windows=True).
Done.
robertwb wrote:
Rename this window_coder?
Done.
robertwb wrote:
As discussed, remove this setter.
Done.
udim wrote:
I thought of doing that, but how does assert_that know if matcher accepts values or TestWindowedValues?
Acknowledged.
robertwb wrote:
Just use AfterCount(1).
Done.
robertwb wrote:
It should be able to infer.
Done.
robertwb wrote:
As mentioned, rename this ReshufflePerKey, and add a Reshuffle that appends then strips a random key (e.g. random.getrandbits(32))
Done.
robertwb wrote:
Nit: I'd probably apply assign windowing as the very last thing rather than on this intermediate.
Done.
retest this please
Looking good, just a couple more comments.
   def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
+    with self.assertRaises(BeamAssertException):
Not all runners preserve exceptions from remote workers. Please change back. (Same elsewhere.)
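To show why asserting on the specific exception type can be fragile, here is a self-contained sketch. It assumes a hypothetical runner, modeled by `run_on_remote_runner`, that reports a worker failure as a generic `RuntimeError` carrying only the message; real runners may behave differently. `BeamAssertException` is again a local stand-in.

```python
import unittest


class BeamAssertException(Exception):
    """Local stand-in for the exception type discussed in this thread."""


def run_on_remote_runner(fn):
    """Hypothetical runner: re-raises worker failures as a generic error."""
    try:
        fn()
    except Exception as exc:
        # Only the message survives; the original exception type is lost.
        raise RuntimeError('worker failed: %s' % exc)


def failing_assertion():
    raise BeamAssertException('Failed assert')


class ExceptionTypeTest(unittest.TestCase):

    def test_generic_exception_is_robust(self):
        # Passes regardless of how the runner reports the failure.
        with self.assertRaises(Exception):
            run_on_remote_runner(failing_assertion)

    def test_specific_type_is_lost(self):
        # The error that reaches the test is no longer a BeamAssertException.
        with self.assertRaises(RuntimeError) as ctx:
            run_on_remote_runner(failing_assertion)
        self.assertNotIsInstance(ctx.exception, BeamAssertException)
```

Under this assumption a test written with `assertRaises(BeamAssertException)` would fail on such a runner, whereas `assertRaises(Exception)` keeps passing.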
:/ I changed this because my code had a bug that raised another exception.
  """

  def expand(self, pcoll):
    class ReifyTimestampsIn(DoFn):
Reify means to make explicit. Should just be ReifyTimestamps.
      def process(self, element, timestamp=DoFn.TimestampParam):
        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
            timestamp == DoFn.TimestampParam):
          raise ValueError('timestamp was unset for element: %r' % element)
We shouldn't have to check for this.
    class ReifyTimestampsExtract(DoFn):
      def process(self, element, window=DoFn.WindowParam):
        # Return a WindowedValue so that IdentityWindowFn can reuse the window
I'm not following this comment...
Shouldn't things already be in the correct window? We just need to emit a TimestampedValue here.
          raise ValueError('timestamp was unset for element: %r' % element)
        yield element[0], TimestampedValue(element[1], timestamp)

    class ReifyTimestampsExtract(DoFn):
ApplyReifiedTimestamps?
    windowing_saved = pcoll.windowing
    result = (pcoll
              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
Most of these explicit stage names are redundant with what the default would already be.
    return (pcoll
            | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
            | ReshufflePerKey()
            | 'RemoveTempKeys' >> Map(lambda t: t[1]))
RemoveRandomKeys (for symmetry).
Two flavors of Reshuffle: ReshufflePerKey operates on key-value pairs, while Reshuffle adds a random key to each element (key-value or other).
Add _IdentityWindowFn, for internal use in Reshuffle.
Add and pass current window to WindowFn.AssignContext, for IdentityWindowFn implementation.
testing/util.py:
- Extend assert_that with reify_windows keyword, allowing verification of timestamp values and windowing functions.
- Add contains_in_any_order matcher.
robertwb wrote:
I'm not following this comment... Shouldn't things already be in the correct window? We just need to emit a TimestampedValue here.
Passing a TimestampedValue would make the output processor for DoFn call the windowing function.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L451
Good point. Maybe call this RestoreTimestamps or RestoreWindows?
robertwb wrote:
Most of these explicit stage names are redundant with what the default would already be.
Done.
robertwb wrote:
RemoveRandomKeys (for symmetry).
Done.
robertwb wrote:
ApplyReifiedTimestamps?
Done.
robertwb wrote:
Reify means to make explicit. Should just be ReifyTimestamps.
Done.
robertwb wrote:
We shouldn't have to check for this.
Done.
Looks good. Jenkins is complaining. Jenkins: retest this please.
robertwb wrote:
Good point. Maybe call this RestoreTimestamps or RestoreWindows?
Done.
LGTM, thanks.
* Implement Reshuffle for Python SDK.
Two flavors of Reshuffle: ReshufflePerKey operates on key-value pairs, while Reshuffle adds a random key to each element (key-value or other).
Add _IdentityWindowFn, for internal use in Reshuffle.
Add and pass current window to WindowFn.AssignContext, for IdentityWindowFn implementation.
testing/util.py:
- Extend assert_that with reify_windows keyword, allowing verification of timestamp values and windowing functions.
- Add contains_in_any_order matcher.
BEAM-1872 IdentityWindowFn is intended for internal use in a future
implementation of Reshuffle.
Add and pass current window to WindowFn.AssignContext, for
IdentityWindowFn implementation.
Add WindowedValueMatcher in testing/util.py, for use in IdentityWindowFn
unit tests.
Hi @robertwb. Can you please take a look?