Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform #151

Merged
merged 6 commits into from Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,7 +24,7 @@
/**
* Watermark event.
*/
public final class Watermark implements Serializable {
public final class Watermark implements Serializable, Comparable<Watermark> {

private final long timestamp;
public Watermark(final long timestamp) {
Expand All @@ -47,8 +47,18 @@ public boolean equals(final Object o) {
return timestamp == watermark.timestamp;
}

@Override
public String toString() {
return String.valueOf(timestamp);
}

@Override
public int hashCode() {
return Objects.hash(timestamp);
}

@Override
public int compareTo(final Watermark o) {
return Long.compare(timestamp, o.getTimestamp());
}
}
Expand Up @@ -159,7 +159,7 @@ protected final void checkAndFinishBundle() {
public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
// deserialize pipeline option
final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
this.outputCollector = oc;
this.outputCollector = wrapOutputCollector(oc);

this.bundleMillis = options.getMaxBundleTimeMills();
this.bundleSize = options.getMaxBundleSize();
Expand Down Expand Up @@ -227,6 +227,13 @@ public final void close() {
*/
abstract DoFn wrapDoFn(final DoFn originalDoFn);

/**
* An abstract function that wraps the original output collector.
* @param oc the original outputCollector.
* @return wrapped output collector.
*/
abstract OutputCollector wrapOutputCollector(final OutputCollector oc);

@Override
public abstract void onData(final WindowedValue<InputT> data);

Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;

import java.util.Collection;
Expand Down Expand Up @@ -83,6 +84,11 @@ protected void beforeClose() {
// nothing
}

@Override
OutputCollector wrapOutputCollector(final OutputCollector oc) {
return oc;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -48,7 +49,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private final Map<K, List<WindowedValue<InputT>>> keyToValues;
private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
private long currentOutputWatermark;
private Watermark prevOutputWatermark;
private final Map<K, Watermark> keyAndWatermarkHoldMap;

/**
* GroupByKey constructor.
Expand All @@ -70,7 +72,8 @@ public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputC
options);
this.keyToValues = new HashMap<>();
this.reduceFn = reduceFn;
this.currentOutputWatermark = Long.MIN_VALUE;
this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
this.keyAndWatermarkHoldMap = new HashMap<>();
}

/**
Expand All @@ -96,6 +99,11 @@ protected DoFn wrapDoFn(final DoFn doFn) {
getMainOutputTag());
}

@Override
OutputCollector wrapOutputCollector(final OutputCollector oc) {
return new GBKWOutputCollector(oc);
}

/**
* It collects data for each key.
* The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
Expand Down Expand Up @@ -126,8 +134,6 @@ public void onData(final WindowedValue<KV<K, InputT>> element) {
private void processElementsAndTriggerTimers(final Watermark inputWatermark,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleWatermark()?

since we do all of the following three
(1) processElements
(2) TriggerTimers
(3) emits a watermark (if necessary) to downstream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will separate the emitOuptutWatermark and keep this method name.

final Instant processingTime,
final Instant synchronizedTime) {
long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;

for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
final K key = entry.getKey();
final List<WindowedValue<InputT>> values = entry.getValue();
Expand All @@ -143,27 +149,56 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark,
}

// Trigger timers
final long minOutputTimestamp =
triggerTimers(key, inputWatermark, processingTime, synchronizedTime);

minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
triggerTimers(key, inputWatermark, processingTime, synchronizedTime);

// Remove values
values.clear();
}
}

// Emit watermark to downstream operators
if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
&& currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
/**
* Output watermark
* = max(prev output watermark,
* min(input watermark, watermark holds)).
* @param inputWatermark input watermark
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a brief note on the guarantees about the input watermark?

Something like:
{@ link InputWatermarkManager} guarantees the input watermark here to be the minimum value of the watermarks we've seen from all of the input streams.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already written in Transform.onWatermark :)

*/
private void emitOutputWatermark(final Watermark inputWatermark) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness, can you check all of the three watermarks that make up the output watermark?

// prev output watermark - No need to check (?)
// inputWatermark - check for Long.MIN_VALUE
// keyAndWatermarkHoldMap - check emptiness

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check Long.MIN_VALUE for inputWatermark?

if (keyAndWatermarkHoldMap.isEmpty()) {
return;
}

// Find min watermark hold
final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values());

if (LOG.isDebugEnabled()) {
LOG.debug("Watermark hold: {}, "
+ "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
}

final Watermark outputWatermarkCandidate = new Watermark(
Math.max(prevOutputWatermark.getTimestamp(),
Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));

if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
// progress!
prevOutputWatermark = outputWatermarkCandidate;
// emit watermark
getOutputCollector().emitWatermark(outputWatermarkCandidate);
// Remove minimum watermark holds
if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
keyAndWatermarkHoldMap.entrySet()
.removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
}
}
}

@Override
public void onWatermark(final Watermark inputWatermark) {
checkAndInvokeBundle();
processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
// Emit watermark to downstream operators
emitOutputWatermark(inputWatermark);
checkAndFinishBundle();
}

Expand All @@ -176,6 +211,8 @@ protected void beforeClose() {
// Finish any pending windows by advancing the input watermark to infinity.
processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
// Emit watermark to downstream operators
emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
}

/**
Expand All @@ -185,10 +222,8 @@ protected void beforeClose() {
* @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
* @return the minimum output timestamp.
* If no data is emitted, it returns Long.MAX_VALUE.
*/
private long triggerTimers(final K key,
private void triggerTimers(final K key,
final Watermark watermark,
final Instant processingTime,
final Instant synchronizedTime) {
Expand All @@ -204,10 +239,7 @@ private long triggerTimers(final K key,

final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);

if (timerDataList.isEmpty()) {
return Long.MAX_VALUE;
} else {

if (!timerDataList.isEmpty()) {
// Trigger timers and emit windowed data
final KeyedWorkItem<K, InputT> timerWorkItem =
KeyedWorkItems.timersWorkItem(key, timerDataList);
Expand All @@ -223,8 +255,6 @@ private long triggerTimers(final K key,
}

timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));

return keyOutputTimestamp;
}
}

Expand Down Expand Up @@ -320,4 +350,33 @@ public TimerInternals timerInternalsForKey(final K key) {
return stateAndTimerForKey.timerInternals;
}
}

/**
* This class wraps the output collector to track the watermark hold of each key.
*/
final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;
GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
this.outputCollector = outputCollector;
}

