[BEAM-8581] and [BEAM-8582] watermark and trigger fixes #10035

Closed
wants to merge 4 commits into from

@rohdesamuel rohdesamuel commented Nov 8, 2019

The GeneralTriggerDriver does not put watermark holds on timers, which leads to the on-time empty pane being treated as late data.

The DefaultTrigger and AfterWatermark do not clear their timers after the watermark passes the end of the window, which leads to duplicate records being emitted.
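
To illustrate the second problem, here is a toy model of an end-of-window timer that is never cleared. It is only a sketch: `ToyWindowDriver`, `clear_timer_on_fire`, and `run` are hypothetical names made up for this example and are not part of Beam's `TriggerDriver` API. The real fix lives in `trigger.py`.

```python
class ToyWindowDriver:
    """Toy model: one window with one end-of-window timer (not Beam code)."""

    def __init__(self, window_end, clear_timer_on_fire):
        self.window_end = window_end
        self.clear_timer_on_fire = clear_timer_on_fire
        self.timer = window_end  # timestamp at which the timer is due, or None
        self.buffer = []
        self.emitted = []

    def add(self, value):
        self.buffer.append(value)

    def advance_watermark(self, watermark):
        # Fire whenever a timer is still set and the watermark has reached it.
        if self.timer is not None and watermark >= self.timer:
            self.emitted.append(list(self.buffer))
            if self.clear_timer_on_fire:
                self.timer = None  # the step the buggy path skips


def run(clear_timer_on_fire):
    driver = ToyWindowDriver(window_end=10,
                             clear_timer_on_fire=clear_timer_on_fire)
    driver.add('a')
    driver.advance_watermark(10)  # watermark reaches the end of the window
    driver.advance_watermark(20)  # watermark moves past the window
    return driver.emitted


without_clear = run(clear_timer_on_fire=False)  # the pane is emitted twice
with_clear = run(clear_timer_on_fire=True)      # the pane is emitted once
```

In this toy, leaving the timer set means every later watermark advance re-emits the same pane, which is the duplicate-record symptom described above.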

@rohdesamuel
Contributor Author

R: @robertwb
R: @dpmills
Can you review this please?

@rohdesamuel
Contributor Author

This is blocking #9953

sdks/python/apache_beam/transforms/trigger.py Outdated
sdks/python/apache_beam/transforms/trigger.py Outdated
sdks/python/apache_beam/testing/util.py Outdated
sdks/python/apache_beam/runners/direct/util.py Outdated
sdks/python/apache_beam/testing/test_stream_test.py Outdated
sdks/python/apache_beam/testing/test_stream_test.py Outdated
sdks/python/apache_beam/transforms/trigger.py Outdated
sdks/python/apache_beam/transforms/trigger.py Outdated
sdks/python/apache_beam/transforms/trigger.py Outdated
sdks/python/apache_beam/transforms/trigger.py Outdated
sdks/python/apache_beam/transforms/trigger_test.py Outdated
sdks/python/apache_beam/transforms/trigger.py

@rohdesamuel
Contributor Author

Run Portable_Python PreCommit

@rohdesamuel
Contributor Author

There hasn't been any review in the last 9 days, so I'm asking @pabloem to merge.

Contributor

@robertwb robertwb left a comment

I think this is OK. Can you squash this into one commit with the testing changes and another with the trigger fixes? After that, LGTM.

     pass
 
   def process_entire_key(
-      self, key, windowed_values, output_watermark=MIN_TIMESTAMP):
+      self, key, windowed_values, input_watermark=MIN_TIMESTAMP,
Contributor

Have we verified we found all callers of this code? Inserting a (default) argument before existing arguments could result in an off-by-one error for unmodified callers.

Also, the ordering should be consistent with process_elements().
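
A minimal sketch of the hazard, using simplified stand-in functions rather than the real `process_entire_key` (the `process_old`, `process_new`, and `process_safe` names and the integer `MIN_TIMESTAMP` are assumptions for illustration only):

```python
MIN_TIMESTAMP = -1  # stand-in sentinel, not Beam's real MIN_TIMESTAMP


def process_old(key, windowed_values, output_watermark=MIN_TIMESTAMP):
    return {'output_watermark': output_watermark}


def process_new(key, windowed_values, input_watermark=MIN_TIMESTAMP,
                output_watermark=MIN_TIMESTAMP):
    # New parameter inserted *before* the existing one.
    return {'input_watermark': input_watermark,
            'output_watermark': output_watermark}


def process_safe(key, windowed_values, output_watermark=MIN_TIMESTAMP, *,
                 input_watermark=MIN_TIMESTAMP):
    # New parameter added as keyword-only, so positional callers are unaffected.
    return {'input_watermark': input_watermark,
            'output_watermark': output_watermark}


# An unmodified caller that passes the output watermark positionally.
old_result = process_old('k', [], 100)
new_result = process_new('k', [], 100)    # 100 silently becomes input_watermark
safe_result = process_safe('k', [], 100)  # 100 is still output_watermark
```

Callers that pass the watermark by keyword are unaffected in either case; only positional callers shift by one.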

Contributor

Is MIN_TIMESTAMP the correct default?

Contributor Author

Yeah, the only other place where these are called directly is the Batch Dataflow Python Worker. So by setting these to MIN_TIMESTAMP we go back to the old implementation, which didn't affect Batch.

@@ -1036,14 +1074,17 @@ class BatchGlobalTriggerDriver(TriggerDriver):
         index=0,
         nonspeculative_index=0)
 
-  def process_elements(self, state, windowed_values, unused_output_watermark):
+  def process_elements(self, state, windowed_values,
+                       unused_output_watermark=MIN_TIMESTAMP,
Contributor

Let's not provide default values here, as it's unclear what they should be.

Contributor Author

Ack, I set these to None.

Contributor

I meant not to provide a default at all (None or otherwise).

Contributor Author

Ah gotcha. I removed the default value on the output_watermarks, but the input_watermark needs a default value to keep backwards compatibility.

@robertwb
Contributor

Run Python Precommit

@rohdesamuel
Contributor Author

Argh, sorry for the force pushes. Local repo got into a weird state.

Contributor

@robertwb robertwb left a comment

As requested, can you create one commit with the testing changes and another with the trigger fixes?

@rohdesamuel
Contributor Author

Ah gotcha, misread the comment. I just fixed up the commits as asked.

@rohdesamuel
Contributor Author

Run Portable_Python PreCommit

@rohdesamuel
Contributor Author

Run Python PreCommit

Contributor

@robertwb robertwb left a comment

I was going to merge this but noted some unrelated changes.

sdks/python/apache_beam/transforms/trigger_test.py Outdated

@rohdesamuel
Contributor Author

Run Python PreCommit

@HuangLED
Contributor

HuangLED commented Dec 5, 2019

friendly ping for merging this PR.

Contributor

@robertwb robertwb left a comment

Looks like there were some merge issues or something. I'm going to go through this again, and fix up the history and merge it if I can.

@@ -608,7 +606,7 @@ def start_bundle(self):
 
   def process_timer(self, timer_firing):
     if timer_firing.name not in self.user_timer_map:
-      _LOGGER.warning('Unknown timer fired: %s', timer_firing)
+      logging.warning('Unknown timer fired: %s', timer_firing)
Contributor

Looks like this was reverted again.

-    return 'TimerFiring(%r, %r, %s, %s)' % (self.encoded_key,
-                                            self.name, self.time_domain,
-                                            self.timestamp)
+    return 'TimerFiring({}, {}, {}, {})'.format(self.encoded_key,
Contributor

The use of repr for encoded keys was intentional here.
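
For context, a small sketch of how `%r` differs from a bare `{}` placeholder. The key here is a plain `str` stand-in chosen for illustration, not an actual encoded key from Beam:

```python
key = 'user 1\n'  # hypothetical stand-in for an encoded key

# %r formats with repr(): quotes and escapes stay visible.
with_repr = 'TimerFiring(%r)' % (key,)

# A bare {} formats with str(): the raw newline ends up in the message.
with_str = 'TimerFiring({})'.format(key)

# {!r} keeps the repr behaviour when using str.format.
with_conv = 'TimerFiring({!r})'.format(key)
```

If the move to `str.format` were wanted for other reasons, `{!r}` would be one way to keep the original output.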

@robertwb
Contributor

robertwb commented Dec 6, 2019

I fixed the merge conflicts and history and reverted unintentional changes on the other PR, which has been merged.

@robertwb robertwb closed this Dec 6, 2019