From 3aa157a9be4f5bf8e390fcb3acf37d8bdb250954 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 31 Mar 2017 14:57:44 -0700 Subject: [PATCH] Fix side inputs on dataflow runner. --- .../apache_beam/runners/dataflow/dataflow_runner.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index db433df5eb0c..fe9f8c0d3f7a 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -287,7 +287,7 @@ def run_Create(self, transform_node): def _add_singleton_step(self, label, full_label, tag, input_step): """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" # Import here to avoid adding the dependency for local running scenarios. - from google.cloud.dataflow.internal import apiclient + from apache_beam.runners.dataflow.internal import apiclient step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label) self.job.proto.steps.append(step.proto) step.add_property(PropertyNames.USER_NAME, full_label) @@ -302,7 +302,7 @@ def _add_singleton_step(self, label, full_label, tag, input_step): [{PropertyNames.USER_NAME: ( '%s.%s' % (full_label, PropertyNames.OUTPUT)), PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}]) + PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) return step def run_Flatten(self, transform_node): @@ -374,12 +374,10 @@ def run_ParDo(self, transform_node): si_dict = {} # We must call self._cache.get_pvalue exactly once due to refcounting. si_labels = {} - for side_pval in transform_node.side_inputs: - si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name lookup_label = lambda side_pval: si_labels[side_pval] for side_pval in transform_node.side_inputs: assert isinstance(side_pval, AsSideInput) - si_label = self._get_unique_step_name() + si_label = 'SideInput-' + self._get_unique_step_name() si_full_label = '%s/%s' % (transform_node.full_label, si_label) self._add_singleton_step( si_label, si_full_label, side_pval.pvalue.tag, @@ -388,10 +386,13 @@ def run_ParDo(self, transform_node): '@type': 'OutputReference', PropertyNames.STEP_NAME: si_label, PropertyNames.OUTPUT_NAME: PropertyNames.OUT} + si_labels[side_pval] = si_label # Now create the step for the ParDo transform being handled. step = self._add_step( - TransformNames.DO, transform_node.full_label, transform_node, + TransformNames.DO, + transform_node.full_label + '/Do' if transform_node.side_inputs else '', + transform_node, transform_node.transform.side_output_tags) fn_data = self._pardo_fn_data(transform_node, lookup_label) step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))