From 75a7d6075e2c41f01123625cac0a2136eedbd943 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 26 Sep 2017 17:03:41 -0700 Subject: [PATCH 1/2] Better error for reused pcollections. --- sdks/python/apache_beam/transforms/ptransform.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 0b6d608b231f..bbc6706cc1f0 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -375,6 +375,12 @@ def __ror__(self, left, label=None): p = pipeline.Pipeline( 'DirectRunner', PipelineOptions(sys.argv)) else: + if any(getattr(p, 'is_ephemeral', False) for p in pipelines): + raise ValueError( + 'Re-using PCollection(s) from previous implicit run not allowed: %s' + % [v for v in pvalues + if isinstance(v, pvalue.PValue) + and getattr(v.pipeline, 'is_ephemeral', False)]) if not pipelines: if self.pipeline is not None: p = self.pipeline @@ -403,6 +409,7 @@ def __ror__(self, left, label=None): # clean it after run. cache = p.runner.cache p.run().wait_until_finish() + p.is_ephemeral = True return _MaterializePValues(cache).visit(result) def _extract_input_pvalues(self, pvalueish): From 92ab60d9d93bb22d1b900a50b89cade411877e5c Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 24 Jan 2018 17:45:21 -0800 Subject: [PATCH 2/2] Add test. --- sdks/python/apache_beam/pipeline_test.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 634483e41608..a5d04e7ec38e 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -306,6 +306,28 @@ def raise_exception(exn): # p = Pipeline('EagerRunner') # self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x)) + def test_pcollection_reuse_errors(self): + class MyObject(object): + def __init__(self, pcollection): + self.pcollection = pcollection + + class CreateMyObject(beam.PTransform): + def expand(self, data): + return MyObject(data) + + class TransformMyObject(beam.PTransform): + def _extract_input_pvalues(self, data_and_my_object): + data, my_object = data_and_my_object + return data_and_my_object, [data, my_object.pcollection] + + def expand(self, data_and_my_object): + return data_and_my_object + + def testErrorOnReuseOfPCollection(self): + input_data = [1, 2, 3, 4] + my_object = input_data | 'CreateMyObject' >> CreateMyObject() + _ = (input_data, my_object) | 'TransformMyObject' >> TransformMyObject() + @mock.patch( 'apache_beam.runners.direct.direct_runner._get_transform_overrides') def test_ptransform_overrides(self, file_system_override_mock):