[BEAM-1925] Updates DoFn invocation logic to be more extensible. #2519
chamikaramj wants to merge 8 commits into apache:master
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 1.51 MB...]
    at hudson.remoting.UserRequest.perform(UserRequest.java:50)
    at hudson.remoting.Request$2.run(Request.java:336)
    at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:68)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.maven.plugin.MojoExecutionException: Archetype IT 'basic' failed: Execution failure: exit code = 1
    at org.apache.maven.archetype.mojos.IntegrationTestMojo.execute(IntegrationTestMojo.java:269)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
    ... 31 more
2017-04-13T00:46:12.122 [ERROR]
2017-04-13T00:46:12.122 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-04-13T00:46:12.122 [ERROR]
2017-04-13T00:46:12.122 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-04-13T00:46:12.122 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
2017-04-13T00:46:12.122 [ERROR]
2017-04-13T00:46:12.122 [ERROR] After correcting the problems, you can resume the build with the command
2017-04-13T00:46:12.122 [ERROR] mvn -rf :beam-sdks-java-maven-archetypes-examples-java8
channel stopped
Setting status of ea54211 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/9482/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--

Retest this please

Refer to this link for build results (access rights to CI server needed):

    self.process(windowed_value)

  def process(self, windowed_value):
    self._invoke_process_method(windowed_value)
Why do we need this extra invoke function? Can we move the contents of _invoke_process_method here?

Removed this function.

    arguments, _, _, defaults = self.dofn.get_function_arguments('process')
  def invoke_finish_bundle(self, process_output_fn):
    defaults = self.signature.start_bundle_method.defaults

    self.finish_bundle_method = None
    self.do_fn = None

  @staticmethod
Can we just do all this in __init__ instead of having a static method?

    self.dofn_process = fn.process
  def invoke_start_bundle(self, process_output_fn):
    defaults = self.signature.start_bundle_method.defaults
    defaults = defaults if defaults else []
We don't need this, as the signature takes care of it.

  def _invoke_bundle_method(self, method):
    self.window_fn = windowing.windowfn

    self.do_fn_invoker = DoFnInvoker.create_invoker(
We should move the use_simple_invoker boolean inside this, as that makes it a pure factory; the runner shouldn't decide which invoker is returned.

cdef class DoFnInvoker(object):
  cpdef invoke_process(self, WindowedValue element, process_output_fn)
Do static methods need to be specified here?

I tried but kept getting errors. Left a TODO for now. I don't think this will have a significant impact on performance anyway.

class DoFnRunner(Receiver):
  """A helper class for executing ParDo operations.
class Method(object):
This name is rather generic, perhaps DoFnMethodWrapper or similar?

It's also unclear here what args and defaults are supposed to be.

    self.do_fn = None

  @staticmethod
  def create_signature(do_fn):
It feels like all of this would be better put in DoFnSignature's constructor.

  def create_invoker(
      signature, use_simple_invoker, context, side_inputs, input_args,
      input_kwargs):
    if use_simple_invoker:
Docstring?
Also, it feels like "use_simple_invoker" (and perhaps other arguments) should be deduced here, not passed in.

Done.
I removed use_simple_invoker. The other arguments have to be passed in either here or during the process call; passing them here seems like the better option.
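
The "pure factory" idea discussed in this thread could look roughly like the sketch below. It is a simplified stand-in, not the Beam implementation: the classes here only borrow the names SimpleInvoker and PerWindowInvoker from the PR, and the rule used to pick between them (no side inputs, no extra args/kwargs, no default placeholders) is an assumption made for illustration.

```python
class SimpleInvoker(object):
  """Simplified stand-in: calls process(value) with no extra arguments."""

  def __init__(self, process_fn):
    self.process_fn = process_fn

  def invoke_process(self, value):
    return self.process_fn(value)


class PerWindowInvoker(object):
  """Simplified stand-in: forwards extra positional and keyword arguments."""

  def __init__(self, process_fn, side_inputs, input_args, input_kwargs):
    self.process_fn = process_fn
    self.side_inputs = side_inputs
    self.input_args = tuple(input_args)
    self.input_kwargs = dict(input_kwargs)

  def invoke_process(self, value):
    return self.process_fn(value, *self.input_args, **self.input_kwargs)


def create_invoker(process_fn, defaults, side_inputs=(), input_args=(),
                   input_kwargs=None):
  """Hypothetical factory: the caller never says which invoker it wants."""
  input_kwargs = input_kwargs or {}
  # Assumed rule: the fast path applies only when nothing besides the
  # element value has to be passed to process().
  use_simple_invoker = not (
      side_inputs or input_args or input_kwargs or defaults)
  if use_simple_invoker:
    return SimpleInvoker(process_fn)
  return PerWindowInvoker(process_fn, side_inputs, input_args, input_kwargs)
```

With this shape the runner only supplies what it knows (the method, its defaults, and the call arguments), and the choice of invoker stays inside the factory.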

    self.dofn_process = fn.process
  def invoke_start_bundle(self, process_output_fn):
    defaults = self.signature.start_bundle_method.defaults
    defaults = defaults if defaults else []
Normalize to the empty list (or better, empty tuple) earlier.
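
One way to read "normalize earlier" is to do it once, where the method is inspected, so that callers never see None. The sketch below assumes a hypothetical MethodWrapper class and uses the standard library's inspect.getfullargspec rather than the get_function_arguments helper that appears in the diff.

```python
import inspect


class MethodWrapper(object):
  """Hypothetical wrapper recording a method's argument names and defaults."""

  def __init__(self, obj, method_name):
    method = getattr(obj, method_name)
    spec = inspect.getfullargspec(method)
    self.args = tuple(spec.args)
    # getfullargspec reports None when the method has no default values;
    # normalize to an empty tuple here so no caller needs a None check.
    self.defaults = tuple(spec.defaults) if spec.defaults else ()
    self.method_value = method


class ExampleFn(object):
  """Illustrative class with one method without and one with defaults."""

  def start_bundle(self):
    pass

  def process(self, element, scale=2):
    return [element * scale]
```

Invokers can then iterate over wrapper.defaults directly instead of repeating `defaults if defaults else []` at every call site.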

  def invoke_start_bundle(self, process_output_fn):
    defaults = self.signature.start_bundle_method.defaults
    defaults = defaults if defaults else []
    args = [self.context if d == core.DoFn.ContextParam else d
Where is the validation that ContextParam is the only valid pluggable value here?

We don't have this validation currently, right? I added this to the DoFnSignature class.

    default_arg_values = signature.process_method.defaults
    self.has_windowed_inputs = (self.has_windowed_inputs or
                                core.DoFn.WindowParam in defaults)
                                core.DoFn.WindowParam in default_arg_values)
Avoid re-assigning values (here and elsewhere).

    super(PerWindowInvoker, self).__init__(signature)
    self.side_inputs = side_inputs
    self.context = context
    self.has_windowed_inputs = not all(
I wonder if the has_windowed_inputs bit should instead be a third class, not a bit in the second Invoker class.

Could you explain the rationale for a third class? I don't see a justification for another class here. I might be missing something.

We're branching on this in a couple of places. However, looking at it again, I think that's OK (and the primary branch depends on what's known only at runtime, i.e. the number of actual windows for this element).

      self._dofn_per_window_invoker(element)
    self.logging_context = get_logging_context(logger, step_name=step_name)

    # TODO(sourabh): Deprecate the use of context
Is this happening by first stable release?

This is already done. I can clean it up after the PR is merged.

    self.defaults = defaults
    self._method_value = method_value

  def __call__(self, *args, **kwargs):
This worries me from a performance standpoint. One of the key reasons to have different invokers (methods) was to avoid the generic overhead for the simple case(s).

I assume you meant performance could be impacted due to using a callable here?
I removed __call__ and replaced it with a call() method, which is also more readable.

cc: @jkff

Force-pushed: 0128f6b to 9c49ed4

sb2nov left a comment:
Just some small comments now. This makes the code really readable.

  cdef Receiver main_receivers
  cdef DoFnInvoker do_fn_invoker

  cpdef process(self, WindowedValue element)
Change the variable name here to reflect the Python file change.

  def _dofn_simple_invoker(self, element):
    self._process_outputs(element, self.dofn_process(element.value))
  def invoke_process(self, element, process_output_fn):
element -> windowed_value?

class SimpleInvoker(DoFnInvoker):
  """An invoker that processes elements ignoring windowing information."""

  def invoke_process(self, element, process_output_fn):
element -> windowed_value?

  cdef DoFnSignature signature

  cpdef invoke_process(self, WindowedValue element, process_output_fn)
element -> windowed_value?

  A DoFnInvoker describes a particular way for invoking methods of a DoFn
  represented by a given DoFnSignature."""

  def __init__(self, signature):
Should we keep the constructor across all invoker implementations, so that each subclass can choose to use it or not?

Not sure if I understood this comment. This constructor is already used by sub-classes.

    # Also cache all the placeholders needed in the process function.

    # Fill in sideInputs if they are globally windowed

Remove this empty line, as the comment is related.

    global_window = GlobalWindow()

    args = input_args if input_args else []
Thanks for changing to input_args. I think we can improve it a bit more, as it is still confusing to understand what input_args/args/final_args are.

    if not kwargs:
      self._process_outputs(element, self.dofn_process(*args))
      process_output_fn(element, self.signature.process_method.call(args, {}))
I don't think this case is possible, so we should just pass kwargs directly.

cdef class SimpleInvoker(DoFnInvoker):
  pass
invoke_process method here?

cdef class PerWindowInvoker(DoFnInvoker):

invoke_process method here?

robertwb left a comment:
Please add a microbenchmark that instantiates a DoFnRunner and calls it with trivial DoFns of various signatures and compares timings before and after.

  """An invoker that processes elements ignoring windowing information."""

  def invoke_process(self, element, process_output_fn):
    process_output_fn(element, self.signature.process_method.call(
As mentioned, this is performance-critical code. "def call" vs __call__ doesn't make much of a difference here, but what does matter is that we're creating a new list containing the single element.value, creating a new empty dictionary, then invoking this via _method_value(*input_args, **input_kwargs). In other words, passing through DoFnMethodWrapper is negating much if not all of the benefits of having a special SimpleInvoker. (We could probably simply eliminate/inline this class altogether. On that note, I think it'd be fine for now if start/finish always ran the generic code, as long as process was as fast as possible, if that kept the code simpler.)

I could get rid of the extra per-element overhead you mentioned above by caching the process method within invokers. There is still an extra method call compared to the current implementation (calling a method within DoFnInvoker instead of a method within DoFnRunner). But this does not seem to be significant based on results of some benchmarks I ran. I'll post a comment with benchmark results.
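
The two call paths being compared in this thread can be timed in isolation with a small script along these lines. It is only a sketch: GenericWrapper and CachedInvoker are hypothetical stand-ins for the wrapper and invoker under review, it measures plain (uncompiled) Python, and it is not the benchmark the reviewers actually ran.

```python
import timeit


def process(value):
  """Trivial stand-in for a DoFn.process method."""
  return value


class GenericWrapper(object):
  """Hypothetical wrapper: every call unpacks an args list and kwargs dict."""

  def __init__(self, method_value):
    self.method_value = method_value

  def call(self, args, kwargs):
    return self.method_value(*args, **kwargs)


class CachedInvoker(object):
  """Hypothetical invoker that caches the bare method at construction."""

  def __init__(self, wrapper):
    self.process_method = wrapper.method_value

  def invoke_process(self, value):
    return self.process_method(value)


def run_benchmark(number=100000):
  """Returns (generic_seconds, cached_seconds) for `number` calls each."""
  wrapper = GenericWrapper(process)
  invoker = CachedInvoker(wrapper)
  # The generic path allocates a new list and dict for every element.
  generic = timeit.timeit(lambda: wrapper.call([1], {}), number=number)
  cached = timeit.timeit(lambda: invoker.invoke_process(1), number=number)
  return generic, cached


if __name__ == '__main__':
  generic_seconds, cached_seconds = run_benchmark()
  print('generic: %.4fs  cached: %.4fs' % (generic_seconds, cached_seconds))
```

Absolute numbers from a script like this depend on the interpreter and machine, so only the relative difference between the two paths is meaningful.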
Adds following abstractions. DoFnSignature: describes the signature of a given DoFn object. DoFnInvoker: defines a particular way for invoking DoFn methods.
Force-pushed: 9c49ed4 to 49cd48a

    # start_bundle and finish_bundle methods should only have ContextParam as a
    # default argument.
    self._validate_start_bundle()
    self._validate_finish_bundle()
Done.
We can validate at construction time by building a signature before job submission.

I ran some benchmarks to compare results with and without this PR. See the following for results: https://docs.google.com/document/d/1SZa6C3a7EHy9-qE_9QTBRxyN3DR9yenLKLjK2wXNkhw/edit?usp=sharing
Based on these results, this PR does not seem to add extra overhead. Please let me know if you want me to run additional experiments.

PTAL.

Looks good to me.

The benchmarks are probably running uncompiled, and the direct runner does not do fusion (as well as having a lot more overhead than our worker code). https://gist.github.com/robertwb/5b64829fa91d9a61f55886e5ff6b1f8c shows on average a 30% overhead with this change. Once we have an external worker runner, this'll be easier to test.

Thanks. I'll try to determine the reason for the performance difference here and get back to you.

Thanks Robert for creating the benchmark. The values I get when running your benchmark are as follows. Around 0.2 sec when running without this PR. So, to clarify, the overhead seems to be 0.1 sec per 10k elements per DoFn, which is within the variation of my experiment (and was not significant when running jobs such as WordCount and BigShuffle using the Dataflow runner; both have several ParDo/Map steps).

The extra overhead mentioned above for |
This avoids the overhead of passing around (and calling) a Python callable, completely resolving the performance regression.

    """
    self.args = args
    self.defaults = defaults
    self.method_value = method_value
Nit: order the same as the argument order.

  """

  def __init__(self, do_fn):
    # We add a property here for all methods defined by Beam DoFn features.
Omit these unused assignments; they are written to their final values below.

  cdef public object defaults
  cdef public object method_value

  cpdef call(self, list args, dict kwargs)
I'd just remove this method and inline its two uses.

      pass

  def _validate_bundle_method(self):
    assert core.DoFn.ElementParam not in self.start_bundle_method.defaults
I wonder if this could be a positive assertion instead, so any *ParamType that's added wouldn't need to get listed here.

Done.
Updated this to support new params as well.
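
A "positive" check of the kind suggested here states what is allowed instead of listing what is forbidden. The sketch below is an illustration under assumptions: the placeholder values are modeled as plain strings ending in 'Param' (mirroring the `i.endswith('Param')` test visible later in this review), and the names ALLOWED_BUNDLE_PARAMS and validate_bundle_defaults are hypothetical.

```python
# Hypothetical stand-in for core.DoFn.ContextParam.
CONTEXT_PARAM = 'ContextParam'

# The only placeholder accepted as a default in start_bundle/finish_bundle.
ALLOWED_BUNDLE_PARAMS = frozenset([CONTEXT_PARAM])


def is_param_placeholder(value):
  """Assumes placeholders are strings whose value ends in 'Param'."""
  return isinstance(value, str) and value.endswith('Param')


def validate_bundle_defaults(method_name, defaults):
  """Raises ValueError if a non-allowed placeholder is used as a default."""
  for value in defaults:
    if is_param_placeholder(value) and value not in ALLOWED_BUNDLE_PARAMS:
      raise ValueError(
          '%s may only use %s as a placeholder default, got %r' % (
              method_name, sorted(ALLOWED_BUNDLE_PARAMS), value))
```

Because the allowed set is explicit, a placeholder added later is rejected by default without the validation having to be updated.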

    assert isinstance(do_fn, core.DoFn)
    self.do_fn = do_fn

  def _create_do_fn_method(do_fn, method_name):
Perhaps this should just be the DoFnMethodWrapper constructor?

      input_args: arguments to be used when invoking the process method
      input_kwargs: kwargs to be used when invoking the process method.
    """
    default_arg_values = signature.process_method.defaults
Should this be a method on signature rather than reaching into its innards?

I think this is fine since DoFnMethodWrapper is a known interface.
Statically bind output processor.

Just a reminder, please squash before merging to make it easier to roll back if need be.

  def invoke_start_bundle(self):
    """Invokes the DoFn.start_bundle() method.

    Args:
Do not remove the Args string.

Removed since it's empty.

  def invoke_finish_bundle(self):
    """Invokes the DoFn.finish_bundle() method.

    Args:
Removed since it's empty.

    self.do_fn_invoker = DoFnInvoker.create_invoker(
        do_fn_signature, context, side_inputs, args, kwargs)
        self, do_fn_signature, context, side_inputs, args, kwargs)
Can we pass just the _process_output function instead of self, so that the dependent class doesn't need to know about the _process_output function?

Not unless we want to declare it as a C function pointer (or pay the overhead of calling it as a Python function, which was the improvement here). One could create another object to just hold this method, but that'd probably be overkill. Renaming _process_output now that it's not private could make sense as well.

Got it. I think it'll be good to rename it to process_output if we don't want to go down the function pointer route.

Turns out, memory usage significantly increases when we pass DoFnRunner to DoFnInvoker (as the output_processor) and maintain a reference to it. I suspect this is due to recursive referencing between DoFnInvoker and DoFnRunner. I could fix this by creating a new OutputProcessor class that contains the process_output method. The processing time improvement introduced by Robert's update stays the same.
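
The restructuring described in the last comment can be pictured as below: the runner owns the invoker, the invoker owns a small output-processing object, and nothing points back at the runner. This is a simplified sketch with hypothetical Runner and Invoker classes; whether the reference cycle was in fact the cause of the extra memory is the author's suspicion, not something this example demonstrates.

```python
class OutputProcessor(object):
  """Hypothetical holder for output handling; knows nothing of the runner."""

  def __init__(self, receiver):
    self.receiver = receiver

  def process_outputs(self, element, results):
    for result in results or ():
      self.receiver.append((element, result))


class Invoker(object):
  """Simplified invoker that hands results to a bound output processor."""

  def __init__(self, process_fn, output_processor):
    self.process_fn = process_fn
    self.output_processor = output_processor

  def invoke_process(self, element):
    self.output_processor.process_outputs(element, self.process_fn(element))


class Runner(object):
  """Simplified runner: Runner -> Invoker -> OutputProcessor, no cycle."""

  def __init__(self, process_fn, receiver):
    self.invoker = Invoker(process_fn, OutputProcessor(receiver))

  def process(self, element):
    self.invoker.invoke_process(element)
```

The invoker still calls a method on a known object rather than a passed-in Python callable, which is the property the static binding was introduced for.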

chamikaramj left a comment:
Thanks. PTAL.
(I'll squash commits before merging. Leaving as separate commits for now for ease of reviewing.)

sb2nov left a comment:
LGTM apart from two minor comments.

  cdef public DoFnMethodWrapper start_bundle_method
  cdef public DoFnMethodWrapper finish_bundle_method
  cdef public object do_fn

Do the validate functions need to be mentioned here?

There's no need to make these cdef methods.

Done. So, private methods should not be defined in pxd files?

        i.endswith('Param') and i != 'ContextParam')]

    for param in unsupported_dofn_params:
      assert param not in self.start_bundle_method.defaults
This can be both start/finish.

Robert, PTAL.

robertwb left a comment:
Just some minor comments, but LGTM. Thanks!

    self._validate_finish_bundle()
    self._validate_process()

  def _validate_start_bundle(self):
I would just inline these three private methods.

        i.endswith('Param') and i != 'ContextParam')]

    for param in unsupported_dofn_params:
      assert param not in method_wrapper.defaults
Here you're implicitly using the fact that XxxParam is both named XxxParam and has value "XxxParam". At least call this out.

The distinction is that only modified methods (or classes) need to be defined in pxd files (e.g. designating that this is a cdef, rather than def, method, and typing its parameters). These validate methods are not performance critical, so leaving them as "ordinary" methods is fine (and less verbose).

I see. Thanks for the explanation.

Adds following abstractions.
DoFnSignature: describes the signature of a given DoFn object.
DoFnInvoker: defines a particular way for invoking DoFn methods.
I believe existing tests cover the updated code paths.