From 843f3f5a7ad7cf5297b3078aa9e929efcdc7092d Mon Sep 17 00:00:00 2001 From: Charles Chen Date: Thu, 15 Jun 2017 15:27:18 -0700 Subject: [PATCH] Populate PBegin input when decoding from Runner API --- sdks/python/apache_beam/pipeline.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index ab77956a0c1a..d84a2b7b59cc 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -515,7 +515,18 @@ def from_runner_api(proto, runner, options): p.applied_labels = set([ t.unique_name for t in proto.components.transforms.values()]) for id in proto.components.pcollections: - context.pcollections.get_by_id(id).pipeline = p + pcollection = context.pcollections.get_by_id(id) + pcollection.pipeline = p + + # Inject PBegin input where necessary. + from apache_beam.io.iobase import Read + from apache_beam.transforms.core import Create + has_pbegin = [Read, Create] + for id in proto.components.transforms: + transform = context.transforms.get_by_id(id) + if not transform.inputs and transform.transform.__class__ in has_pbegin: + transform.inputs = (pvalue.PBegin(p),) + return p