@Override
public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
// adds the output timestamp to the watermark hold of each key
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For an element to belong to a window [0~5000) then its timestamp must be one of 0, 1, ... , 4999.
If you do +1 to an element with timestamp 4999, would that make the element belong to a different window [5000, 10000)?
Please let me know if I'm missing something here.

Copy link
Contributor Author

@taegeonum taegeonum Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Window [0-5000) is triggered at watermark 5000. This means that if we receive [0~5000) window as an output, the watermark is >= 5000. But the window output timestamp is 4999, so I added + 1 here

// TODO #270: consider early firing
// TODO #270: This logic may not be applied to early firing outputs
keyAndWatermarkHoldMap.put(output.getValue().getKey(),
new Watermark(output.getTimestamp().getMillis() + 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reference to this? (e.g., another Beam runner doing also the same)

In the case of following sequence of elements/watermark with (timestamp):

e1(1), e2(2), w1(3), e3(3)

With the +1 we see it as

e1(2=1+1), e2(3=2+1), w1(3), e3(4=3+1)

Would that result in an incorrect reordering of e2 and w1 (in the time-scale)?

Copy link
Contributor Author

@taegeonum taegeonum Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output here is windowed values, not all elements. Windowed value has its own timestamp value. This is different from the element timestamp.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a TODO here to check for different triggers?
https://issues.apache.org/jira/browse/NEMO-270

In the following case

  • W-4 / E-3 / E-2 / E-1 / E-1 --> GBKW
  • All elements are in the window of [1~4)

(1) If triggered at watermark W-4 this works perfectly
(2) If triggered early at E-1, then the downstream operators will consider the following E-1 late.

NEMO-270 can check for the correctness of (2)

outputCollector.emit(output);
}
@Override
public void emitWatermark(final Watermark watermark) {
outputCollector.emitWatermark(watermark);
}
@Override
public <T> void emit(final String dstVertexId, final T output) {
outputCollector.emit(dstVertexId, output);
}
}
}