[BEAM-3759] Add support for PaneInfo in WindowedValues #4763

Merged
robertwb merged 1 commit into apache:master from paneinfo-part1 on Mar 19, 2018

Conversation

charlesccychen
Contributor

This change allows triggering information to be stored along with WindowedValues.

@charlesccychen
Contributor Author

R: @robertwb

"""

-  def __init__(self, value, timestamp, windows):
+  def __init__(self, value, timestamp, windows, pane_info=None):
Contributor Author

Robert, can you comment on whether this extra kwarg is suitable, performance-wise?

Contributor

It should be fine, as long as it's not usually passed as a kwarg. (Also, most of the time we create these with .with_value which is more optimized.)

@@ -670,6 +670,85 @@ def _construct_from_sequence(self, components):
    return components


# A PaneInfo descriptor can be encoded in three different ways: (1) with a
# single byte (PANE_INFO_ENCODING_FIRST), (2) with a single byte followed by
# a varint describing a single index
Contributor

and (3)?

Contributor

Or perhaps just remove this, as the comment below is sufficient.
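
For readers unfamiliar with the layout the quoted code comment describes, here is a minimal sketch of the first two encodings it names: a single byte, and a single byte followed by a varint index. The bit layout of the first byte, the tag values, the rule for choosing between the two forms, and all function names are assumptions made for illustration and are not taken from this PR. The third encoding is omitted because the quoted comment does not describe it.

```python
import io

# Hypothetical tags stored in the upper 4 bits of the first byte.
ENCODING_FIRST = 0      # one byte, no index follows
ENCODING_ONE_INDEX = 1  # one byte followed by one varint index


def write_varint(out, n):
    """Writes a non-negative int as a base-128 varint (7 bits per byte)."""
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.write(bytes([bits | 0x80]))  # high bit set: more bytes follow
        else:
            out.write(bytes([bits]))
            return


def read_varint(stream):
    """Reads a varint produced by write_varint."""
    result, shift = 0, 0
    while True:
        byte = stream.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7


def encode_pane(flags, index):
    """Encodes 4 bits of pane flags and an index (assumed rule: 0 needs no varint)."""
    out = io.BytesIO()
    if index == 0:
        out.write(bytes([(ENCODING_FIRST << 4) | flags]))
    else:
        out.write(bytes([(ENCODING_ONE_INDEX << 4) | flags]))
        write_varint(out, index)
    return out.getvalue()


def decode_pane(data):
    """Returns (flags, index) for bytes produced by encode_pane."""
    stream = io.BytesIO(data)
    first = stream.read(1)[0]
    flags, tag = first & 0x0F, first >> 4
    index = read_varint(stream) if tag == ENCODING_ONE_INDEX else 0
    return flags, index
```

The point of the scheme is that the common case costs one byte, and the index is only paid for when it carries information.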

@@ -104,11 +211,12 @@ def __reduce__(self):


# TODO(robertwb): Move this to a static method.
-def create(value, timestamp_micros, windows):
+def create(value, timestamp_micros, windows, pane_info=None):
Contributor

Just let the default value be PANE_INFO_UNKNOWN.
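
A minimal sketch of this suggestion, assuming a simplified stand-in for the real classes: using an immutable module-level sentinel as the default means downstream code never has to special-case `None`. The `PaneInfo` fields and the namedtuple-based `WindowedValue` here are hypothetical and much smaller than the real ones.

```python
import collections

# Stand-ins for the real classes; the field names are assumptions.
PaneInfo = collections.namedtuple('PaneInfo', ['is_first', 'is_last', 'index'])
WindowedValue = collections.namedtuple(
    'WindowedValue', ['value', 'timestamp_micros', 'windows', 'pane_info'])

# Immutable and created once, so it is safe to use as a default argument.
PANE_INFO_UNKNOWN = PaneInfo(is_first=True, is_last=True, index=0)


def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
    """Builds a WindowedValue; callers that omit pane_info get the sentinel."""
    return WindowedValue(value, timestamp_micros, windows, pane_info)
```

With this default, `create('a', 0, ())` and `create('a', 0, (), PANE_INFO_UNKNOWN)` compare equal, which would not hold if one of them carried `None`.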

@@ -90,7 +196,8 @@ def __cmp__(left, right): # pylint: disable=no-self-argument
   def _typed_eq(left, right):
     return (left.timestamp_micros == right.timestamp_micros
             and left.value == right.value
-            and left.windows == right.windows)
+            and left.windows == right.windows
+            and left.pane_info == right.pane_info)

   def with_value(self, new_value):
     """Creates a new WindowedValue with the same timestamps and windows as this.
Contributor

Fix with_value to propagate the pane info (and test).

@@ -104,11 +211,12 @@ def __reduce__(self):

Contributor

@robertwb robertwb Mar 8, 2018

Fix __reduce__ to remember the PaneInfo (and test).
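
A minimal sketch of what the `with_value` and `__reduce__` requests amount to, using a simplified stand-in class rather than the real `WindowedValue`. Both methods rebuild the object from a tuple of constructor arguments, so any field left out of that tuple silently falls back to its default. The sketch exercises `__reduce__` through `copy.copy`, which reconstructs objects through the same protocol that `pickle` uses.

```python
import copy


class WindowedValue(object):
    """Simplified stand-in for the class under review, not the real one."""

    def __init__(self, value, timestamp_micros, windows, pane_info=None):
        self.value = value
        self.timestamp_micros = timestamp_micros
        self.windows = windows
        self.pane_info = pane_info

    def with_value(self, new_value):
        # Carry pane_info over along with the timestamp and windows.
        return WindowedValue(
            new_value, self.timestamp_micros, self.windows, self.pane_info)

    def __reduce__(self):
        # The object is rebuilt as WindowedValue(*args), so pane_info has to
        # be part of args or it is reset to the constructor default.
        return WindowedValue, (
            self.value, self.timestamp_micros, self.windows, self.pane_info)


original = WindowedValue('a', 1000, ('w',), pane_info='late-pane')
replaced = original.with_value('b')
restored = copy.copy(original)  # reconstructed via __reduce__
```

A test along these lines checks that `replaced.pane_info` and `restored.pane_info` both equal the original's value; with a three-argument tuple in either method, both checks would fail.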



def _construct_pane_info_map():
  result = {}
Contributor

Make this a list, not a map.


   def _create_impl(self):
     return coder_impl.WindowedValueCoderImpl(
         self.wrapped_value_coder.get_impl(),
         self.timestamp_coder.get_impl(),
-        self.window_coder.get_impl())
+        self.window_coder.get_impl(),
+        self.pane_info_coder.get_impl())
Contributor

Rather than introducing a PaneInfoCoder, just instantiate the PaneInfoCoderImpl in WindowedValueCoderImpl's constructor. (There's a TODO to remove the parameterizability of the TimestampCoder as well.) Actually, we could consider just making {en,de}code_pane_info methods on WindowedValueCoder itself.

@@ -734,13 +814,14 @@ def decode_from_stream(self, in_stream, nested):
# Read PaneInfo encoded byte.
# TODO(BEAM-1522): Ignored for now but should be converted to pane info once
Contributor

You can now remove this TODO.

@aaltay
Member

aaltay commented Mar 17, 2018

@charlesccychen any updates on this?

@charlesccychen charlesccychen force-pushed the paneinfo-part1 branch 4 times, most recently from 077b2dd to 79e421b on March 19, 2018 06:36
Contributor Author

@charlesccychen charlesccychen left a comment

Thanks, PTAL.

@@ -90,7 +196,8 @@ def __cmp__(left, right): # pylint: disable=no-self-argument
   def _typed_eq(left, right):
     return (left.timestamp_micros == right.timestamp_micros
             and left.value == right.value
-            and left.windows == right.windows)
+            and left.windows == right.windows
+            and left.pane_info == right.pane_info)

   def with_value(self, new_value):
     """Creates a new WindowedValue with the same timestamps and windows as this.
Contributor Author

robertwb wrote:
Fix with_value to propagate the pane info (and test).

Done.

@@ -30,8 +30,6 @@

from types import NoneType

import six

from apache_beam.coders import observable
Contributor Author

robertwb wrote:
Or perhaps just remove this, as the comment below is sufficient.

Done.

  def __hash__(self):
    return hash(type(self))


Contributor Author

robertwb wrote:
Rather than introducing a PaneInfoCoder, just instantiate the PaneInfoCoderImpl in WindowedValueCoderImpl's constructor. (There's a TODO to remove the parameterizability of the TimestampCoder as well.) Actually, we could consider just making {en,de}code_pane_info methods on WindowedValueCoder itself.

I am going to get rid of PaneInfoCoder but keep the PaneInfoCoderImpl, because it's more natural to keep it separate, as it would clutter the WindowedValueCoderImpl with encode/decode_paneinfo_to_stream and estimate_paneinfo_size.
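
A minimal sketch of the design described here, under heavy simplification: the windowed-value impl builds its own pane-info impl in the constructor instead of receiving one from a public coder class, while the pane-info logic stays in a separate class. The one-byte pane encoding, the `Utf8CoderImpl` helper, and the method signatures are hypothetical, and timestamps and windows are left out.

```python
class PaneInfoCoderImpl(object):
    """Hypothetical helper: encodes a pane code (0-255) as one byte."""

    def encode(self, pane_code):
        return bytes([pane_code])

    def decode(self, data):
        return data[0]


class Utf8CoderImpl(object):
    """Hypothetical value coder used only to make the sketch runnable."""

    def encode(self, value):
        return value.encode('utf-8')

    def decode(self, data):
        return data.decode('utf-8')


class WindowedValueCoderImpl(object):
    """Simplified: only the value and the pane code are encoded."""

    def __init__(self, value_coder_impl):
        self._value_coder = value_coder_impl
        # Built here rather than injected, so no public PaneInfoCoder
        # class is needed, yet the pane logic stays out of this class.
        self._pane_info_coder = PaneInfoCoderImpl()

    def encode(self, value, pane_code):
        return (self._pane_info_coder.encode(pane_code)
                + self._value_coder.encode(value))

    def decode(self, data):
        pane_code = self._pane_info_coder.decode(data[:1])
        return self._value_coder.decode(data[1:]), pane_code


coder = WindowedValueCoderImpl(Utf8CoderImpl())
encoded = coder.encode('hello', 3)
```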

@@ -689,11 +766,13 @@ def _from_normal_time(self, value):
Contributor Author

robertwb wrote:
You can now remove this TODO.

Done.

@@ -104,11 +211,12 @@ def __reduce__(self):

Contributor Author

robertwb wrote:
Fix __reduce__ to remember the PaneInfo (and test).

Done.

@@ -104,11 +211,12 @@ def __reduce__(self):


# TODO(robertwb): Move this to a static method.
-def create(value, timestamp_micros, windows):
+def create(value, timestamp_micros, windows, pane_info=None):
Contributor Author

robertwb wrote:
Just let the default value be PANE_INFO_UNKNOWN.

Done.



def _construct_pane_info_map():
  result = {}
Contributor Author

robertwb wrote:
Make this a list, not a map.

Done.

Contributor

@robertwb robertwb left a comment

LGTM, thanks. Just to be sure, could you run the benchmarks at #4741?

@charlesccychen
Contributor Author

Thanks. The benchmarks seem very comparable.

Without this change (at HEAD^):

[(1, 1.0581729412078857)]
[(1, 1.0581729412078857), (1001, 1.0961029529571533)]
[(1, 1.0581729412078857), (1001, 1.0961029529571533), (2001, 1.304905891418457)]
[(1, 1.0581729412078857), (1001, 1.0961029529571533), (2001, 1.304905891418457), (3001, 1.3804380893707275)]
[(1, 1.0581729412078857), (1001, 1.0961029529571533), (2001, 1.304905891418457), (3001, 1.3804380893707275), (4001, 1.6390268802642822)]
Fixed cost  1.00637614384
Per-element 1.44604301453e-06
R^2         0.94447668637

With this change:

[(1, 1.0614569187164307)]
[(1, 1.0614569187164307), (1001, 1.1248741149902344)]
[(1, 1.0614569187164307), (1001, 1.1248741149902344), (2001, 1.315816879272461)]
[(1, 1.0614569187164307), (1001, 1.1248741149902344), (2001, 1.315816879272461), (3001, 1.3426880836486816)]
[(1, 1.0614569187164307), (1001, 1.1248741149902344), (2001, 1.315816879272461), (3001, 1.3426880836486816), (4001, 1.6559441089630127)]
Fixed cost  1.01865767245
Per-element 1.40678834915e-06
R^2         0.914786773244
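
The summary figures in both runs are consistent with an ordinary least-squares line fitted to the five (size, seconds) points, and the sketch below recomputes them from the numbers posted above. `linear_fit` is a hypothetical helper, not the benchmark script's code. The division by 100 needed to match the reported per-element figure is inferred from the numbers alone, so treat that scaling as an assumption.

```python
def linear_fit(points):
    """Ordinary least-squares fit of y = intercept + slope * x."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    syy = sum((y - mean_y) ** 2 for _, y in points)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    r_squared = (sxy * sxy) / (sxx * syy)
    return intercept, slope, r_squared


# Points copied from the final list printed in each run.
before = [(1, 1.0581729412078857), (1001, 1.0961029529571533),
          (2001, 1.304905891418457), (3001, 1.3804380893707275),
          (4001, 1.6390268802642822)]
after = [(1, 1.0614569187164307), (1001, 1.1248741149902344),
         (2001, 1.315816879272461), (3001, 1.3426880836486816),
         (4001, 1.6559441089630127)]

for label, points in (('before', before), ('after', after)):
    intercept, slope, r_squared = linear_fit(points)
    # Assumed scaling: the reported per-element value equals slope / 100.
    print(label, intercept, slope / 100, r_squared)
```

With only five points and an R^2 near 0.9 in each run, a difference of about 3% in the fitted slope is small compared with the scatter around the line.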

@robertwb
Contributor

Totally within the margin of error. Thanks.

@robertwb robertwb merged commit 8139899 into apache:master Mar 19, 2018