New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform #151
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @taegeonum. I've left comments.
@@ -122,8 +130,6 @@ public void onData(final WindowedValue<KV<K, InputT>> element) { | |||
private void processElementsAndTriggerTimers(final Watermark inputWatermark, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handleWatermark()?
since we do all of the following three
(1) processElements
(2) TriggerTimers
(3) emits a watermark (if necessary) to downstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will separate the emitOuptutWatermark and keep this method name.
* Output watermark | ||
* = max(prev output watermark, | ||
* min(input watermark, watermark holds)). | ||
* @param inputWatermark input watermark |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a brief note on the guarantees about the input watermark?
Something like:
{@ link InputWatermarkManager} guarantees the input watermark here to be the minimum value of the watermarks we've seen from all of the input streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already written in Transform.onWatermark
:)
} | ||
|
||
// Find min watermark hold | ||
final Watermark watermarkHold = Collections.min(keyAndWatermarkHoldMap.values()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minOfWatermarkHolds
+ "inputWatermark: {}, outputWatermark: {}", watermarkHold, inputWatermark, prevOutputWatermark); | ||
} | ||
|
||
if (watermarkHold.getTimestamp() > inputWatermark.getTimestamp()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Math.max and Math.min?
newOutputWatermark = Math.max(prevOUtputWatermark (Math.min (inputWatermark, minOfWatermarkHolds))
if (newOutputWatermark > prevOutputWatermark) {
// progress
if (minOfWatermarkHolds <= inputWatermark ) {
// Remove minimum watermark holds
}
}
* @param inputWatermark input watermark | ||
*/ | ||
private void emitOutputWatermark(final Watermark inputWatermark) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For completeness, can you check all of the three watermarks that make up the output watermark?
// prev output watermark - No need to check (?)
// inputWatermark - check for Long.MIN_VALUE
// keyAndWatermarkHoldMap - check emptiness
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we check Long.MIN_VALUE for inputWatermark?
// adds the output timestamp to the watermark hold of each key | ||
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 | ||
keyAndWatermarkHoldMap.put(output.getValue().getKey(), | ||
new Watermark(output.getTimestamp().getMillis() + 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reference to this? (e.g., another Beam runner doing also the same)
In the case of following sequence of elements/watermark with (timestamp):
e1(1), e2(2), w1(3), e3(3)
With the +1 we see it as
e1(2=1+1), e2(3=2+1), w1(3), e3(4=3+1)
Would that result in an incorrect reordering of e2 and w1 (in the time-scale)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The output here is windowed values, not all elements. Windowed value has its own timestamp value. This is different from the element timestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a TODO here to check for different triggers?
https://issues.apache.org/jira/browse/NEMO-270
In the following case
- W-4 / E-3 / E-2 / E-1 / E-1 --> GBKW
- All elements are in the window of [1~4)
(1) If triggered at watermark W-4 this works perfectly
(2) If triggered early at E-1, then the downstream operators will consider the following E-1 late.
NEMO-270 can check for the correctness of (2)
@Override | ||
public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) { | ||
// adds the output timestamp to the watermark hold of each key | ||
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For an element to belong to a window [0~5000) then its timestamp must be one of 0, 1, ... , 4999.
If you do +1 to an element with timestamp 4999, would that make the element belong to a different window [5000, 10000)?
Please let me know if I'm missing something here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Window [0-5000) is triggered at watermark 5000. This means that if we receive [0~5000) window as an output, the watermark is >= 5000. But the window output timestamp is 4999, so I added + 1 here
|
||
assertEquals(expectedValue, resultValue); | ||
} | ||
|
||
|
||
// [---- window1 --------] [--------------- window2 ---------------] | ||
// ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep the existing test (fixed window), and introduce the new test (sliding window) as a separate one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed window is a special case of sliding window, so I just added the sliding window test to test GBKW.
List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1)); | ||
Collections.sort(sortedWindows, new Comparator<IntervalWindow>() { | ||
@Override | ||
public int compare(IntervalWindow o1, IntervalWindow o2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the existing IntervalWindow#compareTo?
// windowed result for key 1 | ||
assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows()); | ||
checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue()); | ||
assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also check for the PaneInfo
of the output elements?
(EARLY
, ON_TIME
, and LATE
).
It'd be definitely nice to experiment with different triggers in the WindowingStrategy
to ensure correct behaviors of the watermark holds, which I suppose allows the handling of EARLY
and LATE
elements, in addition to the timers mostly handling ON_TIME
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add TODO for the test
@johnyangk Thanks for the review! I've addressed your comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@taegeonum I've left one minor comment. Thanks!
// adds the output timestamp to the watermark hold of each key | ||
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 | ||
keyAndWatermarkHoldMap.put(output.getValue().getKey(), | ||
new Watermark(output.getTimestamp().getMillis() + 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a TODO here to check for different triggers?
https://issues.apache.org/jira/browse/NEMO-270
In the following case
- W-4 / E-3 / E-2 / E-1 / E-1 --> GBKW
- All elements are in the window of [1~4)
(1) If triggered at watermark W-4 this works perfectly
(2) If triggered early at E-1, then the downstream operators will consider the following E-1 late.
NEMO-270 can check for the correctness of (2)
JIRA: NEMO-267: Consider watermark holds in GroupByKeyAndWindowDoFnTransform
Major changes:
GroupByKeyAndWindowDoFnTransform
to emit output watermarks by holding watermarks for each keyTests for the changes:
GroupByKeyAndWindowDoFnTransform
by using sliding windows