[BEAM-2535] Support outputTimestamp and watermark holds in processing timers. #10627
Conversation
|
R: @reuvenlax |
|
Pinging to trigger tests. |
|
Run Java PreCommit |
|
Run JavaPortabilityApi PreCommit |
|
@reuvenlax I have completed the outputTimestamp functionality. I need your input on the failed test cases; I think the failures are expected with the addition of this feature. |
|
retest this please |
|
Run Direct ValidatesRunner |
|
@reuvenlax I really need your input on the failed test cases, as the functionality is complete. |
|
Run Java PreCommit |
| public Timer withOutputTimestampOffset(Duration outputTimestampOffset) { | ||
| this.outputTimestampOffset = outputTimestampOffset; | ||
| return this; | ||
| } |
I'm not sure that we need withOutputTimestampOffset - I think withOutputTimestamp is sufficient.
@reuvenlax done
| : target.minus(offset.minus(outputTimestampOffset)); | ||
| } | ||
|
|
||
| if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
I think that we should verify that the output timestamp is > the timestamp of the input message (if in processElement) or the output timestamp of the firing timer (if in processTimer). The < check remains correct - even for processing-time timers.
@reuvenlax, does that mean we need to compare the output timestamp with timerInternal.currentInputWatermarkTime()? I think that would fail our earlier test case for output timestamps with event-time timers, because it sets the output timestamp to 5 while the input element's timestamp is 9.
@reuvenlax done
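The bounds discussed in this thread can be sketched as a small standalone check. This is a minimal illustration, not the code in the PR: the class and method names are hypothetical, `java.time.Instant` stands in for the Joda-Time `Instant` that Beam uses, and the upper bound is assumed to be the window expiry.

```java
import java.time.Instant;

// Illustrative helper only. The names are hypothetical, and java.time.Instant
// stands in for the Joda-Time Instant used in the PR.
final class OutputTimestampBounds {
  private OutputTimestampBounds() {}

  /**
   * Rejects an output timestamp that is earlier than the timestamp of the input
   * element (or of the firing timer), or later than the window expiry.
   * The window-expiry upper bound is an assumption made for this sketch.
   */
  static void check(Instant outputTimestamp, Instant inputTimestamp, Instant windowExpiry) {
    if (outputTimestamp.isBefore(inputTimestamp)) {
      throw new IllegalArgumentException(
          String.format(
              "output timestamp %s is before input timestamp %s",
              outputTimestamp, inputTimestamp));
    }
    if (outputTimestamp.isAfter(windowExpiry)) {
      throw new IllegalArgumentException(
          String.format(
              "output timestamp %s is after window expiry %s", outputTimestamp, windowExpiry));
    }
  }
}
```

With an input timestamp of 9, an output timestamp of 5 is rejected by the lower bound, which is the situation the earlier test case would run into.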
| if (outputTimestamp == null) { | ||
| if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { | ||
| outputTimestamp = target; | ||
| } |
I think that if the timer is processing time, then the default outputTimestamp should be that of the input element (or the output time of the firing timer if in processTimer).
@reuvenlax done
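The defaulting rule requested here can be sketched in isolation. This is a minimal sketch under stated assumptions, not the PR's implementation: `SketchTimeDomain`, `OutputTimestampDefaults`, and `resolve` are hypothetical names, and `java.time.Instant` replaces the Joda-Time `Instant`.

```java
import java.time.Instant;

// Hypothetical stand-in for Beam's TimeDomain, reduced to the two cases discussed.
enum SketchTimeDomain {
  EVENT_TIME,
  PROCESSING_TIME
}

// Illustrative helper only; not part of Beam.
final class OutputTimestampDefaults {
  private OutputTimestampDefaults() {}

  /**
   * Returns the explicit output timestamp if one was set. Otherwise an event-time
   * timer defaults to its firing target, and a processing-time timer defaults to
   * the timestamp of the input element (or the output timestamp of the firing timer).
   */
  static Instant resolve(
      Instant explicitOutputTimestamp,
      SketchTimeDomain domain,
      Instant target,
      Instant inputTimestamp) {
    if (explicitOutputTimestamp != null) {
      return explicitOutputTimestamp;
    }
    return domain == SketchTimeDomain.EVENT_TIME ? target : inputTimestamp;
  }
}
```

Falling back to the input timestamp for processing-time timers keeps the watermark hold in the event-time domain, since a processing-time target is not comparable to the watermark.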
| TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); | ||
| return new TimerInternalsTimer( | ||
| window, getNamespace(), timerId, spec, stepContext.timerInternals()); | ||
| window, getNamespace(), timerId, spec, fireTimestamp(), stepContext.timerInternals()); |
I think this should be timestamp(), not fireTimestamp().
@reuvenlax done
| window(), | ||
| getNamespace(), | ||
| spec, | ||
| fireTimestamp(), |
ditto - this should be timestamp() not fireTimestamp()
done
| outputTimestamp, | ||
| elementInputTimestamp); | ||
|
|
||
| if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
check outputTimestamp is in the window for processing-timer as well.
@reuvenlax done
| new DoFn<KV<String, Integer>, Integer>() { | ||
|
|
||
| @TimerId(timerId) | ||
| private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); |
I think you want the second DoFn to set an event-time timer. That way you can test that the watermark hold in the first DoFn prevents firing of the second DoFn's timer, which it will only do if the second DoFn's timer is event time.
@reuvenlax updated
runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
| Instant earliest = THE_END_OF_TIME.get(); | ||
| for (NavigableSet<TimerData> timers : processingTimers.values()) { | ||
| if (!timers.isEmpty()) { | ||
| earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); |
I wonder if this is sufficient. The NavigableSet is ordered by timer firing time, not by outputTimestamp. This means that the logic to just look at the first timer (which was correct before we had holds) is probably no longer enough. We probably need to iterate over the set (or have a second data structure that stores the watermark holds, and take the minimum of that data structure).
@reuvenlax added a new function to calculate the earliest outputTimestamp from NavigableSet.
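The concern about ordering can be illustrated with a standalone sketch. It is not the code added to WatermarkManager: `SketchTimer` and `EarliestHoldSketch` are hypothetical names, `java.time.Instant` stands in for the Joda-Time `Instant`, and `Instant.MAX` plays the role of `THE_END_OF_TIME`.

```java
import java.time.Instant;
import java.util.Collection;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for TimerData: a firing time plus an output timestamp.
final class SketchTimer {
  final Instant fireTime;
  final Instant outputTimestamp;

  SketchTimer(Instant fireTime, Instant outputTimestamp) {
    this.fireTime = fireTime;
    this.outputTimestamp = outputTimestamp;
  }
}

// Illustrative helper only; not part of Beam.
final class EarliestHoldSketch {
  private EarliestHoldSketch() {}

  /** Creates a set ordered by firing time, with the output timestamp as a tie-breaker. */
  static NavigableSet<SketchTimer> newTimerSet() {
    return new TreeSet<>(
        (a, b) -> {
          int byFireTime = a.fireTime.compareTo(b.fireTime);
          return byFireTime != 0 ? byFireTime : a.outputTimestamp.compareTo(b.outputTimestamp);
        });
  }

  /**
   * Scans every timer, because the set is ordered by firing time and the first
   * element does not necessarily carry the earliest output timestamp.
   */
  static Instant earliestOutputTimestamp(Collection<NavigableSet<SketchTimer>> timersPerKey) {
    Instant earliest = Instant.MAX;
    for (NavigableSet<SketchTimer> timers : timersPerKey) {
      for (SketchTimer timer : timers) {
        if (timer.outputTimestamp.isBefore(earliest)) {
          earliest = timer.outputTimestamp;
        }
      }
    }
    return earliest;
  }
}
```

For example, with one timer firing at 10 that holds at 8 and another firing at 20 that holds at 3, `first()` yields the hold at 8 while the full scan yields 3. The scan is linear in the number of timers; the second data structure suggested above would trade memory for a cheaper minimum.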
…, firing time fix, test case updates
|
retest this please |
|
Run Direct ValidatesRunner |
| target, | ||
| windowExpiry); | ||
| // For processing timers | ||
| if (outputTimestamp == null) { |
are we missing an else clause here?
@reuvenlax we only set outputTimestamp here. If it has already been set, nothing more needs to be done, so I don't think an else clause is required.
| private final TimerSpec spec; | ||
| private Instant target; | ||
| private Instant outputTimestamp; | ||
| private Instant elementInputTimestamp; |
final
@reuvenlax done
| !outputTimestamp.isBefore(elementInputTimestamp), | ||
| "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s", | ||
| outputTimestamp, | ||
| elementInputTimestamp); |
check this before setting outputTimestamp above (and only if outputTimestamp != null)
@reuvenlax done
|
retest this please |
|
Run Java PreCommit |
|
run dataflow validatesrunner |
|
run flink validatesrunner |
| // DoFn timer's watermark hold. This timer should not fire until the previous timer | ||
| // fires and removes | ||
| // the watermark hold. | ||
| timer.offset(Duration.standardSeconds(8)).setRelative(); |
just use timer.set, not timer.offset
@reuvenlax done
| .addElements(KV.of("key", 1)) | ||
| // Normally this would cause fn2's timer to expire, but it shouldn't here because of | ||
| // the output timestamp. | ||
| .advanceProcessingTime(Duration.standardSeconds(9)) |
I think you might have to advance processing time to at least 11.
You also need to advance the watermark to at least 10 to allow the timer to fire. Right now this isn't testing anything, because the input watermark is preventing the timer from firing.
@reuvenlax, I think that if we advance processing time to 11, it will fire fn1's timer, because we set its delivery time offset to 10. Would advancing processing time by 9 and the watermark by 11 test the functionality?
yes, but you need to advance the watermark before calling addElements.
@reuvenlax done
| */ | ||
| public synchronized Instant getEarliestTimerTimestamp() { | ||
| Instant earliest = THE_END_OF_TIME.get(); | ||
| for (NavigableSet<TimerData> timers : processingTimers.values()) { |
What about the other getEarliestTimerTimestamp function? Does that need to be updated?
@reuvenlax done
|
Pinging to trigger tests. |
|
Run Python2_PVR_Flink PreCommit |