
[BEAM-1630] Adds support for processing Splittable DoFns using DirectRunner. #4064

Merged
merged 1 commit on Dec 19, 2017

Conversation

chamikaramj
Contributor

Updates DoFn invocation logic to allow invoking SDF methods.
Adds SDF machinery that will be common to DirectRunner and other runners.
Adds DirectRunner specific transform overrides, evaluators, and other logic for processing Splittable DoFns.

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@chamikaramj
Contributor Author

R: @charlesccychen (for an overall review including DirectRunner specific logic)
R: @jkff (for SDF machinery)

cc: @robertwb

@chamikaramj
Contributor Author

Run Python PreCommit

@chamikaramj
Contributor Author

Fixed Cython errors. All tests should pass now. PTAL.

@chamikaramj
Contributor Author

Friendly ping :)

Contributor

@charlesccychen left a comment

Thanks! Please see my comments. It would also be helpful if @jkff can further review the SDF-specific logic in this change.

@@ -191,8 +194,6 @@ def __init__(self, evaluation_context, applied_ptransform,
self._execution_context = evaluation_context.get_execution_context(
applied_ptransform)
self.scoped_metrics_container = scoped_metrics_container
with scoped_metrics_container:
self.start_bundle()
Contributor

Thanks for this change.

Contributor Author

👍

class OutputProcessor(object):

def process_outputs(self, windowed_input_element, results):
raise NotImplementedError
Contributor

What is the motivation for this change?

Contributor Author

We need to pass in a custom OutputProcessor when invoking SDF.process() instead of using the default output processor, since output has to be handled at ProcessFn.


Args:
do_fn: A DoFn object that contains the method.
obj: the object that contains the method.
Contributor

Can you rename this and document what type of object this should be, and also validate this in the constructor if possible? "obj" seems too generic

Contributor Author

Done.

"""A transform that assigns a unique key to each element."""

def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
yield (uuid.uuid4().bytes, element)
Contributor

Here and in other places, we seem to rely on uuid.uuid4() to give unique values for correctness (for example, when writing shards for file I/O). Can you add comments detailing this assumption here and elsewhere?

Contributor Author

Added TODOs here and to iobase._WriteBundleDoFn. I think collisions are extremely rare for uuid.uuid4(), though. Also added an assertion to force a failure here if a collision is detected.
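The uniqueness assumption being discussed can be sketched as follows (the `assign_unique_keys` helper name is hypothetical; the real code pairs keys inside a DoFn):

```python
import uuid


def assign_unique_keys(elements):
  """Yields (key, element) pairs, asserting the uuid4 uniqueness assumption.

  Correctness relies on uuid4 keys never colliding; the assertion forces a
  loud failure in the (extremely rare) event that they do.
  """
  seen = set()
  for element in elements:
    key = uuid.uuid4().bytes
    assert key not in seen, 'uuid4 key collision detected'
    seen.add(key)
    yield (key, element)
```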

"""An evaluator for sdf_direct_runner.ProcessElements transform."""

DEFAULT_MAX_NUM_OUTPUTS = 100
DEFAULT_MAX_DURATION = 1
Contributor

Can you please document these flags?

Contributor Author

Done.

@@ -529,16 +530,19 @@ def start_bundle(self):
self._counter_factory = counters.CounterFactory()

# TODO(aaltay): Consider storing the serialized form as an optimization.
dofn = pickler.loads(pickler.dumps(transform.dofn))
dofn = transform.dofn
Contributor

We had this line here to ensure that the DoFn was serializable, so that a user would not hit any issues when running later on remote runners. Can you describe the reason for this change?

Contributor Author

I ran into issues since we are piggybacking StepContext and SDF DoFnInvoker objects in the DoFn. I added an option to disable this pickling only for the SDF case.

@@ -826,3 +830,74 @@ def finish_bundle(self):
None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)

return TransformResult(self, [], [], None, {None: hold})


class _ProcessElemenetsEvaluator(_TransformEvaluator):
Contributor

"Elements"

Contributor Author

Done.



class SDFProcessElementInvoker(object):
"""A utility that requsts checkpoints.
Contributor

"requests"

Contributor Author

Done.

residual_range = (
(self._range.start, self._range.stop)
if self._current_position is None
else (self._current_position + 1, self._range.stop))
Contributor

Can you please refactor this into the more readable:

if self._current_position is None:
  residual_range = (self._range.start, self._range.stop)
else:
  residual_range = (self._current_position + 1, self._range.stop)

Contributor Author

Done.

return [MyParDoOverride()]

from apache_beam.runners.direct import direct_runner
direct_runner._get_transform_overrides = get_overrides
Contributor

This sort of monkey-patching may not be safe: subsequent tests in the same process will get your modified version of _get_transform_overrides. Consider using a @mock.patch('apache_beam.runners.direct.direct_runner._get_transform_overrides') annotation on this method (that machinery will restore the original value after this test case finishes).

Actually, alternatively, you can create the DirectRunner object and directly patch runner._ptransform_overrides instead, which might be cleaner.

Contributor Author

Done.
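The difference between manual monkey-patching and mock.patch can be sketched like this (`FakeRunner` is a hypothetical stand-in for the module attribute being patched, not Beam code):

```python
import unittest
from unittest import mock


class FakeRunner(object):
  """Hypothetical stand-in for the runner whose attribute gets patched."""

  def get_overrides(self):
    return ['default-override']


class OverrideTest(unittest.TestCase):

  # mock.patch.object swaps the attribute only for the duration of this
  # test and restores the original afterwards, so later tests in the same
  # process are unaffected (unlike manual monkey-patching).
  @mock.patch.object(FakeRunner, 'get_overrides',
                     return_value=['my-override'])
  def test_with_override(self, _mocked):
    self.assertEqual(FakeRunner().get_overrides(), ['my-override'])
```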

OffsetRange(10, 100)

with self.assertRaises(ValueError):
OffsetRange(10, 9)
Contributor

Can you also test OffsetRange(10, 10) above to make sure we get the corner case?

Contributor Author

Done (note that this is not a failure case).
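A minimal stand-in for the validation under test (simplified sketch, not the real OffsetRange): an empty range with start == stop is allowed, while start > stop raises.

```python
class OffsetRange(object):
  """Simplified sketch of the offset-range validation discussed above."""

  def __init__(self, start, stop):
    if start > stop:
      raise ValueError(
          'start offset must not be bigger than the stop offset: %r > %r' %
          (start, stop))
    self.start = start
    self.stop = stop
```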

if self._current_position is None
else (self._current_position + 1, self._range.stop))
# If self._current_position is 'None' no records have been claimed so
# residual should start from self._range.start.
Contributor

Move this comment to before the if above.

Contributor Author

Done.

# residual should start from self._range.start.
end_position = (
self._range.start if self._current_position is None
else self._current_position + 1)
Contributor

Same as above.

Contributor Author

Done.

Contributor

@jkff left a comment

Thanks! Overall looks very good.

@@ -423,8 +423,8 @@ def __init__(self, file_to_write):

def start_bundle(self):
assert self.file_to_write
self.file_to_write += str(uuid.uuid4())
self.file_obj = open(self.file_to_write, 'w')
# Appending a UUID to create a unique file object per invocation.
Contributor

Is this related to the current PR?

Contributor Author

A version of a test that wrote a large number of files failed due to not having this fix.


return self.start == other.start and self.stop == other.stop

def __ne__(self, other):
Contributor

Is this necessary? Doesn't the default implementation of "ne" just negate "eq"?

Contributor Author

Removed.



class OffsetRestrictionTracker(RestrictionTracker):
"""An `iobase.RestrictionTracker` implementations for byte offsets."""
Contributor

Does it have to be bytes? It can be any kind of integer indices, no?

Contributor Author

Done.

return False

def checkpoint(self):
with self._lock:
Contributor

Other methods should also take the lock, e.g. current_restriction, start/stop_position.

Contributor Author

Done.

def try_claim(self, position):
with self._lock:
self._last_claim_attempt = position
if position >= self._range.start and position < self._range.stop:
Contributor

"position < self._range.start" should be an error rather than just a failed claim.

Contributor Author

Done.
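The behavior after the fix can be sketched roughly as follows (a simplified illustration; the constructor signature and internals are assumptions, not the actual tracker). A position at or past the end of the range is a legitimately failed claim, while a position before the range start indicates a caller bug and raises:

```python
import threading


class OffsetRestrictionTracker(object):
  """Simplified sketch: claims must land inside [start, stop), and a
  claim before the range start is an error rather than a failed claim."""

  def __init__(self, start, stop):
    self._start = start
    self._stop = stop
    self._current_position = None
    # Methods that touch tracker state take this lock, since checkpoint()
    # may be requested from another thread.
    self._lock = threading.Lock()

  def try_claim(self, position):
    with self._lock:
      if position < self._start:
        # An attempt before the range is a bug in the caller.
        raise ValueError(
            'Position %d is before the start of the range [%d, %d)' %
            (position, self._start, self._stop))
      if position < self._stop:
        self._current_position = position
        return True
      # At or past the end of the range: a failed claim, not an error.
      return False
```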

from apache_beam.transforms.window import TimestampedValue


class ReadFilesProvider(RestrictionProvider):
Contributor

Hm, why not use the simpler example of generating a range of numbers? (this one's fine too)

Contributor Author

I think it's good to have a slightly more complex one until we have actual SDFs as an illustration.

with TestPipeline() as p:
pc1 = (p
| 'Create1' >> beam.Create(file_names)
| 'SDF' >> beam.ParDo(ReadFiles(resume_count)))
Contributor

Is it possible to verify that we really resume the requested number of times? In Java I did that by emitting an element to a side output once per ProcessElement call; not sure if your implementation supports side outputs yet.

Contributor Author

Currently transform overriding only supports transforms with a single output, so this cannot be done. Added a TODO for this verification (I manually verified this, BTW).

if isinstance(element.value, KeyedWorkItem):
encoded_k = element.value.encoded_key
else:
assert isinstance(element.value, tuple)
Contributor

Document what kind of tuple we expect it to be and why?

Contributor Author

Done.

# limitations under the License.
#

"""This module contains Splittable DoFn logic that's common to all runners."""
Contributor

In Python I think I'd recommend starting with only the direct runner, because I'm not sure the contents of this file will be reusable for the Fn API implementation, and there won't be other runners implementing SDF directly, right?

Contributor Author

This simply contains the core transforms (PairWithRestrictionFn, SplitRestrictionFn, ProcessElements()). This flow will be common to the Fn API based implementation as well, no?

All the implementation details other than that are in sdf_direct_runner.py.

keyed_elements = (pcoll
| 'pair' >> ParDo(PairWithRestrictionFn(sdf))
| 'split' >> ParDo(SplitRestrictionFn(sdf))
| 'explode' >> ParDo(ExplodeWindowsFn())
Contributor

Ah, you are exploding windows! I think somewhere above there was code that handled multiple windows...

Contributor Author

Not sure I get this. Are you saying that I won't be getting a 'WindowedValue' in ProcessFn since I explode windows here?

Contributor Author

@chamikaramj left a comment

Thanks. PTAL.


self._par_do_evaluator = _ParDoEvaluator(
evaluation_context, applied_ptransform, input_committed_bundle,
side_inputs, scoped_metrics_container)
Contributor Author

Technically we only have one evaluator here which is _ProcessElementsEvaluator. _ParDoEvaluator is used as a library. We are simply using _ParDoEvaluator to evaluate a ParDo where DoFn object is the ProcessFn.

If we decide to duplicate that code this'll involve a significant amount of code copying (ParDoEvaluator's, start_bundle, process(), finish_bundle()) which I prefer avoiding.

Also, note that we use a similar implementation for Java SDK: https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L112

WDYT?







if self._max_num_outputs and output_count >= self._max_num_outputs:
initiate_checkpoint()

tracker.check_done()
Contributor Author

Yeah, this is similar to the Java implementation.



encoded_k = element.value.encoded_key
else:
assert isinstance(element.value, tuple)
encoded_k = element.value[0]
Contributor Author

Renamed to 'key'.


@chamikaramj
Contributor Author

Run Python PreCommit

Contributor

@jkff left a comment

Thanks, looks pretty good to me SDF-wise!

@@ -962,6 +962,7 @@ def display_data(self):

def process(self, element, init_result):
if self.writer is None:
# TODO: handle uid collisions here.
Contributor

Doesn't seem necessary, uuid collisions are improbably rare

Contributor Author

Updated to a comment.

"""A `DoFn` that executes machineary for invoking a Splittable `DoFn`.

Input to the `ParDo` step that includes a `ProcessFn` will be a `PCollection`
of `ElementAndRestriction` objects.
Contributor

Is it that, or KeyedWorkItem's / WindowedValue's?

Contributor Author

The SDK automatically converts WindowedValues to values during iteration, so I think it's fine to call it a PCollection of ElementAndRestrictions here. KeyedWorkItem is a special case where we receive that instead of the original iterable of values when a timer is fired (i.e. this is true for any DoFn), so it's not worth mentioning here.

value = values[0]
if len(values) != 1:
raise ValueError('')
assert isinstance(value, (WindowedValue, ElementAndRestriction))
Contributor

Is this checking for being a subclass of both at the same time? Is that how windowed values work in Python - multiple inheritance (rather than wrapping like in Java)?

Contributor Author

This checks if instance is of one of the types mentioned. Actually I don't think we'll be getting WindowedValues here after iteration. So updated.

windowed_element = WindowedValue(element, timestamp, [window])
else:
element_and_restriction = (
value.value if isinstance(value, WindowedValue) else value)
Contributor

Hm seems concerning, maybe @charlesccychen can comment what's going on here?

@@ -162,6 +188,8 @@ def process(self, element, timestamp=beam.DoFn.TimestampParam,
break
yield output

assert sdf_result
Contributor

Add a message?

Contributor Author

Done.

# Setting a timer to be reinvoked to continue processing the element.
# Currently Python SDK only supports setting timers based on watermark. So
# forcing a reinvocation by setting a timer for watermark negative
# infinity.
Contributor

Does this have any practical consequences, except that the processing-time delay is not respected? E.g. does this unnecessarily hold the watermark more than requested, or something? Can the timer end up being dropped?

Contributor Author

I think this works for now. The timer doesn't get dropped and the watermark gets reset properly. Maria is working on adding support for proper processing-time timers; I will update this after we have that.

for output in output_processor.output_iter:
# A ProcessContinuation, if returned, should be the last element.
assert not process_continuation
Contributor

Where is it assigned to a non-none value?

Contributor Author

It was not being set. Updated.


# Continuing here instead of breaking to enforce that this is the last
# element.
continue
Contributor

Do we enforce that a ProcessContinuation was eventually returned at all?

Contributor Author

We don't enforce that. The user may or may not return a ProcessContinuation object; if returned, it has to be the last element of the values iterator.
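The contract discussed here can be sketched as a small helper (the `consume_outputs` name and the `ProcessContinuation` stub are illustrative, not the actual Beam classes):

```python
class ProcessContinuation(object):
  """Marker a splittable process() may yield to request resumption."""


def consume_outputs(output_iter):
  """Collects outputs, enforcing that a ProcessContinuation, if one is
  yielded at all, is the last element of the iterator."""
  process_continuation = None
  outputs = []
  for output in output_iter:
    # If a continuation was already seen, nothing may follow it.
    assert process_continuation is None, (
        'ProcessContinuation must be the last element of the output')
    if isinstance(output, ProcessContinuation):
      process_continuation = output
      # Continue (instead of break) so any trailing element still trips
      # the assertion above.
      continue
    outputs.append(output)
  return outputs, process_continuation
```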

@@ -157,6 +152,9 @@ def run_sdf_read_pipeline(

assert_that(pc1, equal_to(expected_data))

# TODO(chamikara: verify the number of times process method was invoked
# using a side output once SDFs supports producing side outputs.
Contributor

Btw what will currently happen if an SDF attempts to produce side outputs? I realized that Python doesn't have a ProcessContext like Java does, so there's no explicit code that fails if it tries to.


Contributor Author

@chamikaramj left a comment

Thanks. PTAL.






windowed_element = WindowedValue(element, timestamp, [window])
else:
element_and_restriction = (
value.value if isinstance(value, WindowedValue) else value)
Contributor Author

My previous statement was incorrect. The process() method does get an iterator (_UnwindowedValues) of WindowedValue objects. But after the iterator is expanded, we always get ElementAndRestriction objects here [1]. Updated.

[1]

unwindowed_value = wv.value




@jkff
Contributor

jkff commented Dec 12, 2017

Thanks! LGTM as far as SDF goes; up to Charles to review the rest.

@chamikaramj
Contributor Author

Thanks Eugene.

Charles, PTAL.

Contributor

@charlesccychen left a comment

Thanks.

Raises ValueError: if there is still any unclaimed work remaining in the
restriction invoking this method. Exception raised must have an
informative error message.
This method must raise an error if there is still any unclaimed work
Contributor

"an error" -> "ValueError"

"""A primitive transform for processing keyed elements or KeyedWorkItems.

Will be evaluated by
`runners.direct.transform_evaluator._ProcessElemenetsEvaluator`.
Contributor

"_ProcessElementsEvaluator"

@@ -961,6 +962,7 @@ def display_data(self):

def process(self, element, init_result):
if self.writer is None:
# We ignore uuid collisions here since it's extremely rare.
Contributor

capitalize "UUID".

residual_range = (self._range.start, self._range.stop)
end_position = self._range.start
else:
residual_range = (self._current_position + 1, self._range.stop)
Contributor

Can you factor residual_range out of this if? Just as:

residual_range = (end_position, self._range.stop)

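The suggested factoring — compute `end_position` in the branch and derive `residual_range` from it once — can be sketched as a free function over hypothetical tracker fields mirroring the snippet above:

```python
def checkpoint_residual(range_start, range_stop, current_position):
  """Returns (end_position, residual_range) for a checkpoint.

  If no position has been claimed yet (current_position is None), the
  primary range is empty and the whole range becomes the residual;
  otherwise the residual starts just past the last claimed position.
  """
  if current_position is None:
    end_position = range_start
  else:
    end_position = current_position + 1
  # residual_range is derived from end_position in one place, instead of
  # being recomputed in each branch.
  residual_range = (end_position, range_stop)
  return end_position, residual_range
```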
super(_ParDoEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
side_inputs, scoped_metrics_container)
self._perform_dofn_pickle_test = perform_dofn_pickle_test
Contributor

Please add comment that this workaround is for SDF.

@@ -530,16 +541,20 @@ def start_bundle(self):
self._counter_factory = counters.CounterFactory()

# TODO(aaltay): Consider storing the serialized form as an optimization.
dofn = pickler.loads(pickler.dumps(transform.dofn))
dofn = (pickler.loads(pickler.dumps(transform.dofn))
if self._perform_dofn_pickle_test else transform.dofn)
Contributor

Please add comment that this workaround is for SDF.

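The pickle round-trip being made optional here acts as an early serializability check on user DoFns. A minimal sketch of the idea, using stdlib `pickle` rather than Beam's dill-based pickler, with a hypothetical `roundtrip_if_enabled` helper:

```python
import pickle


class MyDoFn(object):
  """Stand-in for a user-defined DoFn."""

  def process(self, element):
    yield element * 2


def roundtrip_if_enabled(dofn, perform_pickle_test=True):
  """Round-trips the DoFn through pickle to surface serialization errors
  early. The test is skipped for SDF, whose ProcessFn carries unpicklable
  runtime state (step context, DoFn invoker)."""
  if perform_pickle_test:
    return pickle.loads(pickle.dumps(dofn))
  return dofn
```

When the test is disabled, the original object is returned unchanged, which is what allows the SDF ProcessFn to keep its runtime state.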
# limitations under the License.
#

"""Unit tests for the range_trackers module."""
Contributor

Please add extra newline.

self.sdf = sdf
self._element_tag = _ValueStateTag('element')
self._restriction_tag = _ValueStateTag('restriction')
self.watermark_hold_tag = _ValueStateTag('watermark_hold')
Contributor

chamikaramj wrote:
This is also used in transform_evaluator module.

Acknowledged.

@@ -491,14 +491,14 @@ class NullReceiver(object):
def output(self, element):
pass

class _InMemoryReceiver(common.Receiver):
Contributor

chamikaramj wrote:
I ran into issues since we are piggybacking StepContext and SDF DoFnInvoker objects in the DoFn. I added an option to disable this pickling only for the SDF case.

Acknowledged.

"""A transform that assigns a unique key to each element."""

def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
yield (uuid.uuid4().bytes, element)
Contributor

chamikaramj wrote:
Added TODOs to here and iobase._WriteBundleDoFn. I think collisions are extremely rare for uuid.uuid4() though. Also, added an assertion to force a failure here if a collision is detected.

I don't see this assertion? I agree that they should be rare, but a comment would help anyone reading this in the future understand this assumption.

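The keying step under discussion can be sketched as a plain function. `assign_unique_key` is a hypothetical name; the collision-probability argument is the standard birthday bound — for n elements the chance of any collision among 122-bit random UUIDs is roughly n²/2¹²³, negligible for any realistic bundle:

```python
import uuid


def assign_unique_key(element):
  """Pairs an element with a 16-byte random key, as in the keying
  transform discussed above. uuid4 collisions are ignored since they
  are astronomically unlikely."""
  return (uuid.uuid4().bytes, element)
```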
OffsetRange(10, 100)

with self.assertRaises(ValueError):
OffsetRange(10, 9)
Contributor

chamikaramj wrote:
Done (note that this is not a failure case).

Done.

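The behavior exercised by this test — a half-open range whose stop may equal but not precede its start — can be sketched as a minimal class (an illustration, not the Beam implementation):

```python
class OffsetRange(object):
  """Half-open offset range [start, stop). An empty range
  (stop == start) is allowed; stop < start raises ValueError."""

  def __init__(self, start, stop):
    if stop < start:
      raise ValueError(
          'start offset must not exceed stop offset: [%d, %d)'
          % (start, stop))
    self.start = start
    self.stop = stop
```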
SDFProcessElementInvoker(
max_num_outputs=self.DEFAULT_MAX_NUM_OUTPUTS,
max_duration=self.DEFAULT_MAX_DURATION))
self._process_fn.set_process_element_invoker(process_element_invoker)
Contributor

chamikaramj wrote:
Renamed to 'key'.

Done.

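The `max_num_outputs` bound set on the invoker above caps how much a single process() invocation may emit before the runner checkpoints. The loop shape can be sketched as follows — `invoke_with_output_limit` is a hypothetical helper, not the SDFProcessElementInvoker API:

```python
def invoke_with_output_limit(outputs, max_num_outputs):
  """Consumes at most max_num_outputs elements from an SDF output
  iterator. Returns the emitted values and whether the output limit was
  reached, in which case the runner would checkpoint the restriction
  and defer the residual."""
  emitted = []
  limit_reached = False
  for i, value in enumerate(outputs):
    emitted.append(value)
    if i + 1 >= max_num_outputs:
      # Stop consuming; remaining work stays in the residual restriction.
      limit_reached = True
      break
  return emitted, limit_reached
```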
@@ -530,16 +530,19 @@ def start_bundle(self):
self._counter_factory = counters.CounterFactory()

# TODO(aaltay): Consider storing the serialized form as an optimization.
dofn = pickler.loads(pickler.dumps(transform.dofn))
dofn = transform.dofn

pipeline_options = self._evaluation_context.pipeline_options
if (pipeline_options is not None
and pipeline_options.view_as(TypeOptions).runtime_type_check):
dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints())
Contributor

chamikaramj wrote:
Done.

Done.

pass

def process(self, element, timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam, * args, **kwargs):
Contributor

chamikaramj wrote:
Done.

Done.

if self._current_position is None
else (self._current_position + 1, self._range.stop))
# If self._current_position is 'None' no records have been claimed so
# residual should start from self._range.start.
Contributor

chamikaramj wrote:
Done.

Done.

self.sdf_invoker, windowed_element, tracker)

sdf_result = None
for output in output_values:
Contributor

chamikaramj wrote:
Added more documentation to SDFProcessElementInvoker and Result classes.

Acknowledged.

# residual should start from self._range.start.
end_position = (
self._range.start if self._current_position is None
else self._current_position + 1)
Contributor

chamikaramj wrote:
Done.

Done.

@charlesccychen
Contributor

Please also rebase to HEAD since there seems to be a merge conflict right now.

Contributor Author

@chamikaramj chamikaramj left a comment

Thanks. PTAL.

Raises ValueError: if there is still any unclaimed work remaining in the
restriction invoking this method. Exception raised must have an
informative error message.
This method must raise an error if there is still any unclaimed work
Contributor Author

charlesccychen wrote:
"an error" -> "ValueError"

Done.

"""A primitive transform for processing keyed elements or KeyedWorkItems.

Will be evaluated by
`runners.direct.transform_evaluator._ProcessElemenetsEvaluator`.
Contributor Author

charlesccychen wrote:
"_ProcessElementsEvaluator"

Done.

@@ -961,6 +962,7 @@ def display_data(self):

def process(self, element, init_result):
if self.writer is None:
# We ignore uuid collisions here since it's extremely rare.
Contributor Author

charlesccychen wrote:
capitalize "UUID".

Done.

"""Ignores undeclared outputs, default execution mode."""

def receive(self, element):
def output(self, element):
pass

Contributor Author

charlesccychen wrote:
Please add comment that this workaround is for SDF.

Done.

# limitations under the License.
#

"""Unit tests for the range_trackers module."""
Contributor Author

charlesccychen wrote:
Please add extra newline.

Done.

residual_range = (self._range.start, self._range.stop)
end_position = self._range.start
else:
residual_range = (self._current_position + 1, self._range.stop)
Contributor Author

charlesccychen wrote:
Can you factor residual_range out of this if? Just as:

residual_range = (end_position, self._range.stop)

Done.

input_committed_bundle, side_inputs, scoped_metrics_container,
perform_dofn_pickle_test=True):
super(_ParDoEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
Contributor Author

charlesccychen wrote:
Please add comment that this workaround is for SDF.

Mentioned above.

@@ -47,7 +47,8 @@
from apache_beam.utils import urns
from apache_beam.utils.windowed_value import WindowedValue

__all__ = ['BoundedSource', 'RangeTracker', 'Read', 'Sink', 'Write', 'Writer']
Contributor Author

chamikaramj wrote:
Updated to a comment.

Acknowledged. Updated to a comment.

"""A transform that assigns a unique key to each element."""

def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
yield (uuid.uuid4().bytes, element)
Contributor Author

charlesccychen wrote:
I don't see this assertion? I agree that they should be rare, but a comment would help anyone reading this in the future understand this assumption.

I removed the assertion after Eugene's comment. Added a comment.

@chamikaramj
Contributor Author

Failure is unrelated (due to https://issues.apache.org/jira/browse/BEAM-3369).

Contributor

@charlesccychen charlesccychen left a comment

Thanks!

Raises ValueError: if there is still any unclaimed work remaining in the
restriction invoking this method. Exception raised must have an
informative error message.
This method must raise an `ValueError` if there is still any unclaimed work
Contributor

a ValueError

Contributor Author

Done.

@@ -332,31 +437,45 @@ def __init__(self,
kwargs: keyword side input arguments (static and placeholder), if any
Contributor

chamikaramj wrote:
We need to pass in a custom OutputProcessor when invoking SDF.process() instead of using the default output processor, since output has to be handled by the ProcessFn.

Done.


Args:
do_fn: A DoFn object that contains the method.
obj: the object that contains the method.
Contributor

chamikaramj wrote:
Done.

Done.

Updates DoFnInvocation logic to allow invoking SDF methods.
Adds SDF machinery that will be common to DirectRunner and other runners.
Adds DirectRunner specific transform overrides, evaluators, and other logic for processing Splittable DoFns.
@chamikaramj
Contributor Author

Thanks for the review.

@chamikaramj chamikaramj merged commit 3c81b41 into apache:master Dec 19, 2017