From 16a138bb506fb6458923d0bdc4dfda9078abe189 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Jan 2017 21:13:36 -0800 Subject: [PATCH 1/3] Code cleanup now that all runners support windowed side inputs. --- sdks/python/apache_beam/runners/common.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 0f63cbc6a2cc..6632424c7c74 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -98,13 +98,9 @@ def __init__(self, self.is_new_dofn = True # SideInputs - self.side_inputs = [side_input - if isinstance(side_input, sideinputs.SideInputMap) - else {global_window: side_input} - for side_input in side_inputs] + self.side_inputs = side_inputs self.has_windowed_side_inputs = not all( - isinstance(si, dict) or si.is_globally_windowed() - for si in self.side_inputs) + si.is_globally_windowed() for si in self.side_inputs) self.args = args if args else [] self.kwargs = kwargs if kwargs else {} @@ -117,14 +113,8 @@ def __init__(self, self.dofn = fn self.dofn_process = fn.process else: - # TODO(robertwb): Remove when all runners pass side input maps. - side_inputs = [side_input - if isinstance(side_input, sideinputs.SideInputMap) - else {global_window: side_input} - for side_input in side_inputs] if side_inputs and all( - isinstance(side_input, dict) or side_input.is_globally_windowed() - for side_input in side_inputs): + side_input.is_globally_windowed() for side_input in side_inputs): args, kwargs = util.insert_values_in_args( args, kwargs, [side_input[global_window] for side_input in side_inputs]) From 6ff06f0ff23428d70710b52b00172edbcabe180a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Jan 2017 21:59:24 -0800 Subject: [PATCH 2/3] lint --- sdks/python/apache_beam/runners/common.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 6632424c7c74..b2ece128d974 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -24,7 +24,6 @@ from apache_beam.internal import util from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core -from apache_beam.transforms import sideinputs from apache_beam.transforms import window from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn From 0bbb51a398c1d9fe0e12df587243d37ca3c9fd7f Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 23 Jan 2017 10:23:01 -0800 Subject: [PATCH 3/3] resolve review comments --- sdks/python/apache_beam/runners/common.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index b2ece128d974..9c8fdfc58fb0 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -70,6 +70,21 @@ def __init__(self, # Preferred alternative to context # TODO(robertwb): Remove once all runners are updated. state=None): + """Initializes a DoFnRunner. + + Args: + fn: user DoFn to invoke + args: positional side input arguments (static and placeholder), if any + kwargs: keyword side input arguments (static and placeholder), if any + side_inputs: list of sideinput.SideInputMaps for deferred side inputs + windowing: windowing properties of the output PCollection(s) + context: a DoFnContext to use (deprecated) + tagged_receivers: a dict of tag name to Receiver objects + logger: a logging module (deprecated) + step_name: the name of this step + logging_context: a LoggingContext object + state: handle for accessing DoFn state + """ self.step_name = step_name self.window_fn = windowing.windowfn self.tagged_receivers = tagged_receivers @@ -96,7 +111,7 @@ def __init__(self, if isinstance(fn, core.NewDoFn): self.is_new_dofn = True - # SideInputs + # Stash values for use in new_dofn_process. self.side_inputs = side_inputs self.has_windowed_side_inputs = not all( si.is_globally_windowed() for si in self.side_inputs)