From 5b8e7685b5d5582e894b726ce0189c76199f0b7a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 7 Oct 2016 15:28:50 -0700 Subject: [PATCH 1/5] Dissallow (unimplemented) windowed side inputs. --- sdks/python/apache_beam/transforms/sideinputs.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 6484a7cf3866b..6c698da5fe19d 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -50,6 +50,10 @@ def infer_output_type(self, input_type): return input_type def apply(self, pcoll): + if not pcoll.windowing.is_default(): + raise ValueError( + "Side inputs only supported for global windows, default triggering. " + "Found %s" % pcoll.windowing) return self.view From fa079b99fcf300dc210d21020f821a88b6142b79 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 7 Oct 2016 16:17:47 -0700 Subject: [PATCH 2/5] Fix tests. --- sdks/python/apache_beam/pipeline_test.py | 1 + sdks/python/apache_beam/transforms/util.py | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 8a0d2467dd2bd..fc1107abf1441 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -239,6 +239,7 @@ def create_dupes(o, _): 'oom:combine/GroupByKey/group_by_key': 1, ('oom:check', None): 1, 'assert_that/singleton': 1, + ('assert_that/WindowInto', None): 1, ('assert_that/Map(match)', None): 1, ('oom:combine/GroupByKey/group_by_window', None): 1, ('oom:combine/Combine/ParDo(CombineValuesDoFn)', None): 1}) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 4564cf92e6eaa..aeac0d94cbd6a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -21,6 +21,8 @@ from __future__ import absolute_import from apache_beam.pvalue import AsIter as AllOf +from apache_beam.transforms import core +from apache_beam.transforms import window from apache_beam.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import ptransform_fn @@ -220,8 +222,9 @@ def match(_, actual): class AssertThat(PTransform): def apply(self, pipeline): - return pipeline | 'singleton' >> Create([None]) | Map(match, - AllOf(actual)) + return pipeline | 'singleton' >> Create([None]) | Map( + match, + AllOf(actual | core.WindowInto(window.GlobalWindows()))) def default_label(self): return label From e10587b162944475290581dc8bb35dfec4cf0afd Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 7 Oct 2016 16:31:20 -0700 Subject: [PATCH 3/5] Add windowed side inputs failure test. --- .../apache_beam/transforms/sideinputs_test.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 sdks/python/apache_beam/transforms/sideinputs_test.py diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py new file mode 100644 index 0000000000000..ddb7ef53c1e4b --- /dev/null +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for side inputs.""" + +import logging +import unittest + +import apache_beam as beam +from apache_beam.transforms import window + +class SideInputsTest(unittest.TestCase): + + # TODO(BEAM-733): Actually support this. + def test_no_sideinput_windowing(self): + p = beam.Pipeline('DirectPipelineRunner') + pc = p | beam.Create([0, 1]) | beam.WindowInto(window.FixedWindows(10)) + with self.assertRaises(ValueError): + res = pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc)) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.DEBUG) + unittest.main() From 27638a0a0f3239ffa685a34131583b868b756432 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 10 Oct 2016 10:35:15 -0700 Subject: [PATCH 4/5] lint --- sdks/python/apache_beam/transforms/sideinputs_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index ddb7ef53c1e4b..0dc9d0ce0ff1e 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -30,7 +30,8 @@ def test_no_sideinput_windowing(self): p = beam.Pipeline('DirectPipelineRunner') pc = p | beam.Create([0, 1]) | beam.WindowInto(window.FixedWindows(10)) with self.assertRaises(ValueError): - res = pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc)) + # pylint: disable=expression-not-assigned + pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc)) if __name__ == '__main__': From b16ef196840746c5fd81cf2dbbcb7ed0b472c0e7 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 10 Oct 2016 11:27:48 -0700 Subject: [PATCH 5/5] more lint --- sdks/python/apache_beam/transforms/sideinputs_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 0dc9d0ce0ff1e..8e292e3657ec7 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -23,6 +23,7 @@ import apache_beam as beam from apache_beam.transforms import window + class SideInputsTest(unittest.TestCase): # TODO(BEAM-733): Actually support this.