From 21f1bbd3efeaa0ada3859791bac808ef3264cdc8 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Mon, 3 Apr 2017 10:01:45 -0700 Subject: [PATCH 1/4] Create --- sdks/python/apache_beam/io/create.py | 58 ++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 sdks/python/apache_beam/io/create.py diff --git a/sdks/python/apache_beam/io/create.py b/sdks/python/apache_beam/io/create.py new file mode 100644 index 000000000000..776291c27886 --- /dev/null +++ b/sdks/python/apache_beam/io/create.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A transform that creates a PCollection from an iterable.""" + +from apache_beam import PTransform, Read +from apache_beam.io import iobase + + +class Create(PTransform): + def __init__(self, value): + """Initializes a Create transform. + + Args: + value: An object of values for the PCollection + """ + super(Create, self).__init__() + if isinstance(value, basestring): + raise TypeError('PTransform Create: Refusing to treat string as ' + 'an iterable. (string=%r)' % value) + elif isinstance(value, dict): + value = value.items() + self.value = tuple(value) + + + def expand(self, pvalue): + return pvalue.pipeline | Read(self._source) + + +class _CreateSource(iobase.BoundedSource): + def __init__(self, values): + self._values = values + + def read(self, range_tracker): + pass + + def split(self, desired_bundle_size, start_position=None, stop_position=None): + pass + + def get_range_tracker(self, start_position, stop_position): + pass + + def estimate_size(self): + pass \ No newline at end of file From 2b6c29fcdd0f499e80fe03b167e06090e521fc9f Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Tue, 4 Apr 2017 10:31:41 -0700 Subject: [PATCH 2/4] Create as custom source --- sdks/python/apache_beam/internal/pickler.py | 13 ++-- sdks/python/apache_beam/io/create.py | 58 --------------- sdks/python/apache_beam/pipeline.py | 3 +- .../runners/dataflow/dataflow_runner.py | 23 ------ .../runners/direct/transform_evaluator.py | 31 -------- sdks/python/apache_beam/transforms/core.py | 73 ++++++++++++++++++- 6 files changed, 82 insertions(+), 119 deletions(-) delete mode 100644 sdks/python/apache_beam/io/create.py diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 67f9fc33f236..4fb3ed2aae18 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -181,12 +181,15 @@ def new_log_info(msg, *args, **kwargs): # TODO(ccy): Currently, there are still instances of pickler.dumps() and # pickler.loads() being used for data, which results in an unnecessary base64 # encoding. This should be cleaned up. -def dumps(o): +def dumps(o, enable_trace=True): try: s = dill.dumps(o) - except Exception: # pylint: disable=broad-except - dill.dill._trace(True) # pylint: disable=protected-access - s = dill.dumps(o) + except Exception as e: # pylint: disable=broad-except + if enable_trace: + dill.dill._trace(True) # pylint: disable=protected-access + s = dill.dumps(o) + else: + raise e finally: dill.dill._trace(False) # pylint: disable=protected-access @@ -207,7 +210,7 @@ def loads(encoded): try: return dill.loads(s) - except Exception: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except dill.dill._trace(True) # pylint: disable=protected-access return dill.loads(s) finally: diff --git a/sdks/python/apache_beam/io/create.py b/sdks/python/apache_beam/io/create.py deleted file mode 100644 index 776291c27886..000000000000 --- a/sdks/python/apache_beam/io/create.py +++ /dev/null @@ -1,58 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""A transform that creates a PCollection from an iterable.""" - -from apache_beam import PTransform, Read -from apache_beam.io import iobase - - -class Create(PTransform): - def __init__(self, value): - """Initializes a Create transform. - - Args: - value: An object of values for the PCollection - """ - super(Create, self).__init__() - if isinstance(value, basestring): - raise TypeError('PTransform Create: Refusing to treat string as ' - 'an iterable. (string=%r)' % value) - elif isinstance(value, dict): - value = value.items() - self.value = tuple(value) - - - def expand(self, pvalue): - return pvalue.pipeline | Read(self._source) - - -class _CreateSource(iobase.BoundedSource): - def __init__(self, values): - self._values = values - - def read(self, range_tracker): - pass - - def split(self, desired_bundle_size, start_position=None, stop_position=None): - pass - - def get_range_tracker(self, start_position, stop_position): - pass - - def estimate_size(self): - pass \ No newline at end of file diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 3c416eb9d577..519d61a8f018 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -315,7 +315,8 @@ def visit_transform(self, transform_node): Visitor.ok = False try: # Transforms must be picklable. - pickler.loads(pickler.dumps(transform_node.transform)) + pickler.loads(pickler.dumps(transform_node.transform, + enable_trace=False)) except Exception: Visitor.ok = False self.visit(Visitor()) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index fe9f8c0d3f7a..13cb820d0d5b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -261,29 +261,6 @@ def _add_step(self, step_kind, step_label, transform_node, side_tags=()): return step - def run_Create(self, transform_node): - transform = transform_node.transform - step = self._add_step(TransformNames.CREATE_PCOLLECTION, - transform_node.full_label, transform_node) - # TODO(silviuc): Eventually use a coder based on typecoders. - # Note that we base64-encode values here so that the service will accept - # the values. - element_coder = coders.PickleCoder() - step.add_property( - PropertyNames.ELEMENT, - [base64.b64encode(element_coder.encode(v)) - for v in transform.value]) - # The service expects a WindowedValueCoder here, so we wrap the actual - # encoding in a WindowedValueCoder. - step.encoding = self._get_cloud_encoding( - coders.WindowedValueCoder(element_coder)) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def _add_singleton_step(self, label, full_label, tag, input_step): """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" # Import here to avoid adding the dependency for local running scenarios. diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 0c35d992ee1b..662c61dab93a 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -49,7 +49,6 @@ def __init__(self, evaluation_context): self._evaluation_context = evaluation_context self._evaluators = { io.Read: _BoundedReadEvaluator, - core.Create: _CreateEvaluator, core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator, @@ -233,36 +232,6 @@ def finish_bundle(self): self._applied_ptransform, bundles, None, None, None, None) -class _CreateEvaluator(_TransformEvaluator): - """TransformEvaluator for Create transform.""" - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - assert not input_committed_bundle - assert not side_inputs - super(_CreateEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - - def start_bundle(self): - assert len(self._outputs) == 1 - output_pcollection = list(self._outputs)[0] - self.bundle = self._evaluation_context.create_bundle(output_pcollection) - - def finish_bundle(self): - bundles = [] - transform = self._applied_ptransform.transform - - assert transform.value is not None - create_result = [GlobalWindows.windowed_value(v) for v in transform.value] - for result in create_result: - self.bundle.output(result) - bundles.append(self.bundle) - - return TransformResult( - self._applied_ptransform, bundles, None, None, None, None) - - class _TaggedReceivers(dict): """Received ParDo output and redirect to the associated output bundle.""" diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 88fdec846313..5cfb34fef81e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1373,11 +1373,82 @@ def infer_output_type(self, unused_input_type): def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline - return pvalue.PCollection(self.pipeline) + coder = typecoders.registry.get_coder(self.infer_output_type(None)) + source = self._create_source(self.value, coder) + return pbegin.pipeline | Read(source) def get_windowing(self, unused_inputs): return Windowing(GlobalWindows()) + def _create_source(self, values, coder): + from apache_beam import io + + class _CreateSource(io.iobase.BoundedSource): + def __init__(self, values, coder, is_serialized=False): + self._coder = coder + self._serialized_values = [] + self._total_size = 0 + if is_serialized: + self._serialized_values = values + else: + for value in values: + serialized_value = self._coder.encode(value) + self._serialized_values.append(serialized_value) + + for serialized_value in self._serialized_values: + self._total_size += len(serialized_value) + + def read(self, range_tracker): + start_position = range_tracker.start_position() + element_iter = iter(self._serialized_values[start_position:]) + for i in range(start_position, range_tracker.stop_position()): + if not range_tracker.try_claim(i): + return + yield self._coder.decode(next(element_iter)) + + def split(self, desired_bundle_size, start_position=None, stop_position=None): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = len(self._serialized_values) + + avg_size_per_element = self._total_size / len(self._serialized_values) + num_elements_per_split = desired_bundle_size / avg_size_per_element + + start = start_position + while start < stop_position: + end = min(start + num_elements_per_split, stop_position) + remaining = stop_position - end + # Avoid having a too small bundle at the end. + if remaining < (desired_bundle_size / 4): + end = stop_position + + sub_source = _CreateSource(self._serialized_values[start:end], + self._coder, + is_serialized=True) + + from apache_beam import io + yield io.iobase.SourceBundle(weight=(end - start), + source=sub_source, + start_position=start, + stop_position=end) + + start = end + + def get_range_tracker(self, start_position, stop_position): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = len(self._serialized_values) + + from apache_beam import io + return io.OffsetRangeTracker(start_position, stop_position) + + def estimate_size(self): + return self._total_size + + return _CreateSource(values, coder) + def Read(*args, **kwargs): from apache_beam import io From 35fc478d2fdf524e463cb256c6ac515806f15c7b Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Tue, 4 Apr 2017 13:10:35 -0700 Subject: [PATCH 3/4] dill trace enable/disable --- sdks/python/apache_beam/internal/pickler.py | 9 ++++++--- sdks/python/apache_beam/pipeline.py | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 4fb3ed2aae18..a4ab7b9c97ee 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -202,7 +202,7 @@ def dumps(o, enable_trace=True): return base64.b64encode(c) -def loads(encoded): +def loads(encoded, enable_trace=True): c = base64.b64decode(encoded) s = zlib.decompress(c) @@ -211,8 +211,11 @@ def loads(encoded): try: return dill.loads(s) except Exception as e: # pylint: disable=broad-except - dill.dill._trace(True) # pylint: disable=protected-access - return dill.loads(s) + if enable_trace: + dill.dill._trace(True) # pylint: disable=protected-access + return dill.loads(s) + else: + raise e finally: dill.dill._trace(False) # pylint: disable=protected-access diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 519d61a8f018..d113931b12ea 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -277,6 +277,7 @@ def apply(self, transform, pvalueish=None, label=None): else typehints.Any) type_hints = transform.get_type_hints() declared_output_type = type_hints.simple_output_type(transform.label) + print "Resul: %s" % result if declared_output_type: input_types = type_hints.input_types if input_types and input_types[0]: @@ -290,6 +291,7 @@ def apply(self, transform, pvalueish=None, label=None): else: result.element_type = transform.infer_output_type(input_element_type) + print "Result element type: %s" % result.element_type assert isinstance(result.producer.inputs, tuple) current.add_output(result) @@ -316,7 +318,8 @@ def visit_transform(self, transform_node): try: # Transforms must be picklable. pickler.loads(pickler.dumps(transform_node.transform, - enable_trace=False)) + enable_trace=False), + enable_trace=False) except Exception: Visitor.ok = False self.visit(Visitor()) From 6d5f336753e1816aba5875277f50eca524b28899 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Tue, 4 Apr 2017 14:11:12 -0700 Subject: [PATCH 4/4] Fixes --- sdks/python/apache_beam/pipeline.py | 2 -- sdks/python/apache_beam/pipeline_test.py | 6 +++--- sdks/python/apache_beam/runners/direct/direct_runner.py | 4 ++++ sdks/python/apache_beam/transforms/core.py | 4 ++-- sdks/python/apache_beam/transforms/ptransform.py | 1 + 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index d113931b12ea..191a532ee8e4 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -277,7 +277,6 @@ def apply(self, transform, pvalueish=None, label=None): else typehints.Any) type_hints = transform.get_type_hints() declared_output_type = type_hints.simple_output_type(transform.label) - print "Resul: %s" % result if declared_output_type: input_types = type_hints.input_types if input_types and input_types[0]: @@ -291,7 +290,6 @@ def apply(self, transform, pvalueish=None, label=None): else: result.element_type = transform.infer_output_type(input_element_type) - print "Result element type: %s" % result.element_type assert isinstance(result.producer.inputs, tuple) current.add_output(result) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index ba219bfda125..a831e4c13e7c 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -173,9 +173,9 @@ def test_visit_entire_graph(self): set(visitor.visited)) self.assertEqual(set(visitor.enter_composite), set(visitor.leave_composite)) - self.assertEqual(2, len(visitor.enter_composite)) - self.assertEqual(visitor.enter_composite[1].transform, transform) - self.assertEqual(visitor.leave_composite[0].transform, transform) + self.assertEqual(3, len(visitor.enter_composite)) + self.assertEqual(visitor.enter_composite[2].transform, transform) + self.assertEqual(visitor.leave_composite[1].transform, transform) def test_apply_custom_transform(self): pipeline = TestPipeline() diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index efad2e0c7087..fd3aab67de32 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -93,6 +93,7 @@ def run(self, pipeline): # We are running in eager mode, block until the pipeline execution # completes in order to have full results in the cache. result.wait_until_finish() + print "finishing" self._cache.finalize() return result @@ -124,12 +125,15 @@ def pvalue_cache(self): return self._pvalue_cache def append(self, applied_ptransform, tag, elements): + print "append" assert not self._finalized assert elements is not None + assert self._cache is not None self._cache[(applied_ptransform, tag)].extend(elements) def finalize(self): """Make buffered cache elements visible to the underlying PValueCache.""" + print "finalize" assert not self._finalized for key, value in self._cache.iteritems(): applied_ptransform, tag = key diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 5cfb34fef81e..2abb3e25864e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1088,7 +1088,6 @@ def expand(self, pcoll): # This code path is only used in the local direct runner. For Dataflow # runner execution, the GroupByKey transform is expanded on the service. input_type = pcoll.element_type - if input_type is not None: # Initialize type-hints used below to enforce type-checking and to pass # downstream to further PTransforms. @@ -1375,7 +1374,8 @@ def expand(self, pbegin): self.pipeline = pbegin.pipeline coder = typecoders.registry.get_coder(self.infer_output_type(None)) source = self._create_source(self.value, coder) - return pbegin.pipeline | Read(source) + return (pbegin.pipeline + | Read(source).with_output_types(self.infer_output_type(None))) def get_windowing(self, unused_inputs): return Windowing(GlobalWindows()) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 93d751d47131..c4da7c064d2d 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -375,6 +375,7 @@ def __ror__(self, left, label=None): replacements = {id(v): p | 'CreatePInput%s' % ix >> Create(v) for ix, v in enumerate(pvalues) if not isinstance(v, pvalue.PValue) and v is not None} + pvalueish = _SetInputPValues().visit(pvalueish, replacements) self.pipeline = p result = p.apply(self, pvalueish, label)