From 4993e311733ec227a5bb2e95b902a95ceceb3b97 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 28 Oct 2016 13:44:06 -0700 Subject: [PATCH 1/2] Adds IterableCoder, fast coding for sets, booleans. --- sdks/python/apache_beam/coders/coder_impl.pxd | 8 ++++-- sdks/python/apache_beam/coders/coder_impl.py | 27 +++++++++++++++---- sdks/python/apache_beam/coders/coders.py | 23 ++++++++++++++++ .../apache_beam/coders/coders_test_common.py | 9 +++++++ 4 files changed, 60 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 2ca674321d8a..194cbb8b9b18 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -66,8 +66,8 @@ cdef class DeterministicPickleCoderImpl(CoderImpl): cdef object NoneType -cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE -cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE +cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE +cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE cdef class FastPrimitivesCoderImpl(StreamCoderImpl): cdef CoderImpl fallback_coder_impl @@ -125,6 +125,10 @@ cdef class TupleSequenceCoderImpl(SequenceCoderImpl): pass +cdef class IterableCoderImpl(SequenceCoderImpl): + pass + + cdef class WindowedValueCoderImpl(StreamCoderImpl): """A coder for windowed values.""" cdef CoderImpl _value_coder diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 0024fd85ce83..606d51cc7acc 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -202,9 +202,11 @@ def decode(self, encoded): FLOAT_TYPE = 2 STR_TYPE = 3 UNICODE_TYPE = 4 +BOOL_TYPE = 9 LIST_TYPE = 5 TUPLE_TYPE = 6 DICT_TYPE = 7 +SET_TYPE = 8 class FastPrimitivesCoderImpl(StreamCoderImpl): @@ -229,18 +231,22 @@ def encode_to_stream(self, value, stream, nested): unicode_value = value # for typing stream.write_byte(UNICODE_TYPE) stream.write(unicode_value.encode('utf-8'), nested) - elif t is list or t is tuple: - stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE) + elif t is list or t is tuple or t is set: + stream.write_byte( + LIST_TYPE if t is list else TUPLE_TYPE if t is tuple else SET_TYPE) stream.write_var_int64(len(value)) for e in value: self.encode_to_stream(e, stream, True) elif t is dict: dict_value = value # for typing stream.write_byte(DICT_TYPE) - stream.write_var_int64(len(value)) + stream.write_var_int64(len(dict_value)) for k, v in dict_value.iteritems(): self.encode_to_stream(k, stream, True) self.encode_to_stream(v, stream, True) + elif t is bool: + stream.write_byte(BOOL_TYPE) + stream.write_byte(value) else: stream.write_byte(UNKNOWN_TYPE) self.fallback_coder_impl.encode_to_stream(value, stream, nested) @@ -257,13 +263,15 @@ def decode_from_stream(self, stream, nested): return stream.read_all(nested) elif t == UNICODE_TYPE: return stream.read_all(nested).decode('utf-8') - elif t == LIST_TYPE or t == TUPLE_TYPE: + elif t == LIST_TYPE or t == TUPLE_TYPE or t == SET_TYPE: vlen = stream.read_var_int64() vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)] if t == LIST_TYPE: return vlist - else: + elif t == TUPLE_TYPE: return tuple(vlist) + else: + return set(vlist) elif t == DICT_TYPE: vlen = stream.read_var_int64() v = {} @@ -271,6 +279,8 @@ def decode_from_stream(self, stream, nested): k = self.decode_from_stream(stream, True) v[k] = self.decode_from_stream(stream, True) return v + elif t == BOOL_TYPE: + return not not stream.read_byte() else: return self.fallback_coder_impl.decode_from_stream(stream, nested) @@ -489,6 +499,13 @@ def _construct_from_sequence(self, components): return tuple(components) +class IterableCoderImpl(SequenceCoderImpl): + """A coder for homogeneous iterable objects.""" + + def _construct_from_sequence(self, components): + return components + + class WindowedValueCoderImpl(StreamCoderImpl): """A coder for windowed values.""" diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 11964b084422..da06db178f7e 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -520,6 +520,29 @@ def __repr__(self): return 'TupleSequenceCoder[%r]' % self._elem_coder +class IterableCoder(FastCoder): + """Coder of iterables of homogeneous objects.""" + + def __init__(self, elem_coder): + self._elem_coder = elem_coder + + def _create_impl(self): + return coder_impl.IterableCoderImpl(self._elem_coder.get_impl()) + + def is_deterministic(self): + return self._elem_coder.is_deterministic() + + @staticmethod + def from_type_hint(typehint, registry): + return IterableCoder(registry.get_coder(typehint.inner_type)) + + def _get_component_coders(self): + return (self._elem_coder,) + + def __repr__(self): + return 'IterableCoder[%r]' % self._elem_coder + + class WindowCoder(PickleCoder): """Coder for windows in windowed values.""" diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index dd4a8730b888..86a717f9aa7a 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -124,6 +124,8 @@ def test_fast_primitives_coder(self): self.check_coder(coder, (), (1, 2, 3)) self.check_coder(coder, [], [1, 2, 3]) self.check_coder(coder, dict(), {'a': 'b'}, {0: dict(), 1: len}) + self.check_coder(coder, set(), {'a', 'b'}) + self.check_coder(coder, True, False) self.check_coder(coder, len) self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,)) @@ -193,6 +195,13 @@ def test_base64_pickle_coder(self): def test_utf8_coder(self): self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0') + def test_iterable_coder(self): + self.check_coder(coders.IterableCoder(coders.VarIntCoder()), [1], [-1, 0, 100]) + self.check_coder( + coders.TupleCoder((coders.VarIntCoder(), + coders.IterableCoder(coders.VarIntCoder()))), + (1, [1, 2, 3])) + def test_nested_observables(self): class FakeObservableIterator(observable.ObservableMixin): From a05d97cb9a62c6b44b2754d469d4d87e9ec9396b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 28 Oct 2016 14:25:45 -0700 Subject: [PATCH 2/2] lint --- sdks/python/apache_beam/coders/coders_test_common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 86a717f9aa7a..008fa9f5217b 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -196,7 +196,8 @@ def test_utf8_coder(self): self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0') def test_iterable_coder(self): - self.check_coder(coders.IterableCoder(coders.VarIntCoder()), [1], [-1, 0, 100]) + self.check_coder(coders.IterableCoder(coders.VarIntCoder()), + [1], [-1, 0, 100]) self.check_coder( coders.TupleCoder((coders.VarIntCoder(), coders.IterableCoder(coders.VarIntCoder()))),