Skip to content

Commit

Permalink
Merge pull request #17262: [BEAM-14244] Use the supplied output timestamp for processing time timers rather than the input watermark
Browse files Browse the repository at this point in the history
  • Loading branch information
steveniemitz committed Apr 7, 2022
1 parent a239114 commit e596abf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
Expand Down Expand Up @@ -197,28 +198,10 @@ public <KeyT> void onTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {

// The effective timestamp is when derived elements will have their timestamp set, if not
// otherwise specified. If this is an event time timer, then they have the timer's output
// timestamp. Otherwise, they are set to the input timestamp, which is by definition
// non-late.
Instant effectiveTimestamp;
switch (timeDomain) {
case EVENT_TIME:
effectiveTimestamp = outputTimestamp;
break;
case PROCESSING_TIME:
case SYNCHRONIZED_PROCESSING_TIME:
effectiveTimestamp = stepContext.timerInternals().currentInputWatermarkTime();
break;

default:
throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
}
Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");

OnTimerArgumentProvider<KeyT> argumentProvider =
new OnTimerArgumentProvider<>(
timerId, key, window, timestamp, effectiveTimestamp, timeDomain);
new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain);
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
}

Expand Down Expand Up @@ -1217,7 +1200,7 @@ public Timer align(Duration period) {
}

/**
* For event time timers the target time should be prior to window GC time. So it return
* For event time timers the target time should be prior to window GC time. So it returns
* min(time to set, GC Time of window).
*/
private Instant minTargetAndGcTime(Instant target) {
Expand All @@ -1237,13 +1220,12 @@ public Timer withOutputTimestamp(Instant outputTimestamp) {
}

/**
*
*
* <ul>
* Ensures that:
* <li>Users can't set {@code outputTimestamp} for processing time timers.
* <li>Event time timers' {@code outputTimestamp} is set before window expiration.
* </ul>
* Ensures that a timer's {@code outputTimestamp} is set at or after the current input timestamp
* (minus allowed timestamp skew if set) and before the max timestamp of the window (plus
* allowed lateness).
*
* <p>If the {@code outputTimestamp} is not set, it defaults to:
*
* <ul>
* <li>The firing timestamp, for timers in the {@link TimeDomain#EVENT_TIME} domain.
* <li>The current element timestamp, for timers in any other time domain.
* </ul>
*/
private void setAndVerifyOutputTimestamp() {
if (outputTimestamp != null) {
Expand All @@ -1261,30 +1243,26 @@ private void setAndVerifyOutputTimestamp() {
+ "earlier than the timestamp of the current input or timer (%s) minus the "
+ "allowed skew (%s) and no later than %s. See the "
+ "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the "
+ "allowed skew.details on changing the allowed skew.",
+ "allowed skew.",
outputTimestamp,
elementInputTimestamp,
fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
? fn.getAllowedTimestampSkew()
: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
BoundedWindow.TIMESTAMP_MAX_VALUE));
}
}

// Output timestamp is set to the delivery time if not initialized by an user.
if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
} else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
// The outputTimestamp was unset and this is a timer in the EVENT_TIME domain. The output
// timestamp will be the firing timestamp.
outputTimestamp = target;
}
// For processing timers
if (outputTimestamp == null) {
// For processing timers output timestamp will be:
// 1) timestamp of input element
// OR
// 2) output timestamp of firing timer.
} else {
// The outputTimestamp was unset and this is a timer in the PROCESSING_TIME
// (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will be the timestamp of
// the element (or timer) setting this timer.
outputTimestamp = elementInputTimestamp;
}

Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness);
if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
checkArgument(
!outputTimestamp.isAfter(windowExpiry),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,13 +1025,26 @@ protected void fireTimer(TimerData timerData) {
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.onFiredOrDeletedTimer(timerData);
Instant effectiveOutputTimestamp;

if (timerData.getDomain() == TimeDomain.EVENT_TIME) {
effectiveOutputTimestamp = timerData.getOutputTimestamp();
} else {
// Flink does not set a watermark hold for the timer's output timestamp, and prior to
// https://github.com/apache/beam/pull/17262 processing time timers did not correctly emit
// elements at their output timestamp. In this case we need to continue doing the wrong thing
// and use the output watermark rather than the timer's output timestamp. Once Flink correctly
// sets a watermark hold for the output timestamp, this should be changed back.
effectiveOutputTimestamp = timerInternals.currentOutputWatermarkTime();
}

pushbackDoFnRunner.onTimer(
timerData.getTimerId(),
timerData.getTimerFamilyId(),
keyedStateInternals.getKey(),
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
effectiveOutputTimestamp,
timerData.getDomain());
}

Expand Down

0 comments on commit e596abf

Please sign in to comment.