[BEAM-9562] Send Timers over Data Channel as Elements #11314
Conversation
sdks/python/apache_beam/pipeline.py
Outdated
@@ -1071,6 +1071,14 @@ def named_inputs(self):
}
return dict(main_inputs, **side_inputs)

def main_inputs(self):
Generic transforms don't have the notion of main inputs, let's filter things out in the implementation in ParDo.
@@ -1294,16 +1295,43 @@ def _pardo_fn_data(self):
windowing = None
return self.fn, self.args, self.kwargs, si_tags_and_types, windowing

def to_runner_api_parameter(self, context):
def to_runner_api(self, context, main_inputs, has_parts=False):
This is starting to look like a lot of code duplication. How about we pass (all) inputs as a keyword argument, and let `PTransform.to_runner_api` take an `**extra_kwargs` that it passes on to `to_runner_api_parameter`.
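A minimal sketch of the suggested forwarding pattern, assuming simplified stand-in classes (these are not the real Beam implementations): the base class accepts `**extra_kwargs` and passes them through untouched, so subclasses like `ParDo` can consume extra arguments without duplicating the serialization path.

```python
class PTransform(object):
  def to_runner_api(self, context, **extra_kwargs):
    # Forward any extra keyword arguments to the parameter hook untouched.
    urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
    return (urn, typed_param)

  def to_runner_api_parameter(self, context, **extra_kwargs):
    # Generic transforms ignore extra arguments such as main_inputs.
    return ('beam:transform:generic:v1', None)


class ParDo(PTransform):
  def to_runner_api_parameter(self, context, main_inputs=None, **extra_kwargs):
    # Only ParDo cares about main_inputs; it arrives via **extra_kwargs
    # from the unmodified base-class to_runner_api.
    return ('beam:transform:pardo:v1', main_inputs)
```

With this shape, only `to_runner_api_parameter` needs overriding; the base `to_runner_api` stays shared.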
Force-pushed e046783 to 3ff8c3e (Compare)
paneinfo,
timer_family_id,
timer_coder_impl,
output_stream
A type annotation on this parameter would be useful.
dynamic_timer_tag='',
windows=(self._window, ),
clear_bit=False,
fire_timestamp=clear_ts,
Don't bother setting these timestamps, or paneinfo.
(Should the coder be ignoring them as well?)
> (Should the coder be ignoring them as well?)

No, the timer coder encodes all of this info now.

> Don't bother setting these timestamps, or paneinfo.

Could you please explain more about this?
They're meaningless when we're clearing a timer (e.g. it won't fire, hold back the watermark, or have a pane info).
Correct, when `clear_bit` is `True`, the coder ignores these fields. I think we should have a better `Timer` with an API of `set` and `clear` like in Java, as a follow-up.
@@ -611,7 +611,7 @@ def __init__(self,
transform_id,  # type: str
key_coder,  # type: coders.Coder
window_coder,  # type: coders.Coder
timer_family_specs  # type: Mapping[str, beam_runner_api_pb2.TimerFamilySpec]
timer_coders
type?
@@ -1088,6 +1142,30 @@ def create_operation(self,
transform_proto.spec.payload, parameter_type)
return creator(self, transform_id, transform_proto, payload, consumers)

def get_timer_coders(self):
timer_coder = {}
for transform_id, transform_proto in self.descriptor.transforms.items():
I see us doing this loop three times now. Perhaps it would be more useful to do the loop once to set everything up, creating a single dictionary (transform_id, timer_family_id) -> (all info about that timer we need to dispatch them).
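A rough sketch of the single-pass suggestion, under illustrative assumptions (the `TimerInfo` tuple and the descriptor shape below are hypothetical, not the real `bundle_processor` structures): build one dictionary keyed by `(transform_id, timer_family_id)` holding everything needed to dispatch that timer.

```python
import collections

# Hypothetical container for "all info about that timer we need to dispatch".
TimerInfo = collections.namedtuple(
    'TimerInfo', ['timer_coder_id', 'output_stream'])


def collect_timer_info(transforms):
  """Walk the transforms once and index timer info by (transform, family).

  transforms: {transform_id: {timer_family_id: timer_coder_id}} (illustrative).
  """
  timer_info = {}
  for transform_id, families in transforms.items():
    for family_id, coder_id in families.items():
      timer_info[(transform_id, family_id)] = TimerInfo(
          timer_coder_id=coder_id, output_stream=None)
  return timer_info
```

Later lookups (coder, output stream, etc.) then become a single dictionary access instead of three separate loops.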
instruction_id, transform_id, timer_id)
timer_output_streams[transform_id] = output_streams
self.process_timer_ops[
transform_id].user_state_context.update_timer_output_streams(
Nit: rather than this double nesting, it might simplify things to have an `update_timer_output_streams(timer_id, output_stream)` method that could be called repeatedly.
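A minimal sketch of that flattened API, with hypothetical names (this is not the real `FnApiUserStateContext`): one call per timer family instead of passing a nested dict.

```python
class UserStateContext(object):
  """Illustrative stand-in for the SDK harness user-state context."""

  def __init__(self):
    self._timer_output_streams = {}

  def update_timer_output_stream(self, timer_family_id, output_stream):
    # Called repeatedly, once per (timer family, stream) pair.
    self._timer_output_streams[timer_family_id] = output_stream

  def get_timer_output_stream(self, timer_family_id):
    return self._timer_output_streams[timer_family_id]
```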
output_streams)

# Process timers
if self.timer_data_channel:
We can't safely assume the runner will finish sending all timers before sending any of the data (and the buffer may get full, resulting in a deadlock). I think we need to have a data_channel.inputs() that returns both data and timers and then branch in the loop.
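A rough sketch of the interleaving concern, using stand-in element classes (not the real `beam_fn_api_pb2.Elements.Data` / timer protos): a single `inputs()` stream yields data and timer elements in arbitrary order, and the consumer branches on element type instead of assuming all timers arrive before any data.

```python
import collections

# Hypothetical stand-ins for the proto element types.
DataElement = collections.namedtuple('DataElement', ['transform_id', 'data'])
TimerElement = collections.namedtuple(
    'TimerElement', ['transform_id', 'timer_family_id', 'timers'])


def consume_inputs(elements, on_data, on_timer):
  """Dispatch mixed data/timer elements as they arrive, in order."""
  for element in elements:
    if isinstance(element, TimerElement):
      on_timer(element)
    elif isinstance(element, DataElement):
      on_data(element)
```

Because neither kind of element blocks waiting for the other, a full buffer on one logical stream can't deadlock the bundle.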
I'm sorry, I am having a heavy headache. I'll bow out. @robertwb can you review fn_runner.py and siblings?
Couple of comments on the Java side. Still working on reviewing, but I need to step away for a little bit.
.../java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
checkArgument(
mainInput.getCoder() instanceof KvCoder,
"DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s",
mainInput.getCoder());
Just curious: did we not have this check before, and just failed when attempting to cast to `KvCoder` (in the removed block from `translate` above)?
It was being covered by validation in DoFnSignatures but it is being repeated here for defense in depth reasons.
idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
idGenerator,
sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
Isn't the timer API service descriptor different from the data API service descriptor? Does that need to be plumbed through SdkHarnessRegistry and used here instead of the data API descriptor? (same question below and in streaming worker)
They both use the Data API, so no. All we're saying here is that we will re-use the same gRPC channel for both timers and data.
I see. So we only have a separate timer_api_service_descriptor in the protos so that a runner has the option to make it separate, but it doesn't need to be separate?
That is correct.
}

private RegisterNodeFunction(
@Nullable RunnerApi.Pipeline pipeline,
IdGenerator idGenerator,
Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
Endpoints.ApiServiceDescriptor stateApiServiceDescriptor,
Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) {
timerApiServiceDescriptor isn't used? Should it be stored and written to the ProcessBundleDescriptor?
Java changes LGTM overall aside from the above comment. Another set of eyes (or at least another look from my own eyes when they're fresh) would be good though.
The test_pardo_timers_clear test fails with streaming Flink. The Python SDK sends all timers (hold_timestamp=-INF with Python default behavior) but only gets the timer with timestamp=20 back. Given the test only fails when streaming, it seems like something is not correct with the watermark(?). @lukecwik

def to_runner_api(self, context, **extra_kwargs):
# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
has_parts = extra_kwargs.get('has_part', False)
You can leave this in the parameter list.
# type: (PipelineContext) -> typing.Tuple[str, message.Message]
assert isinstance(self, ParDo), \
"expected instance of ParDo, but got %s" % self.__class__
key_coder, window_coder = self._get_key_and_window_coder(
Maybe put this in the if block below closer to where they're used?
def to_runner_api(self, context, **extra_kwargs):
# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
has_parts = extra_kwargs.get('has_part', False)
urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
Nevermind, I see what's going on here.
# type: (...) -> OutputTimer
assert self._timer_receivers is not None
return OutputTimer(key, window, self._timer_receivers[timer_spec.name])
output_stream = self._timer_output_streams[timer_spec.name]
If this were a single map rather than two parallel maps, you could write something like
output_stream, timer_coder_impl = self._timer_info[timer_spec.name]
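A minimal sketch of that single-map idea, with hypothetical names (`TimerInfoMap` is not a real Beam class): store the `(output_stream, coder_impl)` pair under one key so one lookup yields both pieces of dispatch info.

```python
class TimerInfoMap(object):
  """Illustrative replacement for two parallel dicts keyed by timer name."""

  def __init__(self):
    self._timer_info = {}

  def add(self, name, output_stream, timer_coder_impl):
    self._timer_info[name] = (output_stream, timer_coder_impl)

  def lookup(self, name):
    # A single access replaces two parallel-map lookups.
    output_stream, timer_coder_impl = self._timer_info[name]
    return output_stream, timer_coder_impl
```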
done_inputs.add((element.transform_id, element.timer_family_id))
else:
yield element
if isinstance(element, beam_fn_api_pb2.Elements.Data):
elif
stream_done = False
while not stream_done:
streams = None
if not stream_done:
This will always be true (given the loop condition).
timer_stream.append(stream)
if isinstance(stream, beam_fn_api_pb2.Elements.Data):
data_stream.append(stream)
if data_stream:
No need to have these conditionals; you can just write
yield beam_fn_api_pb2.Elements(data=data_stream, timer=timer_stream)
for stream in streams:
if isinstance(stream, beam_fn_api_pb2.Elements.Timer):
timer_stream.append(stream)
if isinstance(stream, beam_fn_api_pb2.Elements.Data):
else
@@ -92,7 +92,7 @@ cdef class DoOperation(Operation):
cdef DoFnRunner dofn_runner
cdef object tagged_receivers
cdef object side_input_maps
cdef object user_state_context
cpdef public object user_state_context
Rather than making this public, I would add an `add_timer_info` method to this operation.
All my comments were pretty minor, the logic looks good (for the Python worker/data channel changes).
Run Python PreCommit
Run Python2_PVR_Flink PreCommit
Address Brian's comments on apache#11314
OK, I've finished reviewing all the Python files.
@@ -1272,6 +1272,8 @@ def expand(self, pcoll):
key_coder = coder.key_coder()
else:
key_coder = coders.registry.get_coder(typehints.Any)
self.window_coder = pcoll.windowing.windowfn.get_window_coder()
Are these still used?
No. Will remove.
window_coder = input_pcoll.windowing.windowfn.get_window_coder()
return key_coder, window_coder

def to_runner_api(self, context, **extra_kwargs):
This code looks like it's copied from the superclass; instead just do

def to_runner_api(self, context, named_inputs, **extra_kwargs):
  return super(ParDo, self).to_runner_api(context, named_inputs=named_inputs, **extra_kwargs)
We can delete this override since we pass `extra_kwargs` from `PTransform` now.
data_channels = collections.defaultdict(
list
)  # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]]

# Inject data inputs from data plane.
This comment is a bit misleading, as the injection doesn't happen in this for loop. (Similarly with timers.)
Updated the comment.
stage.timer_pcollections.append(
(timer_read_pcoll + '/Read', timer_write_pcoll))
for timer_family_id in payload.timer_family_specs.keys():
stage.timers.add((transform.unique_name, timer_family_id))
Nice simplification here :).
expected_outputs  # type: DataOutput
):
expected_outputs,  # type: DataOutput
fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]
For consistency, should this be a `Mapping[Tuple[str, str], PartitionableBuffer]`?
I updated the `fired_timers` implementation but forgot to update the typing here. Thanks!
@@ -536,7 +525,8 @@ def _run_stage(self,
runner_execution_context,
bundle_context_manager,
data_input,
data_output,
data_output, {},
Put {} on its own line. (Surprised yapf didn't complain, or maybe you haven't run it yet.)
yapf helps me put the {} here.
@@ -896,7 +906,9 @@ def _generate_splits_for_testing(self,

def process_bundle(self,
inputs,  # type: Mapping[str, PartitionableBuffer]
expected_outputs  # type: DataOutput
expected_outputs,  # type: DataOutput
fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]
Mapping[Tuple[str, str], PartitionableBuffer]?

for transform_id, timer_family_id in (
set(expected_output_timers.keys()) - set(fired_timers.keys())):
# Close the stream if there is no timers to be sent.
This is a subtle point. I might write something like "The worker waits for a logical timer stream to be closed for every possible timer, regardless of whether there are any timers to be sent." Maybe it'd be clearer to iterate over `expected_output_timers`, and send `fired_timers.get((transform_id, timer_family_id), [])`.
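A minimal sketch of that suggested rewrite, under illustrative assumptions (`send_timers` is a hypothetical callback, not a real runner API): walk every expected timer stream and send whatever fired, possibly nothing, so each logical timer stream is always closed.

```python
def flush_timer_streams(expected_output_timers, fired_timers, send_timers):
  """Send fired timers for every expected (transform, family) pair.

  Pairs with no fired timers still get an empty send, which closes the
  logical timer stream the worker is waiting on.
  """
  for transform_id, timer_family_id in expected_output_timers:
    send_timers(
        transform_id, timer_family_id,
        fired_timers.get((transform_id, timer_family_id), []))
```

This avoids the set-difference bookkeeping entirely: one loop covers both the "fired" and "nothing to send" cases.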
if coder_id in self.execution_context.safe_coders:
return self.execution_context.pipeline_context.coders[
self.execution_context.safe_coders[coder_id]].get_impl()
else:
return self.execution_context.pipeline_context.coders[coder_id].get_impl()

def get_timer_coder_impl(self, transform_id, timer_family_id):
assert (transform_id, timer_family_id) in self._timer_coder_ids
The key error if it's not present below will be sufficient.
…ata channel in Beam Java and Python
This is a big change which also affects the runners. Would it have made sense to notify runner authors, especially since post-commit tests are broken? It took me a bit to figure out what caused the regression.
@mxm Which post-commits are you referring to? And can you please mark the JIRA(s) with fix version 2.21.0 so we can fix the regression in the release?
Thanks, Max! Sorry for the inconvenience. It seems like currently both Spark and Flink fail on the same test: org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerAlignBounded. The failure pattern is also the same: the pipeline only produces the output from the timer, not from the ProcessElement fn. I think there is something wrong in the Java runner shared library code. Have you worked on it? Or do you want me to follow up fixing this issue?
The problem is with the Timer implementation inside the FnApiDoFnRunner. The spec for Timer wasn't clear as to what the defaults were when withOutputTimestamp was added, and hence some critical logic was deleted during the migration. See #11402 for the fix.
I was actually working on something related to timers in #11362 and was surprised to see that the test failed when I opened the PR, since I had run the tests locally. Then I figured something must have changed on master in the meantime. Thanks for following up with this!
For commit: 3ff8c3e
r: @robertwb for data_plane.py and bundle_processor.py
r: @pabloem for fn_runner related part.
For commit: a2a7164
r: @TheNeuralBit
cc: @y1chi for reference
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Choose reviewer(s) and mention them in a comment (R: @username).
- Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.