[BEAM-8823] Make FnApiRunner work by executing ready elements instead of stages #15441
Conversation
Force-pushed from c5f534f to d1984dd
Codecov Report
@@ Coverage Diff @@
## master #15441 +/- ##
==========================================
- Coverage 83.79% 83.50% -0.30%
==========================================
Files 444 445 +1
Lines 60474 61413 +939
==========================================
+ Hits 50674 51281 +607
- Misses 9800 10132 +332
Continue to review full report at Codecov.
r: @y1chi (see first comment for a quick description of the change)
Thanks Pablo, I'll take a look and reach out if I have questions.
Overall LGTM.
self.watermark_manager = WatermarkManager(stages)
# from apache_beam.runners.portability.fn_api_runner import \
Should this be removed?
I'd like to keep this here as a hint to show that the pipeline can be visualized here.
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py (resolved)
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py (resolved)
_LOGGER.debug(
    'Enqueuing stage pending watermark. Stage name: %s', stage.name)
self.queues.watermark_pending_inputs.enque(
    ((stage.name, MAX_TIMESTAMP), DataInput(data_input, {})))
MAX_TIMESTAMP here seems weird to me, should it be MIN_TIMESTAMP if we expect the input to be ready (maybe for streaming)?
that's right - for BATCH, we need the upstream stage to be fully processed before moving forward. As I work on streaming, this will change to use a more appropriate timestamp.
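The batch gating described above can be sketched roughly like this. This is an illustrative stand-in, not the actual runner code: the `WatermarkPendingQueue` class, its `enque`/`ready` methods, and the integer `MAX_TIMESTAMP` are simplified substitutes for Beam's real queue and timestamp types.

```python
# Sketch: a stage's input is enqueued with MAX_TIMESTAMP, so it only becomes
# ready once the upstream stage's watermark has advanced all the way,
# i.e. once the upstream stage has fully finished (batch semantics).
MAX_TIMESTAMP = 2 ** 63 - 1  # stand-in for apache_beam's MAX_TIMESTAMP

class WatermarkPendingQueue:
    """Holds (stage, data) entries until the stage's watermark reaches
    the timestamp each entry was enqueued with."""
    def __init__(self):
        self._pending = []  # list of ((stage_name, timestamp), data)

    def enque(self, entry):
        self._pending.append(entry)

    def ready(self, watermarks):
        """Return data for entries whose stage watermark passed their timestamp."""
        ready, still_pending = [], []
        for (stage, ts), data in self._pending:
            if watermarks.get(stage, float('-inf')) >= ts:
                ready.append((stage, data))
            else:
                still_pending.append(((stage, ts), data))
        self._pending = still_pending
        return ready

queue = WatermarkPendingQueue()
queue.enque((('stage_2', MAX_TIMESTAMP), ['element-a', 'element-b']))
# Upstream not finished yet: nothing is ready to execute.
assert queue.ready({'stage_2': 100}) == []
# Upstream fully processed -> watermark at MAX_TIMESTAMP -> input is ready.
assert queue.ready({'stage_2': MAX_TIMESTAMP}) == [
    ('stage_2', ['element-a', 'element-b'])]
```

Under this scheme, switching to streaming only requires enqueuing with a smaller timestamp, so the input becomes ready as soon as the upstream watermark passes it rather than only at end-of-input.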
# Timer was cleared, so we must skip setting it below.
timer_cleared = True
continue
if timer_cleared or (transform_id,
what happens if the same timer was set multiple times with clear being called in between?
the SDK would only send back the latest of these events, regardless of what it is. Is that reasonable?
I'm not sure that's the case when it's the gRPC data channel: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L669. IIUC, every time a timer calls set, it will write an output timer to the output queue.
ah my bad - only the last one will be saved - see lines 547-548 of the file - we only store the latest event for a given timer+window+key
and if a timer is cleared for a certain key and window, we ignore all the other set timers for the timer family in the bundle - am I misunderstanding the condition here?
see lines 546-548: we decode all the timers that have been written, and we key them by (key, window) in a dictionary. Note that if there are multiple timers for the same (key, window), only the latest one will be saved in the timers_by_key_and_window dictionary.
Then, in the loop starting at line 551, we read the latest timer action for each (key, window), so we will only apply the latest action - whether it is a clear or not.
after the loop starting at line 551, timer_cleared is set to True as long as one (key, window) has a clear timer, and all other (key, window) timers are skipped and not appended to newly_set_timers, because timer_cleared is True and the continue jumps to the next iteration of the loop starting at line 537. Isn't it?
ah, great observation Yichi... I've changed this code - we keep timers per dynamic timer tag, and we clean up only previous sets but allow new sets to work, so we'll only use the LATEST action on the timer.
Pushing code in a bit.
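The latest-action-wins behavior being discussed can be sketched like this. It is an illustrative stand-in, not the actual bundle_processor code: the event tuples and the `timers_by_key_and_window` name merely mirror the discussion, and for simplicity it keys by (key, window) only, whereas the actual fix also distinguishes dynamic timer tags.

```python
# Fired timer events as received from the SDK, in arrival order.
# Each entry is (key, window, action, fire_timestamp).
received = [
    ('k1', 'w1', 'set', 10),
    ('k1', 'w1', 'clear', None),
    ('k2', 'w1', 'set', 30),
    ('k1', 'w1', 'set', 50),   # latest action for (k1, w1)
]

# Dedupe: later events overwrite earlier ones for the same (key, window),
# so only the latest action per (key, window) survives.
timers_by_key_and_window = {}
for key, window, action, ts in received:
    timers_by_key_and_window[(key, window)] = (action, ts)

# Apply the surviving action per (key, window). Crucially, a clear only
# removes its OWN (key, window) timer; it does not suppress the others.
newly_set_timers = []
for (key, window), (action, ts) in timers_by_key_and_window.items():
    if action == 'clear':
        continue
    newly_set_timers.append((key, window, ts))

assert sorted(newly_set_timers) == [('k1', 'w1', 50), ('k2', 'w1', 30)]
```

Note that the intermediate clear for (k1, w1) is harmlessly overwritten by the later set, while a trailing clear would have removed only that one timer - which is exactly the bug the global timer_cleared flag had introduced.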
buffer = runner_execution_context.pcoll_buffers.get(
    buffer_id, ListBuffer(None))

if buffer and buffer_id in buffers_to_clean:
  runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
  buffer = runner_execution_context.pcoll_buffers[buffer_id]
if buffer_id in runner_execution_context.pcoll_buffers:
  buffers_to_clean.add(buffer_id)
I found it a bit hard to follow the logic here. Are we just trying to pop the PCollection buffer with the buffer id?
I've added comments for the special cases. lmk if that makes sense.
oops looking at failures...
thanks @y1chi ! this is ready for another round of review : )
The comment addressing commit seems missing.
the commit that tries to address comments is this one: 4a4067b - it got buried a bit under the merge commit on top
runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
buffer = runner_execution_context.pcoll_buffers[buffer_id]
how is runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy() creating a copy for every stage? isn't it just overriding the original buffer with its own copy?
right - so the flow is like this:
1. Run the stage, and write its outputs to pcoll_buffers
2. For each stage output, do:
3. Get the data buffer from pcoll_buffers
4. Find its next downstream consumer and enqueue this buffer to be consumed
5. If there are more downstream consumers, make a copy of the buffer, add it to pcoll_buffers, and go back to point 3
6. If there are no more downstream consumers, continue to the next step

In the general case, a PCollection will have only one consumer, so the buffer will not need to be copied; but if there are multiple downstream consumers, we create copies starting from the second one, so that each buffer copy is pushed to exactly one consumer.
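The copy-per-consumer flow described above can be sketched as follows. Everything here is illustrative: `pcoll_buffers` and the consumer names stand in for the runner's real structures, and plain lists stand in for ListBuffer.

```python
# Output of the stage that just ran, keyed by PCollection buffer id.
pcoll_buffers = {'pcoll_1': ['e1', 'e2']}
# Downstream stages that all read pcoll_1.
consumers = ['stage_A', 'stage_B', 'stage_C']

deliveries = {}
buffer = pcoll_buffers['pcoll_1']
for i, consumer in enumerate(consumers):
    if i > 0:
        # From the second consumer on, write a fresh copy back into
        # pcoll_buffers, so each consumer gets its own independent buffer.
        pcoll_buffers['pcoll_1'] = list(buffer)
        buffer = pcoll_buffers['pcoll_1']
    deliveries[consumer] = buffer  # enqueue this buffer for the consumer

# Each consumer sees the same elements...
assert all(d == ['e1', 'e2'] for d in deliveries.values())
# ...but they are distinct objects, so one consumer draining or mutating
# its buffer cannot affect the others.
assert deliveries['stage_A'] is not deliveries['stage_B']
assert deliveries['stage_B'] is not deliveries['stage_C']
```

Writing the copy back into pcoll_buffers (rather than handing the copy straight to the consumer) is what keeps the map pointing at a buffer that has not yet been handed out, so the single-consumer common case never pays for a copy.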
ah ok, I guess what confused me is why we need to write the copy back to pcoll_buffers.
Run Python 3.8 PostCommit
addressed your timer comments here: #15441 (comment) - and with the latest commit
LGTM
…nner work by executing ready elements instead of stages" This reverts commit ef43645.
…k by executing ready elements instead of stages * [BEAM-9640] Sketching watermark tracking on FnApiRunner. * Addressing some comments * Fixups * fixing bug with truncation of restrictions * Fixing output watermark for stages * Moving visualization tools to different file * Individual stages are run bundle-based * [wip] Working on per-bundle execution * fixups * Fixing tests * cleanup * fixing lint, formatting, some typing info * More tests passing * fix import * fix test * Fix most other tests * all passin * testout * testing weird fix, hehe * fixup * Fix formatting * fix typing issues * fixup * fixup * cleanup * addressing comments * fix typoschmypo * proper timer handling
… FnApiRunner work by executing ready elements instead of stages"" This reverts commit a2f08e5.
…xecuting ready elements instead of stages * Revert "Revert "Merge pull request #15441 from [BEAM-8823] Make FnApiRunner work by executing ready elements instead of stages"" This reverts commit a2f08e5. * improving/fixing side input handling * fixup * fixup * fixup * fixup and new test * fixup * fixup * Addressing comments * Adding comments for clarity
This PR modifies the FnApiRunner to execute pipelines per-bundle instead of per-stage.
The previous implementation worked as follows:
The new implementation works as follows:
I'm happy to dive into detail for this PR.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Choose reviewer(s) and mention them in a comment (R: @username).
- Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
ValidatesRunner compliance status (on master branch)
Examples testing status on various runners
Post-Commit SDK/Transform Integration Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.