New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2535] Added control to set output timestamp independent of firing time for event time timers. (Direct Runner implementation) #4700
Conversation
@kennknowles Here is my initial try of implementing withOutputTimestamp() in DirectRunner. I did not put that much effort into fixing nits and implementing tests yet. If this PR looks good in general, I will go ahead and add tests and polishing. Let me know your initial review, until then I will be working on other tasks. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks overall good! I think @tgroh would be a good source for also looking over direct runner changes.
@@ -77,7 +77,8 @@ public void finishBundle() { | |||
|
|||
@Override | |||
public void onTimer( | |||
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { | |||
String timerId, BoundedWindow window, Instant timestamp, Instant outputTimestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also change timestamp
to deliveryTimestamp
in all these methods, to make the difference very clear.
void onTimer(String timerId, BoundedWindow window, Instant timestamp, | ||
TimeDomain timeDomain); | ||
void onTimer( | ||
String timerId, BoundedWindow window, Instant timestamp, Instant outputTimestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: google-java-format
@@ -698,23 +701,23 @@ public TimerInternalsTimer( | |||
|
|||
@Override | |||
public void set(Instant target) { | |||
this.target = target; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice that the IllegalArgumentException
is thrown before the wrong value is set. Here it is not likely to cause a problem, but it is a good practice. You never know how this class might be used in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would be a considered as illegal deliveryTimestamp?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as would be thrown in setAndVerify
- and as an aside, I'd much prefer the timestamp be returned by verify and the fields be final if that's possible (I think it is)
* generated. | ||
* Construct a {@link TimerData} for the given parameters. | ||
*/ | ||
public static TimerData of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have two Instant
value parameters, it is a good time to switch this to use @AutoValue.Builder
.
@@ -225,6 +258,30 @@ public int compareTo(TimerData that) { | |||
} | |||
} | |||
|
|||
/** | |||
* Used for sorting {@link TimerData} by output timestamp of the timer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc that ordering other than the output timestamp is stable but unspecified.
@@ -400,6 +418,8 @@ public Instant get() { | |||
* <ul> | |||
* <li>the current input watermark</li> | |||
* <li>the current watermark holds</li> | |||
* <li>the earliest delivery timestamp of pending timers</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe now that you have output timestamp, you don't need to take the minimum of the delivery timestamp. That is how you can use this for GC / OnWindowExpiration.
private final SortedMultiset<TimerData> pendingTimersSortedByTs; | ||
|
||
// Keeps track of pending timers sorted by output timestamp. | ||
private final SortedMultiset<TimerData> pendingTimersSortedByOutputTs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And for processing time timers, we just let them expire, because we don't want to have a pipeline just stall, right?
Definitely put in some |
private void setAndVerifyOutputTimestamp() { | ||
// Output timestamp is currently not supported in processing time timers. | ||
if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { | ||
throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"...domains other than %s: got %s", TimeDomain.EVENT_TIME, spec.getTimeDomain()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
outputTimestamp = target; | ||
} | ||
|
||
System.out.println(outputTimestamp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops!
System.out.println(outputTimestamp); | ||
if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { | ||
Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); | ||
checkArgument(!outputTimestamp.isAfter(windowExpiry), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be an even stricter check (probably should be even) - we should enforce that the outputTimestamp is no later than the maxTimestamp
of the window, if I understand windows properly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've got a lot of printlns and commented out code - let me know when this is ready for review (at least, having cleaned those up)
@@ -42,7 +42,9 @@ | |||
* Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer | |||
* in the given window. | |||
*/ | |||
void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain); | |||
void onTimer( | |||
String timerId, BoundedWindow window, Instant timestamp, Instant outputTimestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting, here and other changed lines?
|
||
public StatefulParDoEvaluator( | ||
DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) { | ||
DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator, | ||
DirectStepContext stepContext, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's pretty important to document that these are owned by the StatefulParDoEvaluator
- otherwise the mutations aren't ok.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This PR adds ability to control the output timestamp of event timers. This feature only works for the direct runner at the moment. Before this PR, the output timestamp of event timers are set to timer's delivery timestamp.
Example:
When an user sets timer with output timestamp 3 and delivery timestamp 5:
@ProcessElement()
public void processElement(ProcessContext c, @TimerId("foo") Timer timer) {
timer.withOutputTimestamp(new Instant(3)).set(new Instant(5));
}
@ontimer("foo")
public void myTimerCallback(OnTimerContext c, BoundedWindow w) {
c.output("some_value"); // outputs with timestamp 3.
}
The timer will deliver at 5 as usual. However, output() method of the OnTimerContext in myTimerCallback function uses 3 as the output timestamp. To make sure the element is not late for further transforms down the pipeline, output watermark will be held at the output timestamp of the timer (3 in the example) until myTimerCallback executes.
This WM hold feature is also useful for outputting values saved in state cells right before garbage collection. (BEAM-1589) GC timer fires after the window expiration, therefore, outputting elements with output timestamp past the window expiration results them to be dropped further down the pipeline. By setting onWindowExpiration timer as:
timer.withOutputTimestamp(windowExpiration).set(GCtime), we could output these elements inside the window.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue.mvn clean verify
to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.