Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3759] Add support for PaneInfo in WindowedValues #4763

Merged
merged 1 commit into from Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.pxd
Expand Up @@ -132,11 +132,16 @@ cdef class IterableCoderImpl(SequenceCoderImpl):
pass


cdef class PaneInfoCoderImpl(StreamCoderImpl):
cdef int _choose_encoding(self, value)


cdef class WindowedValueCoderImpl(StreamCoderImpl):
"""A coder for windowed values."""
cdef CoderImpl _value_coder
cdef CoderImpl _timestamp_coder
cdef CoderImpl _windows_coder
cdef CoderImpl _pane_info_coder

@cython.locals(c=CoderImpl)
cpdef get_estimated_size_and_observables(self, value, bint nested=?)
Expand Down
91 changes: 83 additions & 8 deletions sdks/python/apache_beam/coders/coder_impl.py
Expand Up @@ -672,6 +672,82 @@ def _construct_from_sequence(self, components):
return components


class PaneInfoEncoding(object):
"""For internal use only; no backwards-compatibility guarantees.

Encoding used to describe a PaneInfo descriptor. A PaneInfo descriptor
can be encoded in three different ways: with a single byte (FIRST), with a
single byte followed by a varint describing a single index (ONE_INDEX) or
with a single byte followed by two varints describing two separate indices:
the index and nonspeculative index.
"""

FIRST = 0
ONE_INDEX = 1
TWO_INDICES = 2


class PaneInfoCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Coder for a PaneInfo descriptor."""

def _choose_encoding(self, value):
if ((value.index == 0 and value.nonspeculative_index == 0) or
value.timing == windowed_value.PaneInfoTiming.UNKNOWN):
return PaneInfoEncoding.FIRST
elif (value.index == value.nonspeculative_index or
value.timing == windowed_value.PaneInfoTiming.EARLY):
return PaneInfoEncoding.ONE_INDEX
else:
return PaneInfoEncoding.TWO_INDICES

def encode_to_stream(self, value, out, nested):
encoding_type = self._choose_encoding(value)
out.write_byte(value.encoded_byte | (encoding_type << 4))
if encoding_type == PaneInfoEncoding.FIRST:
return
elif encoding_type == PaneInfoEncoding.ONE_INDEX:
out.write_var_int64(value.index)
elif encoding_type == PaneInfoEncoding.TWO_INDICES:
out.write_var_int64(value.index)
out.write_var_int64(value.nonspeculative_index)
else:
raise NotImplementedError('Invalid PaneInfoEncoding: %s' % encoding_type)

def decode_from_stream(self, in_stream, nested):
encoded_first_byte = in_stream.read_byte()
base = windowed_value._BYTE_TO_PANE_INFO[encoded_first_byte & 0xF]
assert base is not None
encoding_type = encoded_first_byte >> 4
if encoding_type == PaneInfoEncoding.FIRST:
return base
elif encoding_type == PaneInfoEncoding.ONE_INDEX:
index = in_stream.read_var_int64()
if base.timing == windowed_value.PaneInfoTiming.EARLY:
nonspeculative_index = -1
else:
nonspeculative_index = index
elif encoding_type == PaneInfoEncoding.TWO_INDICES:
index = in_stream.read_var_int64()
nonspeculative_index = in_stream.read_var_int64()
else:
raise NotImplementedError('Invalid PaneInfoEncoding: %s' % encoding_type)
return windowed_value.PaneInfo(
base.is_first, base.is_last, base.timing, index, nonspeculative_index)

def estimate_size(self, value, nested=False):
"""Estimates the encoded size of the given value, in bytes."""
size = 1
encoding_type = self._choose_encoding(value)
if encoding_type == PaneInfoEncoding.ONE_INDEX:
size += get_varint_size(value.index)
elif encoding_type == PaneInfoEncoding.TWO_INDICES:
size += get_varint_size(value.index)
size += get_varint_size(value.nonspeculative_index)
return size


class WindowedValueCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Expand All @@ -694,6 +770,7 @@ def __init__(self, value_coder, timestamp_coder, window_coder):
self._value_coder = value_coder
self._timestamp_coder = timestamp_coder
self._windows_coder = TupleSequenceCoderImpl(window_coder)
self._pane_info_coder = PaneInfoCoderImpl()

def encode_to_stream(self, value, out, nested):
wv = value # type cast
Expand All @@ -709,8 +786,7 @@ def encode_to_stream(self, value, out, nested):
restore_sign * (abs(wv.timestamp_micros) / 1000)))
self._windows_coder.encode_to_stream(wv.windows, out, True)
# Default PaneInfo encoded byte representing NO_FIRING.
# TODO(BEAM-1522): Remove the hard coding here once PaneInfo is supported.
out.write_byte(0xF)
self._pane_info_coder.encode_to_stream(wv.pane_info, out, True)
self._value_coder.encode_to_stream(wv.value, out, nested)

def decode_from_stream(self, in_stream, nested):
Expand All @@ -734,15 +810,14 @@ def decode_from_stream(self, in_stream, nested):

windows = self._windows_coder.decode_from_stream(in_stream, True)
# Read PaneInfo encoded byte.
# TODO(BEAM-1522): Ignored for now but should be converted to pane info once
# it is supported.
in_stream.read_byte()
pane_info = self._pane_info_coder.decode_from_stream(in_stream, True)
value = self._value_coder.decode_from_stream(in_stream, nested)
return windowed_value.create(
value,
# Avoid creation of Timestamp object.
timestamp,
windows)
windows,
pane_info)

def get_estimated_size_and_observables(self, value, nested=False):
"""Returns estimated size of value along with any nested observables."""
Expand All @@ -761,8 +836,8 @@ def get_estimated_size_and_observables(self, value, nested=False):
self._timestamp_coder.estimate_size(value.timestamp, nested=True))
estimated_size += (
self._windows_coder.estimate_size(value.windows, nested=True))
# for pane info
estimated_size += 1
estimated_size += (
self._pane_info_coder.estimate_size(value.pane_info, nested=True))
return estimated_size, observables


Expand Down
32 changes: 32 additions & 0 deletions sdks/python/apache_beam/coders/coders_test_common.py
Expand Up @@ -275,6 +275,38 @@ def iter_generator(count):
iterable_coder.decode(
iterable_coder.encode(iter_generator(count))))

def test_windowedvalue_coder_paneinfo(self):
coder = coders.WindowedValueCoder(coders.VarIntCoder(),
coders.GlobalWindowCoder())
test_paneinfo_values = [
windowed_value.PANE_INFO_UNKNOWN,
windowed_value.PaneInfo(
True, True, windowed_value.PaneInfoTiming.EARLY, 0, -1),
windowed_value.PaneInfo(
True, False, windowed_value.PaneInfoTiming.ON_TIME, 0, 0),
windowed_value.PaneInfo(
True, False, windowed_value.PaneInfoTiming.ON_TIME, 10, 0),
windowed_value.PaneInfo(
False, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 23),
windowed_value.PaneInfo(
False, True, windowed_value.PaneInfoTiming.ON_TIME, 12, 23),
windowed_value.PaneInfo(
False, False, windowed_value.PaneInfoTiming.LATE, 0, 123),]

test_values = [windowed_value.WindowedValue(123, 234, (GlobalWindow(),), p)
for p in test_paneinfo_values]

# Test unnested.
self.check_coder(coder, windowed_value.WindowedValue(
123, 234, (GlobalWindow(),), windowed_value.PANE_INFO_UNKNOWN))
for value in test_values:
self.check_coder(coder, value)

# Test nested.
for value1 in test_values:
for value2 in test_values:
self.check_coder(coders.TupleCoder((coder, coder)), (value1, value2))

def test_windowed_value_coder(self):
coder = coders.WindowedValueCoder(coders.VarIntCoder(),
coders.GlobalWindowCoder())
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/utils/windowed_value.pxd
Expand Up @@ -25,6 +25,7 @@ cdef type Timestamp
cdef class WindowedValue(object):
cdef public object value
cdef public object windows
cdef public object pane_info
cdef public int64_t timestamp_micros
cdef object timestamp_object

Expand All @@ -35,4 +36,4 @@ cdef class WindowedValue(object):

@cython.locals(wv=WindowedValue)
cpdef WindowedValue create(
object value, int64_t timestamp_micros, object windows)
object value, int64_t timestamp_micros, object windows, object pane_info=*)
133 changes: 125 additions & 8 deletions sdks/python/apache_beam/utils/windowed_value.py
Expand Up @@ -32,6 +32,111 @@
from apache_beam.utils.timestamp import Timestamp


class PaneInfoTiming(object):
"""The timing of a PaneInfo."""

EARLY = 0
ON_TIME = 1
LATE = 2
UNKNOWN = 3


class PaneInfo(object):
"""Describes the trigger firing information for a given WindowedValue."""

def __init__(self, is_first, is_last, timing, index, nonspeculative_index):
self._is_first = is_first
self._is_last = is_last
self._timing = timing
self._index = index
self._nonspeculative_index = nonspeculative_index
self._encoded_byte = self._get_encoded_byte()

def _get_encoded_byte(self):
byte = 0
if self.is_first:
byte |= 1
if self.is_last:
byte |= 2
byte |= self.timing << 2
return byte

@staticmethod
def from_encoded_byte(encoded_byte):
assert encoded_byte in _BYTE_TO_PANE_INFO
return _BYTE_TO_PANE_INFO[encoded_byte]

# Because common PaneInfo objects are cached, it is important that the value
# is immutable. We therefore explicitly enforce this here with read-only
# properties.

@property
def is_first(self):
return self._is_first

@property
def is_last(self):
return self._is_last

@property
def timing(self):
return self._timing

@property
def index(self):
return self._index

@property
def nonspeculative_index(self):
return self._nonspeculative_index

@property
def encoded_byte(self):
return self._encoded_byte

def __repr__(self):
return ('PaneInfo(first: %r, last: %r, timing: %s, index: %d, '
'nonspeculative_index: %d)') % (self.is_first, self.is_last,
self.timing, self.index,
self.nonspeculative_index)

def __eq__(self, other):
if self is other:
return True
return (self.is_first == other.is_first and
self.is_last == other.is_last and
self.timing == other.timing and
self.index == other.index and
self.nonspeculative_index == other.nonspeculative_index)

def __hash__(self):
return hash((self.is_first, self.is_last, self.timing, self.index,
self.nonspeculative_index))


def _construct_well_known_pane_infos():
pane_infos = []
for timing in (PaneInfoTiming.EARLY, PaneInfoTiming.ON_TIME,
PaneInfoTiming.LATE, PaneInfoTiming.UNKNOWN):
nonspeculative_index = -1 if timing == PaneInfoTiming.EARLY else 0
pane_infos.append(PaneInfo(True, True, timing, 0, nonspeculative_index))
pane_infos.append(PaneInfo(True, False, timing, 0, nonspeculative_index))
pane_infos.append(PaneInfo(False, True, timing, -1, nonspeculative_index))
pane_infos.append(PaneInfo(False, False, timing, -1, nonspeculative_index))
result = [None] * (max(p.encoded_byte for p in pane_infos) + 1)
for pane_info in pane_infos:
result[pane_info.encoded_byte] = pane_info
return result


# Cache of well-known PaneInfo objects.
_BYTE_TO_PANE_INFO = _construct_well_known_pane_infos()


# Default PaneInfo descriptor for when a value is not the output of triggering.
PANE_INFO_UNKNOWN = _BYTE_TO_PANE_INFO[0xF]


class WindowedValue(object):
"""A windowed value having a value, a timestamp and set of windows.

