From af352a670df3cdf235e9895d46e4fec0d063b782 Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Fri, 7 Apr 2017 13:41:28 -0700 Subject: [PATCH 1/7] Updates DoFn invocation logic to be more extensible. Adds following abstractions. DoFnSignature: describes the signature of a given DoFn object. DoFnInvoker: defines a particular way for invoking DoFn methods. --- sdks/python/apache_beam/runners/common.pxd | 25 +- sdks/python/apache_beam/runners/common.py | 343 ++++++++++++++------- 2 files changed, 244 insertions(+), 124 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 59529424554f..c3143d230aa9 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -28,6 +28,25 @@ cdef class Receiver(object): cpdef receive(self, WindowedValue windowed_value) +cdef class Method(object): + cdef public object args + cdef public object defaults + cdef object _method_value + + +cdef class DoFnSignature(object): + cdef public Method process_method + cdef public Method start_bundle_method + cdef public Method finish_bundle_method + cdef public object do_fn + + +cdef class DoFnInvoker(object): + cpdef invoke_process(self, WindowedValue element, process_output_fn) + cpdef invoke_start_bundle(self, process_output_fn) + cpdef invoke_finish_bundle(self, process_output_fn) + + cdef class DoFnRunner(Receiver): cdef object dofn @@ -47,11 +66,9 @@ cdef class DoFnRunner(Receiver): cdef Receiver main_receivers - cpdef process(self, WindowedValue element) - cdef _dofn_invoker(self, WindowedValue element) - cdef _dofn_simple_invoker(self, WindowedValue element) - cdef _dofn_per_window_invoker(self, WindowedValue element) + cdef DoFnInvoker do_fn_invoker + cpdef process(self, WindowedValue element) @cython.locals(windowed_value=WindowedValue) cpdef _process_outputs(self, WindowedValue element, results) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 64d6d0010bb3..59ed043c66a1 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -51,117 +51,150 @@ def receive(self, windowed_value): raise NotImplementedError -class DoFnRunner(Receiver): - """A helper class for executing ParDo operations. +class Method(object): + """Represents a method of a DoFn object.""" + + def __init__(self, method_value, args, defaults): + self.args = args + self.defaults = defaults + self._method_value = method_value + + def __call__(self, *args, **kwargs): + return self._method_value(*args, **kwargs) + + +class DoFnSignature(object): + """Represents the signature of a given ``DoFn`` object. + + Signature of a ``DoFn`` provides a view of the properties of a given ``DoFn``. + Among other things, this will give an extensible way for for (1) accessing the + structure of the ``DoFn`` including methods and method parameters + (2) identifying features that a given ``DoFn`` support, for example, whether + a given ``DoFn`` is a Splittable ``DoFn`` ( + https://s.apache.org/splittable-do-fn) (3) validating a ``DoFn`` based on the + feature set offered by it. """ - def __init__(self, - fn, - args, - kwargs, - side_inputs, - windowing, - context=None, - tagged_receivers=None, - logger=None, - step_name=None, - # Preferred alternative to logger - # TODO(robertwb): Remove once all runners are updated. - logging_context=None, - # Preferred alternative to context - # TODO(robertwb): Remove once all runners are updated. - state=None, - scoped_metrics_container=None): - """Initializes a DoFnRunner. + def __init__(self): + # We add a property here for all methods defined by Beam DoFn features. + self.process_method = None + self.start_bundle_method = None + self.finish_bundle_method = None + self.do_fn = None + + @staticmethod + def create_signature(do_fn): + """Creates the signature fo a given DoFn object.""" + assert isinstance(do_fn, core.DoFn) + signature = DoFnSignature() + + def _create_do_fn_method(do_fn, method_name): + arguments, _, _, defaults = do_fn.get_function_arguments(method_name) + defaults = defaults if defaults else [] + method_value = getattr(do_fn, method_name) + return Method(method_value, arguments, defaults) - Args: - fn: user DoFn to invoke - args: positional side input arguments (static and placeholder), if any - kwargs: keyword side input arguments (static and placeholder), if any - side_inputs: list of sideinput.SideInputMaps for deferred side inputs - windowing: windowing properties of the output PCollection(s) - context: a DoFnContext to use (deprecated) - tagged_receivers: a dict of tag name to Receiver objects - logger: a logging module (deprecated) - step_name: the name of this step - logging_context: a LoggingContext object - state: handle for accessing DoFn state - scoped_metrics_container: Context switcher for metrics container - """ - self.step_name = step_name - self.window_fn = windowing.windowfn - self.tagged_receivers = tagged_receivers - self.scoped_metrics_container = (scoped_metrics_container - or ScopedMetricsContainer()) + signature.do_fn = do_fn + signature.process_method = _create_do_fn_method(do_fn, 'process') + signature.start_bundle_method = _create_do_fn_method(do_fn, 'start_bundle') + signature.finish_bundle_method = _create_do_fn_method( + do_fn, 'finish_bundle') - global_window = GlobalWindow() + return signature - # Need to support multiple iterations. - side_inputs = list(side_inputs) - if logging_context: - self.logging_context = logging_context - else: - self.logging_context = get_logging_context(logger, step_name=step_name) +class DoFnInvoker(object): + """An abstraction that can be used to execute DoFn methods. - # Optimize for the common case. - self.main_receivers = as_receiver(tagged_receivers[None]) + A DoFnInvoker describes a particular way for invoking methods of a DoFn + represented by a given DoFnSignature.""" - # TODO(sourabh): Deprecate the use of context - if state: - assert context is None - self.context = DoFnContext(self.step_name, state=state) - else: - assert context is not None - self.context = context + def __init__(self, signature): + self.signature = signature - class ArgPlaceholder(object): - def __init__(self, placeholder): - self.placeholder = placeholder + @staticmethod + def create_invoker( + signature, use_simple_invoker, context, side_inputs, input_args, + input_kwargs): + if use_simple_invoker: + return SimpleInvoker(signature) + else: + return PerWindowInvoker( + signature, context, side_inputs, input_args, input_kwargs) - # Stash values for use in dofn_process. - self.side_inputs = side_inputs - self.has_windowed_inputs = not all( - si.is_globally_windowed() for si in self.side_inputs) + def invoke_process(self, element, process_output_fn): + raise NotImplementedError - self.args = args if args else [] - self.kwargs = kwargs if kwargs else {} - self.dofn = fn - self.dofn_process = fn.process + def invoke_start_bundle(self, process_output_fn): + defaults = self.signature.start_bundle_method.defaults + defaults = defaults if defaults else [] + args = [self.context if d == core.DoFn.ContextParam else d + for d in defaults] + process_output_fn(None, self.signature.start_bundle_method(*args)) - arguments, _, _, defaults = self.dofn.get_function_arguments('process') + def invoke_finish_bundle(self, process_output_fn): + defaults = self.signature.start_bundle_method.defaults defaults = defaults if defaults else [] - self_in_args = int(self.dofn.is_process_bounded()) + args = [self.context if d == core.DoFn.ContextParam else d + for d in defaults] + process_output_fn(None, self.signature.finish_bundle_method(*args)) - self.use_simple_invoker = ( - not side_inputs and not args and not kwargs and not defaults) - if self.use_simple_invoker: - # As we're using the simple invoker we don't need to compute placeholders - return +class SimpleInvoker(DoFnInvoker): + """An invoker that processes elements ignoring windowing information.""" + + def invoke_process(self, element, process_output_fn): + process_output_fn(element, self.signature.process_method(element.value)) + + +class PerWindowInvoker(DoFnInvoker): + """An invoker that processes elements considering windowing information.""" + + def __init__(self, signature, context, side_inputs, input_args, input_kwargs): + super(PerWindowInvoker, self).__init__(signature) + self.side_inputs = side_inputs + self.context = context + self.has_windowed_inputs = not all( + si.is_globally_windowed() for si in side_inputs) + default_arg_values = signature.process_method.defaults self.has_windowed_inputs = (self.has_windowed_inputs or - core.DoFn.WindowParam in defaults) + core.DoFn.WindowParam in default_arg_values) # Try to prepare all the arguments that can just be filled in # without any additional work. in the process function. # Also cache all the placeholders needed in the process function. # Fill in sideInputs if they are globally windowed + + global_window = GlobalWindow() + + self.args = input_args if input_args else [] + self.kwargs = input_kwargs if input_kwargs else {} + if not self.has_windowed_inputs: self.args, self.kwargs = util.insert_values_in_args( - args, kwargs, [si[global_window] for si in side_inputs]) + input_args, input_kwargs, [si[global_window] for si in side_inputs]) + + arguments = signature.process_method.args + defaults = signature.process_method.defaults + + # Create placeholder for element parameter of DoFn.process() method. + self_in_args = int(signature.do_fn.is_process_bounded()) + + class ArgPlaceholder(object): + def __init__(self, placeholder): + self.placeholder = placeholder - # Create placeholder for element parameter - if core.DoFn.ElementParam not in defaults: - args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args - final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \ - self.args[:args_to_pick] + if core.DoFn.ElementParam not in default_arg_values: + args_to_pick = len(arguments) - len(default_arg_values) - 1 - self_in_args + final_args = ( + [ArgPlaceholder(core.DoFn.ElementParam)] + self.args[:args_to_pick]) else: args_to_pick = len(arguments) - len(defaults) - self_in_args final_args = self.args[:args_to_pick] # Fill the OtherPlaceholders for context, window or timestamp - args = iter(self.args[args_to_pick:]) + input_args = iter(self.args[args_to_pick:]) for a, d in zip(arguments[-len(defaults):], defaults): if d == core.DoFn.ElementParam: final_args.append(ArgPlaceholder(d)) @@ -174,30 +207,37 @@ def __init__(self, placeholder): elif d == core.DoFn.SideInputParam: # If no more args are present then the value must be passed via kwarg try: - final_args.append(args.next()) + final_args.append(input_args.next()) except StopIteration: if a not in self.kwargs: raise ValueError("Value for sideinput %s not provided" % a) else: # If no more args are present then the value must be passed via kwarg try: - final_args.append(args.next()) + final_args.append(input_args.next()) except StopIteration: pass - final_args.extend(list(args)) + final_args.extend(list(input_args)) self.args = final_args # Stash the list of placeholder positions for performance self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args) if isinstance(x, ArgPlaceholder)] - def receive(self, windowed_value): - self.process(windowed_value) - - def _dofn_simple_invoker(self, element): - self._process_outputs(element, self.dofn_process(element.value)) + def invoke_process(self, element, process_output_fn): + self.context.set_element(element) + # Call for the process function for each window if has windowed side inputs + # or if the process accesses the window parameter. We can just call it once + # otherwise as none of the arguments are changing + if self.has_windowed_inputs and len(element.windows) != 1: + for w in element.windows: + self._invoke_per_window( + WindowedValue(element.value, element.timestamp, (w,)), + process_output_fn) + else: + self._invoke_per_window(element, process_output_fn) - def _dofn_per_window_invoker(self, element): + def _invoke_per_window(self, element, process_output_fn): if self.has_windowed_inputs: window, = element.windows args, kwargs = util.insert_values_in_args( @@ -214,62 +254,125 @@ def _dofn_per_window_invoker(self, element): args[i] = window elif p == core.DoFn.TimestampParam: args[i] = element.timestamp + if not kwargs: - self._process_outputs(element, self.dofn_process(*args)) + process_output_fn(element, self.signature.process_method(*args)) else: - self._process_outputs(element, self.dofn_process(*args, **kwargs)) + process_output_fn(element, self.signature.process_method( + *args, **kwargs)) - def _dofn_invoker(self, element): - self.context.set_element(element) - # Call for the process function for each window if has windowed side inputs - # or if the process accesses the window parameter. We can just call it once - # otherwise as none of the arguments are changing - if self.has_windowed_inputs and len(element.windows) != 1: - for w in element.windows: - self._dofn_per_window_invoker( - WindowedValue(element.value, element.timestamp, (w,))) + +class DoFnRunner(Receiver): + """A helper class for executing ParDo operations. + """ + + def __init__(self, + fn, + args, + kwargs, + side_inputs, + windowing, + context=None, + tagged_receivers=None, + logger=None, + step_name=None, + # Preferred alternative to logger + # TODO(robertwb): Remove once all runners are updated. + logging_context=None, + # Preferred alternative to context + # TODO(robertwb): Remove once all runners are updated. + state=None, + scoped_metrics_container=None): + """Initializes a DoFnRunner. + + Args: + fn: user DoFn to invoke + args: positional side input arguments (static and placeholder), if any + kwargs: keyword side input arguments (static and placeholder), if any + side_inputs: list of sideinput.SideInputMaps for deferred side inputs + windowing: windowing properties of the output PCollection(s) + context: a DoFnContext to use (deprecated) + tagged_receivers: a dict of tag name to Receiver objects + logger: a logging module (deprecated) + step_name: the name of this step + logging_context: a LoggingContext object + state: handle for accessing DoFn state + scoped_metrics_container: Context switcher for metrics container + """ + self.tagged_receivers = tagged_receivers + self.scoped_metrics_container = (scoped_metrics_container + or ScopedMetricsContainer()) + + # Optimize for the common case. + self.main_receivers = as_receiver(tagged_receivers[None]) + + self.step_name = step_name + + # Need to support multiple iterations. + side_inputs = list(side_inputs) + + if logging_context: + self.logging_context = logging_context else: - self._dofn_per_window_invoker(element) + self.logging_context = get_logging_context(logger, step_name=step_name) + + # TODO(sourabh): Deprecate the use of context + if state: + assert context is None + context = DoFnContext(step_name, state=state) + else: + assert context is not None + context = context + + self.context = context + + do_fn_signature = DoFnSignature.create_signature(fn) + + default_arg_values = do_fn_signature.process_method.defaults + use_simple_invoker = ( + not side_inputs and not args and not kwargs and not default_arg_values) - def _invoke_bundle_method(self, method): + self.window_fn = windowing.windowfn + + self.do_fn_invoker = DoFnInvoker.create_invoker( + do_fn_signature, use_simple_invoker, context, side_inputs, args, kwargs) + + def receive(self, windowed_value): + self.process(windowed_value) + + def process(self, windowed_value): + self._invoke_process_method(windowed_value) + + def _invoke_bundle_method(self, bundle_method): try: self.logging_context.enter() self.scoped_metrics_container.enter() self.context.set_element(None) - f = getattr(self.dofn, method) - - _, _, _, defaults = self.dofn.get_function_arguments(method) - defaults = defaults if defaults else [] - args = [self.context if d == core.DoFn.ContextParam else d - for d in defaults] - self._process_outputs(None, f(*args)) + bundle_method(self._process_outputs) except BaseException as exn: - self.reraise_augmented(exn) + self._reraise_augmented(exn) finally: self.scoped_metrics_container.exit() self.logging_context.exit() def start(self): - self._invoke_bundle_method('start_bundle') + self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle) def finish(self): - self._invoke_bundle_method('finish_bundle') + self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle) - def process(self, element): + def _invoke_process_method(self, element): try: self.logging_context.enter() self.scoped_metrics_container.enter() - if self.use_simple_invoker: - self._dofn_simple_invoker(element) - else: - self._dofn_invoker(element) + self.do_fn_invoker.invoke_process(element, self._process_outputs) except BaseException as exn: - self.reraise_augmented(exn) + self._reraise_augmented(exn) finally: self.scoped_metrics_container.exit() self.logging_context.exit() - def reraise_augmented(self, exn): + def _reraise_augmented(self, exn): if getattr(exn, '_tagged_with_step', False) or not self.step_name: raise args = exn.args From eba13a6efd1d92292a17985ee832b7fd2ee7ec8e Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Wed, 19 Apr 2017 02:46:25 -0700 Subject: [PATCH 2/7] Addressing reviewer comments. --- sdks/python/apache_beam/runners/common.pxd | 38 ++++-- sdks/python/apache_beam/runners/common.py | 152 +++++++++++++-------- 2 files changed, 119 insertions(+), 71 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index c3143d230aa9..286bb2b1ee8d 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -28,44 +28,56 @@ cdef class Receiver(object): cpdef receive(self, WindowedValue windowed_value) -cdef class Method(object): +cdef class DoFnMethodWrapper(object): cdef public object args cdef public object defaults cdef object _method_value + cpdef call(self, list args, dict kwargs) + cdef class DoFnSignature(object): - cdef public Method process_method - cdef public Method start_bundle_method - cdef public Method finish_bundle_method + cdef public DoFnMethodWrapper process_method + cdef public DoFnMethodWrapper start_bundle_method + cdef public DoFnMethodWrapper finish_bundle_method cdef public object do_fn cdef class DoFnInvoker(object): + + cdef DoFnSignature signature + cpdef invoke_process(self, WindowedValue element, process_output_fn) cpdef invoke_start_bundle(self, process_output_fn) cpdef invoke_finish_bundle(self, process_output_fn) + # TODO(chamikara) define static method create_invoker() here. + + +cdef class SimpleInvoker(DoFnInvoker): + pass + + +cdef class PerWindowInvoker(DoFnInvoker): + + cdef list side_inputs + cdef DoFnContext context + cdef list args + cdef dict kwargs + cdef list placeholders + cdef bint has_windowed_inputs + cdef class DoFnRunner(Receiver): - cdef object dofn - cdef object dofn_process cdef object window_fn cdef DoFnContext context cdef object tagged_receivers cdef LoggingContext logging_context cdef object step_name - cdef list args - cdef dict kwargs cdef ScopedMetricsContainer scoped_metrics_container cdef list side_inputs - cdef bint has_windowed_inputs - cdef list placeholders - cdef bint use_simple_invoker - cdef Receiver main_receivers - cdef DoFnInvoker do_fn_invoker cpdef process(self, WindowedValue element) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 59ed043c66a1..563766a6f953 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -51,16 +51,36 @@ def receive(self, windowed_value): raise NotImplementedError -class Method(object): +class DoFnMethodWrapper(object): """Represents a method of a DoFn object.""" def __init__(self, method_value, args, defaults): + """ + Initiates a ``DoFnMethodWrapper``. + + Args: + method_value: Python method represented by this object. + args: a list that gives the arguments of the given method. + defaults: a list that gives the default values of arguments mentioned + above. If len(defaults) > len(args) default values are for the last + len(defaults) arguments in args. + """ self.args = args self.defaults = defaults self._method_value = method_value - def __call__(self, *args, **kwargs): - return self._method_value(*args, **kwargs) + def call(self, input_args, input_kwargs): + """Invokes the method represented by this object. + + Args: + input_args: a list of arguments to be passed when invoking the method. + input_kwargs: a dictionary of keyword arguments to be passed when invoking + the method. + + Using a regular method call here instead of __call__ to reduce per-element + overhead. + """ + return self._method_value(*input_args, **input_kwargs) class DoFnSignature(object): @@ -75,32 +95,43 @@ class DoFnSignature(object): feature set offered by it. """ - def __init__(self): + def __init__(self, do_fn): # We add a property here for all methods defined by Beam DoFn features. self.process_method = None self.start_bundle_method = None self.finish_bundle_method = None - self.do_fn = None - @staticmethod - def create_signature(do_fn): - """Creates the signature fo a given DoFn object.""" assert isinstance(do_fn, core.DoFn) - signature = DoFnSignature() + self.do_fn = do_fn def _create_do_fn_method(do_fn, method_name): arguments, _, _, defaults = do_fn.get_function_arguments(method_name) defaults = defaults if defaults else [] method_value = getattr(do_fn, method_name) - return Method(method_value, arguments, defaults) + return DoFnMethodWrapper(method_value, arguments, defaults) + + self.process_method = _create_do_fn_method(do_fn, 'process') + self.start_bundle_method = _create_do_fn_method(do_fn, 'start_bundle') + self.finish_bundle_method = _create_do_fn_method(do_fn, 'finish_bundle') + self._validate() + + def _validate(self): + # start_bundle and finish_bundle methods should only have ContextParam as a + # default argument. + self._validate_start_bundle() + self._validate_finish_bundle() - signature.do_fn = do_fn - signature.process_method = _create_do_fn_method(do_fn, 'process') - signature.start_bundle_method = _create_do_fn_method(do_fn, 'start_bundle') - signature.finish_bundle_method = _create_do_fn_method( - do_fn, 'finish_bundle') + def _validate_start_bundle(self): + self._validate_bundle_method() - return signature + def _validate_finish_bundle(self): + self._validate_bundle_method() + + def _validate_bundle_method(self): + assert core.DoFn.ElementParam not in self.start_bundle_method.defaults + assert core.DoFn.SideInputParam not in self.start_bundle_method.defaults + assert core.DoFn.TimestampParam not in self.start_bundle_method.defaults + assert core.DoFn.WindowParam not in self.start_bundle_method.defaults class DoFnInvoker(object): @@ -114,8 +145,21 @@ def __init__(self, signature): @staticmethod def create_invoker( - signature, use_simple_invoker, context, side_inputs, input_args, + signature, context, side_inputs, input_args, input_kwargs): + """ Creates a new DoFnInvoker based on given arguments. + + Args: + signature: A DoFnSignature for the DoFn being invoked. + context: Context to be used when invoking the DoFn (deprecated). + side_inputs: side inputs to be used when invoking th process method. + input_args: arguments to be used when invoking the process method + input_kwargs: kwargs to be used when invoking the process method. + """ + default_arg_values = signature.process_method.defaults + use_simple_invoker = ( + not side_inputs and not input_args and not input_kwargs and + not default_arg_values) if use_simple_invoker: return SimpleInvoker(signature) else: @@ -127,24 +171,23 @@ def invoke_process(self, element, process_output_fn): def invoke_start_bundle(self, process_output_fn): defaults = self.signature.start_bundle_method.defaults - defaults = defaults if defaults else [] args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] - process_output_fn(None, self.signature.start_bundle_method(*args)) + process_output_fn(None, self.signature.start_bundle_method.call(args, {})) def invoke_finish_bundle(self, process_output_fn): - defaults = self.signature.start_bundle_method.defaults - defaults = defaults if defaults else [] + defaults = self.signature.finish_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] - process_output_fn(None, self.signature.finish_bundle_method(*args)) + process_output_fn(None, self.signature.finish_bundle_method.call(args, {})) class SimpleInvoker(DoFnInvoker): """An invoker that processes elements ignoring windowing information.""" def invoke_process(self, element, process_output_fn): - process_output_fn(element, self.signature.process_method(element.value)) + process_output_fn(element, self.signature.process_method.call( + [element.value], {})) class PerWindowInvoker(DoFnInvoker): @@ -154,11 +197,10 @@ def __init__(self, signature, context, side_inputs, input_args, input_kwargs): super(PerWindowInvoker, self).__init__(signature) self.side_inputs = side_inputs self.context = context - self.has_windowed_inputs = not all( - si.is_globally_windowed() for si in side_inputs) default_arg_values = signature.process_method.defaults - self.has_windowed_inputs = (self.has_windowed_inputs or - core.DoFn.WindowParam in default_arg_values) + self.has_windowed_inputs = ( + not all(si.is_globally_windowed() for si in side_inputs) or + (core.DoFn.WindowParam in default_arg_values)) # Try to prepare all the arguments that can just be filled in # without any additional work. in the process function. @@ -168,11 +210,11 @@ def __init__(self, signature, context, side_inputs, input_args, input_kwargs): global_window = GlobalWindow() - self.args = input_args if input_args else [] - self.kwargs = input_kwargs if input_kwargs else {} + args = input_args if input_args else [] + kwargs = input_kwargs if input_kwargs else {} if not self.has_windowed_inputs: - self.args, self.kwargs = util.insert_values_in_args( + args, kwargs = util.insert_values_in_args( input_args, input_kwargs, [si[global_window] for si in side_inputs]) arguments = signature.process_method.args @@ -188,13 +230,13 @@ def __init__(self, placeholder): if core.DoFn.ElementParam not in default_arg_values: args_to_pick = len(arguments) - len(default_arg_values) - 1 - self_in_args final_args = ( - [ArgPlaceholder(core.DoFn.ElementParam)] + self.args[:args_to_pick]) + [ArgPlaceholder(core.DoFn.ElementParam)] + args[:args_to_pick]) else: args_to_pick = len(arguments) - len(defaults) - self_in_args - final_args = self.args[:args_to_pick] + final_args = args[:args_to_pick] # Fill the OtherPlaceholders for context, window or timestamp - input_args = iter(self.args[args_to_pick:]) + input_args = iter(args[args_to_pick:]) for a, d in zip(arguments[-len(defaults):], defaults): if d == core.DoFn.ElementParam: final_args.append(ArgPlaceholder(d)) @@ -209,7 +251,7 @@ def __init__(self, placeholder): try: final_args.append(input_args.next()) except StopIteration: - if a not in self.kwargs: + if a not in kwargs: raise ValueError("Value for sideinput %s not provided" % a) else: # If no more args are present then the value must be passed via kwarg @@ -218,12 +260,14 @@ def __init__(self, placeholder): except StopIteration: pass final_args.extend(list(input_args)) - self.args = final_args # Stash the list of placeholder positions for performance - self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args) + self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(final_args) if isinstance(x, ArgPlaceholder)] + self.args = final_args + self.kwargs = kwargs + def invoke_process(self, element, process_output_fn): self.context.set_element(element) # Call for the process function for each window if has windowed side inputs @@ -256,10 +300,10 @@ def _invoke_per_window(self, element, process_output_fn): args[i] = element.timestamp if not kwargs: - process_output_fn(element, self.signature.process_method(*args)) + process_output_fn(element, self.signature.process_method.call(args, {})) else: - process_output_fn(element, self.signature.process_method( - *args, **kwargs)) + process_output_fn(element, self.signature.process_method.call( + args, kwargs)) class DoFnRunner(Receiver): @@ -326,22 +370,25 @@ def __init__(self, self.context = context - do_fn_signature = DoFnSignature.create_signature(fn) - - default_arg_values = do_fn_signature.process_method.defaults - use_simple_invoker = ( - not side_inputs and not args and not kwargs and not default_arg_values) - + do_fn_signature = DoFnSignature(fn) self.window_fn = windowing.windowfn self.do_fn_invoker = DoFnInvoker.create_invoker( - do_fn_signature, use_simple_invoker, context, side_inputs, args, kwargs) + do_fn_signature, context, side_inputs, args, kwargs) def receive(self, windowed_value): self.process(windowed_value) def process(self, windowed_value): - self._invoke_process_method(windowed_value) + try: + self.logging_context.enter() + self.scoped_metrics_container.enter() + self.do_fn_invoker.invoke_process(windowed_value, self._process_outputs) + except BaseException as exn: + self._reraise_augmented(exn) + finally: + self.scoped_metrics_container.exit() + self.logging_context.exit() def _invoke_bundle_method(self, bundle_method): try: @@ -361,17 +408,6 @@ def start(self): def finish(self): self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle) - def _invoke_process_method(self, element): - try: - self.logging_context.enter() - self.scoped_metrics_container.enter() - self.do_fn_invoker.invoke_process(element, self._process_outputs) - except BaseException as exn: - self._reraise_augmented(exn) - finally: - self.scoped_metrics_container.exit() - self.logging_context.exit() - def _reraise_augmented(self, exn): if getattr(exn, '_tagged_with_step', False) or not self.step_name: raise From 49cd48a7646e4d7b0738e557bd3c86ceeade94fd Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Thu, 20 Apr 2017 17:45:34 -0700 Subject: [PATCH 3/7] Addressing reviewer comments. --- sdks/python/apache_beam/runners/common.pxd | 20 ++-- sdks/python/apache_beam/runners/common.py | 120 +++++++++++++-------- 2 files changed, 87 insertions(+), 53 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 286bb2b1ee8d..d2b5cee60770 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -31,7 +31,7 @@ cdef class Receiver(object): cdef class DoFnMethodWrapper(object): cdef public object args cdef public object defaults - cdef object _method_value + cdef public object method_value cpdef call(self, list args, dict kwargs) @@ -45,9 +45,9 @@ cdef class DoFnSignature(object): cdef class DoFnInvoker(object): - cdef DoFnSignature signature + cdef public DoFnSignature signature - cpdef invoke_process(self, WindowedValue element, process_output_fn) + cpdef invoke_process(self, WindowedValue windowed_value, process_output_fn) cpdef invoke_start_bundle(self, process_output_fn) cpdef invoke_finish_bundle(self, process_output_fn) @@ -55,17 +55,23 @@ cdef class DoFnInvoker(object): cdef class SimpleInvoker(DoFnInvoker): - pass + + cdef object process_method + + cpdef invoke_process(self, WindowedValue windowed_value, process_output_fn) cdef class PerWindowInvoker(DoFnInvoker): cdef list side_inputs cdef DoFnContext context - cdef list args - cdef dict kwargs + cdef list args_for_process + cdef dict kwargs_for_process cdef list placeholders cdef bint has_windowed_inputs + cdef object process_method + + cpdef invoke_process(self, WindowedValue windowed_value, process_output_fn) cdef class DoFnRunner(Receiver): @@ -80,7 +86,7 @@ cdef class DoFnRunner(Receiver): cdef Receiver main_receivers cdef DoFnInvoker do_fn_invoker - cpdef process(self, WindowedValue element) + cpdef process(self, WindowedValue windowed_value) @cython.locals(windowed_value=WindowedValue) cpdef _process_outputs(self, WindowedValue element, results) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 563766a6f953..26ee75296130 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -67,7 +67,7 @@ def __init__(self, method_value, args, defaults): """ self.args = args self.defaults = defaults - self._method_value = method_value + self.method_value = method_value def call(self, input_args, input_kwargs): """Invokes the method represented by this object. @@ -80,7 +80,7 @@ def call(self, input_args, input_kwargs): Using a regular method call here instead of __call__ to reduce per-element overhead. """ - return self._method_value(*input_args, **input_kwargs) + return self.method_value(*input_args, **input_kwargs) class DoFnSignature(object): @@ -120,6 +120,7 @@ def _validate(self): # default argument. self._validate_start_bundle() self._validate_finish_bundle() + self._validate_process() def _validate_start_bundle(self): self._validate_bundle_method() @@ -127,6 +128,9 @@ def _validate_start_bundle(self): def _validate_finish_bundle(self): self._validate_bundle_method() + def _validate_process(self): + pass + def _validate_bundle_method(self): assert core.DoFn.ElementParam not in self.start_bundle_method.defaults assert core.DoFn.SideInputParam not in self.start_bundle_method.defaults @@ -150,7 +154,7 @@ def create_invoker( """ Creates a new DoFnInvoker based on given arguments. Args: - signature: A DoFnSignature for the DoFn being invoked. + signature: a DoFnSignature for the DoFn being invoked. context: Context to be used when invoking the DoFn (deprecated). side_inputs: side inputs to be used when invoking th process method. input_args: arguments to be used when invoking the process method @@ -166,16 +170,37 @@ def create_invoker( return PerWindowInvoker( signature, context, side_inputs, input_args, input_kwargs) - def invoke_process(self, element, process_output_fn): + def invoke_process(self, windowed_value, process_output_fn): + """Invokes the DoFn.process() function. + + Args: + windowed_value: a WindowedValue object that gives the element for which + process() method should be invoked along with the window + the element belongs to. + process_output_fn: a function to which the result of DoFn.process() + invocation should be passed. + """ raise NotImplementedError def invoke_start_bundle(self, process_output_fn): + """Invokes the DoFn.start_bundle() method. + + Args: + process_output_fn: a function to which the result of DoFn.start_bundle() + invocation should be passed. + """ defaults = self.signature.start_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] process_output_fn(None, self.signature.start_bundle_method.call(args, {})) def invoke_finish_bundle(self, process_output_fn): + """Invokes the DoFn.finish_bundle() method. + + Args: + process_output_fn: a function to which the result of DoFn.finish_bundle() + invocation should be passed. + """ defaults = self.signature.finish_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] @@ -185,9 +210,12 @@ def invoke_finish_bundle(self, process_output_fn): class SimpleInvoker(DoFnInvoker): """An invoker that processes elements ignoring windowing information.""" - def invoke_process(self, element, process_output_fn): - process_output_fn(element, self.signature.process_method.call( - [element.value], {})) + def __init__(self, signature): + super(SimpleInvoker, self).__init__(signature) + self.process_method = signature.process_method.method_value + + def invoke_process(self, windowed_value, process_output_fn): + process_output_fn(windowed_value, self.process_method(windowed_value.value)) class PerWindowInvoker(DoFnInvoker): @@ -197,6 +225,7 @@ def __init__(self, signature, context, side_inputs, input_args, input_kwargs): super(PerWindowInvoker, self).__init__(signature) self.side_inputs = side_inputs self.context = context + self.process_method = signature.process_method.method_value default_arg_values = signature.process_method.defaults self.has_windowed_inputs = ( not all(si.is_globally_windowed() for si in side_inputs) or @@ -207,14 +236,13 @@ def __init__(self, signature, context, side_inputs, input_args, input_kwargs): # Also cache all the placeholders needed in the process function. # Fill in sideInputs if they are globally windowed - global_window = GlobalWindow() - args = input_args if input_args else [] - kwargs = input_kwargs if input_kwargs else {} + input_args = input_args if input_args else [] + input_kwargs = input_kwargs if input_kwargs else {} if not self.has_windowed_inputs: - args, kwargs = util.insert_values_in_args( + input_args, input_kwargs = util.insert_values_in_args( input_args, input_kwargs, [si[global_window] for si in side_inputs]) arguments = signature.process_method.args @@ -229,81 +257,81 @@ def __init__(self, placeholder): if core.DoFn.ElementParam not in default_arg_values: args_to_pick = len(arguments) - len(default_arg_values) - 1 - self_in_args - final_args = ( - [ArgPlaceholder(core.DoFn.ElementParam)] + args[:args_to_pick]) + args_with_placeholders = ( + [ArgPlaceholder(core.DoFn.ElementParam)] + input_args[:args_to_pick]) else: args_to_pick = len(arguments) - len(defaults) - self_in_args - final_args = args[:args_to_pick] + args_with_placeholders = input_args[:args_to_pick] # Fill the OtherPlaceholders for context, window or timestamp - input_args = iter(args[args_to_pick:]) + remaining_args_iter = iter(input_args[args_to_pick:]) for a, d in zip(arguments[-len(defaults):], defaults): if d == core.DoFn.ElementParam: - final_args.append(ArgPlaceholder(d)) + args_with_placeholders.append(ArgPlaceholder(d)) elif d == core.DoFn.ContextParam: - final_args.append(ArgPlaceholder(d)) + args_with_placeholders.append(ArgPlaceholder(d)) elif d == core.DoFn.WindowParam: - final_args.append(ArgPlaceholder(d)) + args_with_placeholders.append(ArgPlaceholder(d)) elif d == core.DoFn.TimestampParam: - final_args.append(ArgPlaceholder(d)) + args_with_placeholders.append(ArgPlaceholder(d)) elif d == core.DoFn.SideInputParam: # If no more args are present then the value must be passed via kwarg try: - final_args.append(input_args.next()) + args_with_placeholders.append(remaining_args_iter.next()) except StopIteration: - if a not in kwargs: + if a not in input_kwargs: raise ValueError("Value for sideinput %s not provided" % a) else: # If no more args are present then the value must be passed via kwarg try: - final_args.append(input_args.next()) + args_with_placeholders.append(remaining_args_iter.next()) except StopIteration: pass - final_args.extend(list(input_args)) + args_with_placeholders.extend(list(remaining_args_iter)) # Stash the list of placeholder positions for performance - self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(final_args) + self.placeholders = [(i, x.placeholder) for (i, x) in enumerate( + args_with_placeholders) if isinstance(x, ArgPlaceholder)] - self.args = final_args - self.kwargs = kwargs + self.args_for_process = args_with_placeholders + self.kwargs_for_process = input_kwargs - def invoke_process(self, element, process_output_fn): - self.context.set_element(element) + def invoke_process(self, windowed_value, process_output_fn): + self.context.set_element(windowed_value) # Call for the process function for each window if has windowed side inputs # or if the process accesses the window parameter. We can just call it once # otherwise as none of the arguments are changing - if self.has_windowed_inputs and len(element.windows) != 1: - for w in element.windows: + if self.has_windowed_inputs and len(windowed_value.windows) != 1: + for w in windowed_value.windows: self._invoke_per_window( - WindowedValue(element.value, element.timestamp, (w,)), + WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)), process_output_fn) else: - self._invoke_per_window(element, process_output_fn) + self._invoke_per_window(windowed_value, process_output_fn) - def _invoke_per_window(self, element, process_output_fn): + def _invoke_per_window(self, windowed_value, process_output_fn): if self.has_windowed_inputs: - window, = element.windows - args, kwargs = util.insert_values_in_args( - self.args, self.kwargs, [si[window] for si in self.side_inputs]) + window, = windowed_value.windows + args_for_process, kwargs_for_process = util.insert_values_in_args( + self.args_for_process, self.kwargs_for_process, + [si[window] for si in self.side_inputs]) else: - args, kwargs = self.args, self.kwargs + args_for_process, kwargs_for_process = ( + self.args_for_process, self.kwargs_for_process) # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == for i, p in self.placeholders: if p == core.DoFn.ElementParam: - args[i] = element.value + args_for_process[i] = windowed_value.value elif p == core.DoFn.ContextParam: - args[i] = self.context + args_for_process[i] = self.context elif p == core.DoFn.WindowParam: - args[i] = window + args_for_process[i] = window elif p == core.DoFn.TimestampParam: - args[i] = element.timestamp + args_for_process[i] = windowed_value.timestamp - if not kwargs: - process_output_fn(element, self.signature.process_method.call(args, {})) - else: - process_output_fn(element, self.signature.process_method.call( - args, kwargs)) + process_output_fn(windowed_value, self.process_method( + *args_for_process, **kwargs_for_process)) class DoFnRunner(Receiver): From 0c418d91132b2d3f6b395c0c0007a7f3e2d77e3b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 24 Apr 2017 13:12:13 -0700 Subject: [PATCH 4/7] Statically bind output processor. This avoid the overhead of passing around (and calliing) a Python callable, completely resolving the performance regression. --- sdks/python/apache_beam/runners/common.pxd | 11 ++-- sdks/python/apache_beam/runners/common.py | 70 +++++++++++----------- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index d2b5cee60770..f1c55b08747d 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -46,10 +46,11 @@ cdef class DoFnSignature(object): cdef class DoFnInvoker(object): cdef public DoFnSignature signature + cdef DoFnRunner output_processor - cpdef invoke_process(self, WindowedValue windowed_value, process_output_fn) - cpdef invoke_start_bundle(self, process_output_fn) - cpdef invoke_finish_bundle(self, process_output_fn) + cpdef invoke_process(self, WindowedValue windowed_value) + cpdef invoke_start_bundle(self) + cpdef invoke_finish_bundle(self) # TODO(chamikara) define static method create_invoker() here. @@ -58,8 +59,6 @@ cdef class SimpleInvoker(DoFnInvoker): cdef object process_method - cpdef invoke_process(self, WindowedValue windowed_value, process_output_fn) - cdef class PerWindowInvoker(DoFnInvoker): @@ -71,8 +70,6 @@ cdef class PerWindowInvoker(DoFnInvoker): cdef bint has_windowed_inputs cdef object process_method - cpdef invoke_process(self, WindowedValue windowed_value, process_output_fn) - cdef class DoFnRunner(Receiver): diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 26ee75296130..864e7ac74130 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -144,13 +144,14 @@ class DoFnInvoker(object): A DoFnInvoker describes a particular way for invoking methods of a DoFn represented by a given DoFnSignature.""" - def __init__(self, signature): + def __init__(self, output_processor, signature): + self.output_processor = output_processor self.signature = signature @staticmethod def create_invoker( - signature, context, side_inputs, input_args, - input_kwargs): + output_processor, + signature, context, side_inputs, input_args, input_kwargs): """ Creates a new DoFnInvoker based on given arguments. Args: @@ -165,64 +166,59 @@ def create_invoker( not side_inputs and not input_args and not input_kwargs and not default_arg_values) if use_simple_invoker: - return SimpleInvoker(signature) + return SimpleInvoker(output_processor, signature) else: return PerWindowInvoker( + output_processor, signature, context, side_inputs, input_args, input_kwargs) - def invoke_process(self, windowed_value, process_output_fn): + def invoke_process(self, windowed_value): """Invokes the DoFn.process() function. Args: windowed_value: a WindowedValue object that gives the element for which process() method should be invoked along with the window the element belongs to. - process_output_fn: a function to which the result of DoFn.process() - invocation should be passed. """ raise NotImplementedError - def invoke_start_bundle(self, process_output_fn): + def invoke_start_bundle(self): """Invokes the DoFn.start_bundle() method. - - Args: - process_output_fn: a function to which the result of DoFn.start_bundle() - invocation should be passed. """ defaults = self.signature.start_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] - process_output_fn(None, self.signature.start_bundle_method.call(args, {})) + self.output_processor._process_outputs( + None, self.signature.start_bundle_method.call(args, {})) - def invoke_finish_bundle(self, process_output_fn): + def invoke_finish_bundle(self): """Invokes the DoFn.finish_bundle() method. - - Args: - process_output_fn: a function to which the result of DoFn.finish_bundle() - invocation should be passed. """ defaults = self.signature.finish_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] - process_output_fn(None, self.signature.finish_bundle_method.call(args, {})) + self.output_processor._process_outputs( + None, self.signature.finish_bundle_method.call(args, {})) class SimpleInvoker(DoFnInvoker): """An invoker that processes elements ignoring windowing information.""" - def __init__(self, signature): - super(SimpleInvoker, self).__init__(signature) + def __init__(self, output_processor, signature): + super(SimpleInvoker, self).__init__(output_processor, signature) self.process_method = signature.process_method.method_value - def invoke_process(self, windowed_value, process_output_fn): - process_output_fn(windowed_value, self.process_method(windowed_value.value)) + def invoke_process(self, windowed_value): + self.output_processor._process_outputs( + windowed_value, self.process_method(windowed_value.value)) class PerWindowInvoker(DoFnInvoker): """An invoker that processes elements considering windowing information.""" - def __init__(self, signature, context, side_inputs, input_args, input_kwargs): - super(PerWindowInvoker, self).__init__(signature) + def __init__(self, output_processor, signature, context, + side_inputs, input_args, input_kwargs): + super(PerWindowInvoker, self).__init__(signature, output_processor) self.side_inputs = side_inputs self.context = context self.process_method = signature.process_method.method_value @@ -297,7 +293,7 @@ def __init__(self, placeholder): self.args_for_process = args_with_placeholders self.kwargs_for_process = input_kwargs - def invoke_process(self, windowed_value, process_output_fn): + def invoke_process(self, windowed_value): self.context.set_element(windowed_value) # Call for the process function for each window if has windowed side inputs # or if the process accesses the window parameter. We can just call it once @@ -305,12 +301,11 @@ def invoke_process(self, windowed_value, process_output_fn): if self.has_windowed_inputs and len(windowed_value.windows) != 1: for w in windowed_value.windows: self._invoke_per_window( - WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)), - process_output_fn) + WindowedValue(windowed_value.value, windowed_value.timestamp, (w,))) else: - self._invoke_per_window(windowed_value, process_output_fn) + self._invoke_per_window(windowed_value) - def _invoke_per_window(self, windowed_value, process_output_fn): + def _invoke_per_window(self, windowed_value): if self.has_windowed_inputs: window, = windowed_value.windows args_for_process, kwargs_for_process = util.insert_values_in_args( @@ -330,8 +325,13 @@ def _invoke_per_window(self, windowed_value, process_output_fn): elif p == core.DoFn.TimestampParam: args_for_process[i] = windowed_value.timestamp - process_output_fn(windowed_value, self.process_method( - *args_for_process, **kwargs_for_process)) + if kwargs_for_process: + self.output_processor._process_outputs( + windowed_value, + self.process_method(*args_for_process, **kwargs_for_process)) + else: + self.output_processor._process_outputs( + windowed_value, self.process_method(*args_for_process)) class DoFnRunner(Receiver): @@ -402,7 +402,7 @@ def __init__(self, self.window_fn = windowing.windowfn self.do_fn_invoker = DoFnInvoker.create_invoker( - do_fn_signature, context, side_inputs, args, kwargs) + self, do_fn_signature, context, side_inputs, args, kwargs) def receive(self, windowed_value): self.process(windowed_value) @@ -411,7 +411,7 @@ def process(self, windowed_value): try: self.logging_context.enter() self.scoped_metrics_container.enter() - self.do_fn_invoker.invoke_process(windowed_value, self._process_outputs) + self.do_fn_invoker.invoke_process(windowed_value) except BaseException as exn: self._reraise_augmented(exn) finally: @@ -423,7 +423,7 @@ def _invoke_bundle_method(self, bundle_method): self.logging_context.enter() self.scoped_metrics_container.enter() self.context.set_element(None) - bundle_method(self._process_outputs) + bundle_method() except BaseException as exn: self._reraise_augmented(exn) finally: From 808764c4a38282e717053eb0bf5a2a72c5519003 Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Mon, 24 Apr 2017 15:12:52 -0700 Subject: [PATCH 5/7] Addressing reviewer comments. --- sdks/python/apache_beam/runners/common.pxd | 4 +- sdks/python/apache_beam/runners/common.py | 71 ++++++++-------------- 2 files changed, 27 insertions(+), 48 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index f1c55b08747d..4ee065e606e5 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -33,8 +33,6 @@ cdef class DoFnMethodWrapper(object): cdef public object defaults cdef public object method_value - cpdef call(self, list args, dict kwargs) - cdef class DoFnSignature(object): cdef public DoFnMethodWrapper process_method @@ -85,7 +83,7 @@ cdef class DoFnRunner(Receiver): cpdef process(self, WindowedValue windowed_value) @cython.locals(windowed_value=WindowedValue) - cpdef _process_outputs(self, WindowedValue element, results) + cpdef process_outputs(self, WindowedValue element, results) cdef class DoFnContext(object): diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 864e7ac74130..3de97d6f7fa3 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -54,33 +54,21 @@ def receive(self, windowed_value): class DoFnMethodWrapper(object): """Represents a method of a DoFn object.""" - def __init__(self, method_value, args, defaults): + def __init__(self, do_fn, method_name): """ Initiates a ``DoFnMethodWrapper``. Args: - method_value: Python method represented by this object. - args: a list that gives the arguments of the given method. - defaults: a list that gives the default values of arguments mentioned - above. If len(defaults) > len(args) default values are for the last - len(defaults) arguments in args. + do_fn: A DoFn object that contains the method. + method_name: name of the method as a string. """ + + args, _, _, defaults = do_fn.get_function_arguments(method_name) + defaults = defaults if defaults else [] + method_value = getattr(do_fn, method_name) + self.method_value = method_value self.args = args self.defaults = defaults - self.method_value = method_value - - def call(self, input_args, input_kwargs): - """Invokes the method represented by this object. - - Args: - input_args: a list of arguments to be passed when invoking the method. - input_kwargs: a dictionary of keyword arguments to be passed when invoking - the method. - - Using a regular method call here instead of __call__ to reduce per-element - overhead. - """ - return self.method_value(*input_args, **input_kwargs) class DoFnSignature(object): @@ -97,22 +85,13 @@ class DoFnSignature(object): def __init__(self, do_fn): # We add a property here for all methods defined by Beam DoFn features. - self.process_method = None - self.start_bundle_method = None - self.finish_bundle_method = None assert isinstance(do_fn, core.DoFn) self.do_fn = do_fn - def _create_do_fn_method(do_fn, method_name): - arguments, _, _, defaults = do_fn.get_function_arguments(method_name) - defaults = defaults if defaults else [] - method_value = getattr(do_fn, method_name) - return DoFnMethodWrapper(method_value, arguments, defaults) - - self.process_method = _create_do_fn_method(do_fn, 'process') - self.start_bundle_method = _create_do_fn_method(do_fn, 'start_bundle') - self.finish_bundle_method = _create_do_fn_method(do_fn, 'finish_bundle') + self.process_method = DoFnMethodWrapper(do_fn, 'process') + self.start_bundle_method = DoFnMethodWrapper(do_fn, 'start_bundle') + self.finish_bundle_method = DoFnMethodWrapper(do_fn, 'finish_bundle') self._validate() def _validate(self): @@ -132,10 +111,12 @@ def _validate_process(self): pass def _validate_bundle_method(self): - assert core.DoFn.ElementParam not in self.start_bundle_method.defaults - assert core.DoFn.SideInputParam not in self.start_bundle_method.defaults - assert core.DoFn.TimestampParam not in self.start_bundle_method.defaults - assert core.DoFn.WindowParam not in self.start_bundle_method.defaults + # Bundle methods may only contain ContextParam. + unsupported_dofn_params = [i for i in core.DoFn.__dict__ if ( + i.endswith('Param') and i != 'ContextParam')] + + for param in unsupported_dofn_params: + assert param not in self.start_bundle_method.defaults class DoFnInvoker(object): @@ -188,8 +169,8 @@ def invoke_start_bundle(self): defaults = self.signature.start_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] - self.output_processor._process_outputs( - None, self.signature.start_bundle_method.call(args, {})) + self.output_processor.process_outputs( + None, self.signature.start_bundle_method.method_value(*args)) def invoke_finish_bundle(self): """Invokes the DoFn.finish_bundle() method. @@ -197,8 +178,8 @@ def invoke_finish_bundle(self): defaults = self.signature.finish_bundle_method.defaults args = [self.context if d == core.DoFn.ContextParam else d for d in defaults] - self.output_processor._process_outputs( - None, self.signature.finish_bundle_method.call(args, {})) + self.output_processor.process_outputs( + None, self.signature.finish_bundle_method.method_value(*args)) class SimpleInvoker(DoFnInvoker): @@ -209,7 +190,7 @@ def __init__(self, output_processor, signature): self.process_method = signature.process_method.method_value def invoke_process(self, windowed_value): - self.output_processor._process_outputs( + self.output_processor.process_outputs( windowed_value, self.process_method(windowed_value.value)) @@ -218,7 +199,7 @@ class PerWindowInvoker(DoFnInvoker): def __init__(self, output_processor, signature, context, side_inputs, input_args, input_kwargs): - super(PerWindowInvoker, self).__init__(signature, output_processor) + super(PerWindowInvoker, self).__init__(output_processor, signature) self.side_inputs = side_inputs self.context = context self.process_method = signature.process_method.method_value @@ -326,11 +307,11 @@ def _invoke_per_window(self, windowed_value): args_for_process[i] = windowed_value.timestamp if kwargs_for_process: - self.output_processor._process_outputs( + self.output_processor.process_outputs( windowed_value, self.process_method(*args_for_process, **kwargs_for_process)) else: - self.output_processor._process_outputs( + self.output_processor.process_outputs( windowed_value, self.process_method(*args_for_process)) @@ -447,7 +428,7 @@ def _reraise_augmented(self, exn): else: raise - def _process_outputs(self, windowed_input_element, results): + def process_outputs(self, windowed_input_element, results): """Dispatch the result of computation to the appropriate receivers. A value wrapped in a OutputValue object will be unwrapped and From 60e397746f6793221e936e96bbe526b09c6a7b2e Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Mon, 24 Apr 2017 20:39:02 -0700 Subject: [PATCH 6/7] Addressing reviewer comments.. --- sdks/python/apache_beam/runners/common.pxd | 22 ++++++++----- sdks/python/apache_beam/runners/common.py | 37 +++++++++++++++------- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 4ee065e606e5..b9e23d8cf10f 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -40,11 +40,16 @@ cdef class DoFnSignature(object): cdef public DoFnMethodWrapper finish_bundle_method cdef public object do_fn + cdef _validate(self) + cdef _validate_start_bundle(self) + cdef _validate_finish_bundle(self) + cdef _validate_process(self) + cdef _validate_bundle_method(self, DoFnMethodWrapper bundle_method) -cdef class DoFnInvoker(object): +cdef class DoFnInvoker(object): cdef public DoFnSignature signature - cdef DoFnRunner output_processor + cdef OutputProcessor output_processor cpdef invoke_process(self, WindowedValue windowed_value) cpdef invoke_start_bundle(self) @@ -54,12 +59,10 @@ cdef class DoFnInvoker(object): cdef class SimpleInvoker(DoFnInvoker): - cdef object process_method cdef class PerWindowInvoker(DoFnInvoker): - cdef list side_inputs cdef DoFnContext context cdef list args_for_process @@ -70,18 +73,21 @@ cdef class PerWindowInvoker(DoFnInvoker): cdef class DoFnRunner(Receiver): - - cdef object window_fn cdef DoFnContext context - cdef object tagged_receivers cdef LoggingContext logging_context cdef object step_name cdef ScopedMetricsContainer scoped_metrics_container cdef list side_inputs - cdef Receiver main_receivers cdef DoFnInvoker do_fn_invoker cpdef process(self, WindowedValue windowed_value) + + +cdef class OutputProcessor(object): + cdef object window_fn + cdef Receiver main_receivers + cdef object tagged_receivers + @cython.locals(windowed_value=WindowedValue) cpdef process_outputs(self, WindowedValue element, results) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 3de97d6f7fa3..4f84871b5700 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -102,21 +102,21 @@ def _validate(self): self._validate_process() def _validate_start_bundle(self): - self._validate_bundle_method() + self._validate_bundle_method(self.start_bundle_method) def _validate_finish_bundle(self): - self._validate_bundle_method() + self._validate_bundle_method(self.finish_bundle_method) def _validate_process(self): pass - def _validate_bundle_method(self): + def _validate_bundle_method(self, method_wrapper): # Bundle methods may only contain ContextParam. unsupported_dofn_params = [i for i in core.DoFn.__dict__ if ( i.endswith('Param') and i != 'ContextParam')] for param in unsupported_dofn_params: - assert param not in self.start_bundle_method.defaults + assert param not in method_wrapper.defaults class DoFnInvoker(object): @@ -352,13 +352,8 @@ def __init__(self, state: handle for accessing DoFn state scoped_metrics_container: Context switcher for metrics container """ - self.tagged_receivers = tagged_receivers self.scoped_metrics_container = (scoped_metrics_container or ScopedMetricsContainer()) - - # Optimize for the common case. - self.main_receivers = as_receiver(tagged_receivers[None]) - self.step_name = step_name # Need to support multiple iterations. @@ -380,10 +375,14 @@ def __init__(self, self.context = context do_fn_signature = DoFnSignature(fn) - self.window_fn = windowing.windowfn + + # Optimize for the common case. + main_receivers = as_receiver(tagged_receivers[None]) + output_processor = OutputProcessor( + windowing.windowfn, main_receivers, tagged_receivers) self.do_fn_invoker = DoFnInvoker.create_invoker( - self, do_fn_signature, context, side_inputs, args, kwargs) + output_processor, do_fn_signature, context, side_inputs, args, kwargs) def receive(self, windowed_value): self.process(windowed_value) @@ -428,6 +427,22 @@ def _reraise_augmented(self, exn): else: raise + +class OutputProcessor(object): + """Processes output produced by DoFn method invocations.""" + + def __init__(self, window_fn, main_receivers, tagged_receivers): + """Initializes ``OutputProcessor``. + + Args: + window_fn: a windowing function (WindowFn). + main_receivers: a dict of tag name to Receiver objects. + tagged_receivers: main receiver object. + """ + self.window_fn = window_fn + self.main_receivers = main_receivers + self.tagged_receivers = tagged_receivers + def process_outputs(self, windowed_input_element, results): """Dispatch the result of computation to the appropriate receivers. From 976f847a42ca9a0b60de1bb083462b4cab2969d2 Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Tue, 25 Apr 2017 23:01:52 -0700 Subject: [PATCH 7/7] Addressing reviewer comments. --- sdks/python/apache_beam/runners/common.pxd | 6 ------ sdks/python/apache_beam/runners/common.py | 16 ++++------------ 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index b9e23d8cf10f..f3395c1ddf10 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -40,12 +40,6 @@ cdef class DoFnSignature(object): cdef public DoFnMethodWrapper finish_bundle_method cdef public object do_fn - cdef _validate(self) - cdef _validate_start_bundle(self) - cdef _validate_finish_bundle(self) - cdef _validate_process(self) - cdef _validate_bundle_method(self, DoFnMethodWrapper bundle_method) - cdef class DoFnInvoker(object): cdef public DoFnSignature signature diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 4f84871b5700..08071a62ea28 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -95,23 +95,15 @@ def __init__(self, do_fn): self._validate() def _validate(self): - # start_bundle and finish_bundle methods should only have ContextParam as a - # default argument. - self._validate_start_bundle() - self._validate_finish_bundle() - self._validate_process() - - def _validate_start_bundle(self): self._validate_bundle_method(self.start_bundle_method) - - def _validate_finish_bundle(self): self._validate_bundle_method(self.finish_bundle_method) - def _validate_process(self): - pass - def _validate_bundle_method(self, method_wrapper): # Bundle methods may only contain ContextParam. + + # Here we use the fact that every DoFn parameter defined in core.DoFn has + # the value that is the same as the name of the parameter and ends with + # string 'Param'. unsupported_dofn_params = [i for i in core.DoFn.__dict__ if ( i.endswith('Param') and i != 'ContextParam')]