[BEAM-1630] Adds ability to dynamically replace PTransforms during runtime. #3333

chamikaramj
Contributor

Adds two new interfaces, PTransformMatcher and PTransformOverride.

Currently only supports replacements where input and output types are an exact match (we have to address complexities due to type hints before supporting replacements with different types).

This can be used to dynamically update a populated pipeline at runtime. Each runner can configure its own overrides.

This will be used by SplittableDoFn where matching ParDo transforms will be dynamically replaced by SplittableParDo.
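
As a rough illustration of the override idea described above, here is a minimal, self-contained sketch. It does not use Apache Beam: `AppliedTransform`, `ParDo`, `SplittableParDo`, `SplittableOverride`, and this `replace_all` are toy stand-ins invented for the example. Only the method names `get_matcher` and `get_replacement_transform` are borrowed from the diff excerpts in this PR, and the matcher is assumed to be a plain callable.

```python
# Toy stand-ins for illustration only; these are NOT the Apache Beam classes.
class AppliedTransform(object):
  """A node in a (flattened) pipeline graph that holds one transform."""

  def __init__(self, label, transform):
    self.label = label
    self.transform = transform


class ParDo(object):
  """Hypothetical transform that should be replaced."""


class SplittableParDo(object):
  """Hypothetical replacement that wraps the original transform."""

  def __init__(self, original):
    self.original = original


class SplittableOverride(object):
  """Pairs a matcher with a factory for the replacement transform."""

  def get_matcher(self):
    # The matcher is assumed to be a callable taking an applied transform.
    return lambda node: isinstance(node.transform, ParDo)

  def get_replacement_transform(self, transform):
    return SplittableParDo(transform)


def replace_all(nodes, overrides):
  """Swaps the transform on every node accepted by an override's matcher."""
  for override in overrides:
    matcher = override.get_matcher()
    for node in nodes:
      if matcher(node):
        node.transform = override.get_replacement_transform(node.transform)


nodes = [AppliedTransform('read', object()), AppliedTransform('work', ParDo())]
replace_all(nodes, [SplittableOverride()])
```

In this sketch only the `work` node is rewritten; the `read` node is left as it was because the matcher rejects it.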

@chamikaramj
Contributor Author

R: @robertwb

@chamikaramj
Contributor Author

cc: @jkff (since SDF related)

@coveralls

Coverage Status

Coverage decreased (-0.007%) to 70.575% when pulling 578ff83 on chamikaramj:sdf_direct_runner_ptransform_override into 911bfbd on apache:master.

@chamikaramj
Contributor Author

Jenkins failure seems to be unrelated.

Contributor

@robertwb robertwb left a comment

Thanks, we're going to need this soon.


output_map = {}

class OutputVisitor(PipelineVisitor): # pylint: disable=used-before-assignment
Contributor

Why is this called OutputVisitor?

Contributor Author

Slightly updated. Now the first visitor (TransformUpdater) updates the transform, while the second visitor (InputOutputUpdater) determines the transforms whose inputs and outputs should be updated. After the two visits, I update the inputs and outputs. Trying to update inputs/outputs during a visit results in validation errors.
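
The collect-first, mutate-afterwards pattern described in this reply can be sketched as follows. This is an assumption-laden toy, not the code in the PR: `Node`, `rewire`, and the string "collections" are hypothetical names used only to show why the graph is modified after the traversals have finished.

```python
class Node(object):
  """Hypothetical graph node with named input and output collections."""

  def __init__(self, name, inputs, output):
    self.name = name
    self.inputs = list(inputs)
    self.output = output


def rewire(nodes, should_replace, make_output):
  # Pass 1: decide on the new outputs, but do not touch the graph yet.
  output_map = {}
  for node in nodes:
    if should_replace(node):
      output_map[node.output] = make_output(node)

  # Pass 2: find every node whose inputs or output refer to a replaced value.
  to_update = [
      node for node in nodes
      if node.output in output_map
      or any(i in output_map for i in node.inputs)]

  # Only after both traversals have finished is the graph mutated.
  for node in to_update:
    node.inputs = [output_map.get(i, i) for i in node.inputs]
    node.output = output_map.get(node.output, node.output)
  return nodes


nodes = [
    Node('a', [], 'pc1'),
    Node('b', ['pc1'], 'pc2'),
    Node('c', ['pc2'], 'pc3'),
]
rewire(nodes, lambda n: n.name == 'b', lambda n: n.output + '_new')
```

Because neither traversal mutates a node, downstream nodes are always visited in a consistent state, which is the property the reply relies on.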


def __init__(self, pipeline, applied_labels):
  self.pipeline = pipeline
  self._applied_labels = applied_labels
Contributor

applied_labels is unused

Contributor Author

It seems we use it to perform transform validation at the following location. Not updating it properly results in errors due to label conflicts.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L235

Contributor

Yes, but that's done via self.pipeline._remove_labels_recursively, not via this attribute.

Contributor Author

Done.

replacement_transform = override.get_replacement_transform(
    transform_node.transform)
inputs = transform_node.inputs
# We only support replacing single-input PTransforms.
Contributor

Should there be a TODO/JIRA for more general replacements?

Contributor Author

Added a TODO.

transform_node.transform)
inputs = transform_node.inputs
# We only support replacing single-input PTransforms.
assert len(inputs) == 1
Contributor

Raise an informative NotImplementedError rather than an assert.

Contributor Author

Done.
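
For context on the suggestion above: `assert` statements are skipped when Python runs with `-O`, and a bare assertion gives the caller no explanation. A minimal sketch of the alternative is below. The helper name `check_single_input` is hypothetical, and the error message is modelled on the one that appears later in this review.

```python
def check_single_input(transform_node, inputs):
  """Rejects unsupported replacements with a descriptive error."""
  if len(inputs) != 1:
    # Unlike an assert, this still fires under `python -O` and tells the
    # caller which node was rejected and why.
    raise NotImplementedError(
        'PTransform overriding is only supported for PTransforms that '
        'have a single input. Tried to replace %r that has %d inputs' %
        (transform_node, len(inputs)))
  return inputs[0]
```

Using `!=` rather than `>` also covers the zero-input case, so no separate assertion is needed afterwards.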


new_output = replacement_transform.expand(inputs[0])

# Recording updated outputs. This cannot be done in the same visor
Contributor

visor -> visitor?

Contributor Author

Done.

def _check_replacement(self, override):
  matcher = override.get_matcher()

  class Visitor(PipelineVisitor):
Contributor

More descriptive name?

Contributor Author

Done.


Currently this only works for replacements where input and output types
are exactly the same.
TODO: Update this to also work for transform overrides where input and
Contributor

JIRA?

Contributor Author

self._replace(override)

# Checking if the transforms have been successfully replaced.
for override in replacements:
Contributor

Note about when this could happen (e.g. the ordering of replacements is important, right?)

Contributor Author

Done.

@@ -564,3 +685,43 @@ def from_runner_api(proto, context):
pc.tag = tag
result.update_input_refcounts()
return result


class PTransformMatcher(object):
Contributor

Is this worth its own class? Could it just be a method on PTransformOverride?

Contributor Author

I think it's good to have a new class here so that we can reuse code and maintain a hierarchy of PTransformMatchers.

Contributor

FWIW, the Pythonic way to do this would be to return a callable rather than have a hierarchy of PTransformMatchers. (These callables could be put in a library for reuse if need be.)

Contributor Author

Ok, makes sense. Removed the PTransformMatcher class.
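
To make the "library of callables" idea concrete, here is a small sketch of reusable matcher factories. All names (`matches_type`, `any_of`, `FakeApplied`, `Foo`, `Bar`) are hypothetical and exist only for this illustration. The one assumption carried over from the thread is that a matcher is called with an applied transform exposing a `.transform` attribute.

```python
def matches_type(transform_type):
  """Returns a matcher that accepts nodes whose transform has the given type."""
  def matcher(applied_ptransform):
    return isinstance(applied_ptransform.transform, transform_type)
  return matcher


def any_of(*matchers):
  """Combines matchers; accepts a node if at least one of them accepts it."""
  def matcher(applied_ptransform):
    return any(m(applied_ptransform) for m in matchers)
  return matcher


class FakeApplied(object):
  """Hypothetical stand-in for an applied transform node."""

  def __init__(self, transform):
    self.transform = transform


class Foo(object):
  pass


class Bar(object):
  pass


foo_or_bar = any_of(matches_type(Foo), matches_type(Bar))
```

Composition through closures covers the reuse that a class hierarchy would otherwise provide, without requiring a base class.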

@@ -59,6 +65,9 @@ def apply_CombinePerKey(self, transform, pcoll):
  def run(self, pipeline):
    """Execute the entire pipeline and returns an DirectPipelineResult."""

    # Performing configured PTransform overrides.
    pipeline.replace_all(DirectRunner.PTRANSFORM_OVERRIDES)
Contributor

Nit: I think this would be less error-prone (especially as we develop more complicated inputs/outputs) if this were functional, i.e. returned a new pipeline object rather than mutating the existing one.

Contributor Author

By cloning? I think this is OK since this is not a user feature (we just have to get this right for the different variations of PTransforms). Also, I think replacing is better since the idea is to update the already-built pipeline before running it.
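
The trade-off discussed in this thread (mutating in place versus returning a new object) can be illustrated with a toy example. Nothing here is Beam code: the pipeline is modelled as a plain list of dicts, and `replace_in_place` and `replace_functional` are hypothetical helpers.

```python
import copy


def replace_in_place(nodes, matcher, make_replacement):
  """Mutating style: the caller's structure is changed directly."""
  for node in nodes:
    if matcher(node):
      node['transform'] = make_replacement(node['transform'])


def replace_functional(nodes, matcher, make_replacement):
  """Functional style: works on a deep copy and leaves the input untouched."""
  new_nodes = copy.deepcopy(nodes)
  replace_in_place(new_nodes, matcher, make_replacement)
  return new_nodes


original = [{'label': 'read', 'transform': 'Read'},
            {'label': 'work', 'transform': 'ParDo'}]
rewritten = replace_functional(
    original,
    lambda node: node['transform'] == 'ParDo',
    lambda transform: 'Splittable' + transform)
```

The functional variant pays for a copy but guarantees that no state from the rewritten structure leaks back into the original. The in-place variant avoids the copy and matches the intent of updating an already-built pipeline just before it runs.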

Contributor Author

@chamikaramj chamikaramj left a comment

Thanks. PTAL.


@coveralls

Coverage Status

Coverage increased (+0.1%) to 70.703% when pulling 13fd823 on chamikaramj:sdf_direct_runner_ptransform_override into 911bfbd on apache:master.

'PTransform overriding is only supported for PTransforms that '
'have a single input. Tried to replace %r that has %d inputs',
transform_node, len(inputs))
assert len(inputs) == 1
Contributor

Remove this assert, it's redundant with the if above (or could be, if it was equality rather than greater than).

Contributor Author

Done.

# Recording updated outputs. This cannot be done in the same visor
# since if we dynamically update output type here, we'll run into
# errors when visiting child nodes.
output_map[transform_node.outputs[None]] = new_output
Contributor

Expand is not limited to returning a single PCollection (for example, it could return a tuple of PCollections). It could also be a DoFn with multiple outputs, in which case only the main one has "label" None.

If we don't intend to handle the more general case yet, we should at least check.

@chamikaramj chamikaramj force-pushed the sdf_direct_runner_ptransform_override branch from 13fd823 to 644f642 Compare June 13, 2017 08:35
Contributor Author

@chamikaramj chamikaramj left a comment

Thanks. PTAL.

# Recording updated outputs. This cannot be done in the same visor
# since if we dynamically update output type here, we'll run into
# errors when visiting child nodes.
output_map[transform_node.outputs[None]] = new_output
Contributor Author

Done.


@coveralls

Coverage Status

Coverage increased (+0.003%) to 70.658% when pulling 644f642 on chamikaramj:sdf_direct_runner_ptransform_override into fe3d554 on apache:master.

Contributor

@robertwb robertwb left a comment

LGTM, thanks.

Couldn't comment directly, but regarding "By cloning? I think this is OK since this is not a user feature (we just have to get this right for the different variations of PTransforms). Also, I think replacing is better since the idea is to update the already-built pipeline before running it.": yes, I was thinking of cloning, so one doesn't accidentally carry over any state, etc. that belonged to the old structure (especially if things change in the future). The danger of "we just have to get this right" applies to ourselves as well as end users. This is just a general comment; there's no need to change it now if you're confident in what you have, but I wanted to point out the danger.

'has a single output. Tried to replace %r that has %d outputs.'
, transform_node, len(transform_node.outputs))

if type(new_output) is tuple:
Contributor

This is not sufficient; lists, dicts, or other types could also be returned. Should we only allow a single PCollection (and likewise check that the previous output was None -> PCollection)?

Contributor Author

Done.
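
One way to express the stricter check requested above is an allow-list (accept only a single collection) rather than a deny-list of container types. The sketch below is an assumption about the shape of such a check, not the code that was merged. `PCollection` here is a local stand-in class, `validate_replacement_output` is a hypothetical helper, and outputs are assumed to be a dict keyed by tag with the main output under `None`, as in the excerpt above.

```python
class PCollection(object):
  """Stand-in for a pipeline collection; NOT the Apache Beam class."""


def validate_replacement_output(old_outputs, new_output):
  """Accepts only a single main output replaced by a single PCollection.

  old_outputs is assumed to map output tag -> collection for the original
  node, with the main output stored under the tag None.
  """
  if list(old_outputs.keys()) != [None]:
    raise NotImplementedError(
        'Overriding is only supported for transforms with a single main '
        'output. Got %d outputs.' % len(old_outputs))
  # Allow-list check: tuples, lists, dicts and anything else are rejected.
  if not isinstance(new_output, PCollection):
    raise NotImplementedError(
        'Replacement must expand to a single PCollection. Got %r.' %
        type(new_output))
  return new_output
```

Checking `isinstance(new_output, PCollection)` rejects every unsupported return type at once, whereas `type(new_output) is tuple` would let lists and dicts through.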


class MyParDoMatcher(object):

  def __call__(self, applied_ptransform):
Contributor

Alternatively, you could define this as

def my_par_do_matcher(applied_ptransform):
  return isinstance(applied_ptransform.transform, DoubleParDo)

and then below you would have

return my_par_do_matcher

(The lack of first-class functions is what necessitates a class in Java.)

Contributor Author

Done.

Contributor Author

@chamikaramj chamikaramj left a comment

Thanks for the comments.

@coveralls

Coverage Status

Coverage increased (+0.003%) to 70.658% when pulling 4c382de on chamikaramj:sdf_direct_runner_ptransform_override into fe3d554 on apache:master.