Expand All @@ -40,9 +145,12 @@ class WindowedValue(object):
timestamp: Timestamp associated with the value as seconds since Unix epoch.
windows: A set (iterable) of window objects for the value. The window
object are descendants of the BoundedWindow class.
pane_info: A PaneInfo descriptor describing the triggering information for
the pane that contained this value. If None, will be set to
PANE_INFO_UNKNOWN.
"""

def __init__(self, value, timestamp, windows):
def __init__(self, value, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
# For performance reasons, only timestamp_micros is stored by default
# (as a C int). The Timestamp object is created on demand below.
self.value = value
Expand All @@ -53,6 +161,7 @@ def __init__(self, value, timestamp, windows):
else Timestamp.of(timestamp))
self.timestamp_micros = self.timestamp_object.micros
self.windows = windows
self.pane_info = pane_info

@property
def timestamp(self):
Expand All @@ -61,15 +170,19 @@ def timestamp(self):
return self.timestamp_object

def __repr__(self):
return '(%s, %s, %s)' % (
return '(%s, %s, %s, %s)' % (
repr(self.value),
'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
float(self.timestamp),
self.windows)
self.windows,
self.pane_info)

def __hash__(self):
return hash(self.value) + 3 * self.timestamp_micros + 7 * hash(self.windows)
return (hash(self.value) +
3 * self.timestamp_micros +
7 * hash(self.windows) +
11 * hash(self.pane_info))

# We'd rather implement __eq__, but Cython supports that via __richcmp__
# instead. Fortunately __cmp__ is understood by both (but not by Python 3).
Expand All @@ -90,25 +203,29 @@ def __cmp__(left, right): # pylint: disable=no-self-argument
def _typed_eq(left, right):
return (left.timestamp_micros == right.timestamp_micros
and left.value == right.value
and left.windows == right.windows)
and left.windows == right.windows
and left.pane_info == right.pane_info)

def with_value(self, new_value):
"""Creates a new WindowedValue with the same timestamps and windows as this.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix with_value to propagate the pane info (and test).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Fix with_value to propagate the pane info (and test).

Done.


This is the fasted way to create a new WindowedValue.
"""
return create(new_value, self.timestamp_micros, self.windows)
return create(new_value, self.timestamp_micros, self.windows,
self.pane_info)

def __reduce__(self):
return WindowedValue, (self.value, self.timestamp, self.windows)
return WindowedValue, (self.value, self.timestamp, self.windows,
self.pane_info)

Copy link
Contributor

@robertwb robertwb Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix __reduce__ to rememeber the PaneInfo (and test).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Fix __reduce__ to rememeber the PaneInfo (and test).

Done.


# TODO(robertwb): Move this to a static method.
def create(value, timestamp_micros, windows):
def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
wv = WindowedValue.__new__(WindowedValue)
wv.value = value
wv.timestamp_micros = timestamp_micros
wv.windows = windows
wv.pane_info = pane_info
return wv


Expand Down