Skip to content

Commit

Permalink
Merge pull request #11407 from boyuanzz/release-2.21.0
Browse files Browse the repository at this point in the history
[BEAM-9562] Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
  • Loading branch information
ibzib committed Apr 13, 2020
2 parents d8a4a6d + 371f6a7 commit 6a49eec
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
22 changes: 22 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,34 @@ public interface Timer {
* <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is be unpredictable, since processing
* time timers are ignored after a window has expired. Instead, it is recommended to use {@link
* #setRelative()}.
*
* <p>If the {@link #withOutputTimestamp output timestamp} has not been explicitly set then the
* default output timestamp per {@link TimeDomain} is:
*
* <ul>
* <li>{@link TimeDomain#EVENT_TIME}: the firing time of this new timer.
* <li>{@link TimeDomain#PROCESSING_TIME}: current element's timestamp or current timer's output
* timestamp.
* <li>{@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}: current element's timestamp or current
* timer's output timestamp.
* </ul>
*/
void set(Instant absoluteTime);

/**
* Sets the timer relative to the current time, according to any offset and alignment specified.
* Using {@link #offset(Duration)} and {@link #align(Duration)}.
*
* <p>If the {@link #withOutputTimestamp output timestamp} has not been explicitly set then the
* default output timestamp per {@link TimeDomain} is:
*
* <ul>
* <li>{@link TimeDomain#EVENT_TIME}: the firing time of this new timer.
* <li>{@link TimeDomain#PROCESSING_TIME}: current element's timestamp or current timer's output
* timestamp.
* <li>{@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME}: current element's timestamp or current
* timer's output timestamp.
* </ul>
*/
void setRelative();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,10 +885,11 @@ private class FnApiTimer<K> implements org.apache.beam.sdk.state.Timer {
private final TimeDomain timeDomain;
private final Duration allowedLateness;
private final Instant fireTimestamp;
private Instant holdTimestamp;
private final Instant elementTimestampOrTimerHoldTimestamp;
private final BoundedWindow boundedWindow;
private final PaneInfo paneInfo;

private Instant outputTimestamp;
private Duration period = Duration.ZERO;
private Duration offset = Duration.ZERO;

Expand All @@ -897,13 +898,13 @@ private class FnApiTimer<K> implements org.apache.beam.sdk.state.Timer {
K userKey,
String dynamicTimerTag,
BoundedWindow boundedWindow,
Instant initialHoldTimestamp,
Instant elementTimestampOrTimerHoldTimestamp,
Instant elementTimestampOrTimerFireTimestamp,
PaneInfo paneInfo) {
this.timerId = timerId;
this.userKey = userKey;
this.dynamicTimerTag = dynamicTimerTag;
this.holdTimestamp = initialHoldTimestamp;
this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
this.boundedWindow = boundedWindow;
this.paneInfo = paneInfo;

Expand Down Expand Up @@ -990,15 +991,7 @@ public org.apache.beam.sdk.state.Timer align(Duration period) {

@Override
public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) {
Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
checkArgument(
!outputTime.isAfter(windowExpiry),
"Attempted to set timer with output timestamp %s but that is after"
+ " the expiration of window %s",
outputTime,
windowExpiry);

this.holdTimestamp = outputTime;
this.outputTimestamp = outputTime;
return this;
}
/**
Expand All @@ -1016,6 +1009,51 @@ private Instant minTargetAndGcTime(Instant target) {
}

private void output(Instant scheduledTime) {
if (outputTimestamp != null) {
checkArgument(
!outputTimestamp.isBefore(elementTimestampOrTimerHoldTimestamp),
"output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
outputTimestamp,
elementTimestampOrTimerHoldTimestamp);
}

// Output timestamp is set to the delivery time if not initialized by an user.
if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(timeDomain)) {
outputTimestamp = scheduledTime;
}

// For processing timers
if (outputTimestamp == null) {
// For processing timers output timestamp will be:
// 1) timestamp of input element
// OR
// 2) hold timestamp of firing timer.
outputTimestamp = elementTimestampOrTimerHoldTimestamp;
}

Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
checkArgument(
!outputTimestamp.isAfter(scheduledTime),
"Attempted to set an event-time timer with an output timestamp of %s that is"
+ " after the timer firing timestamp %s",
outputTimestamp,
scheduledTime);
checkArgument(
!scheduledTime.isAfter(windowExpiry),
"Attempted to set an event-time timer with a firing timestamp of %s that is"
+ " after the expiration of window %s",
scheduledTime,
windowExpiry);
} else {
checkArgument(
!outputTimestamp.isAfter(windowExpiry),
"Attempted to set a processing-time timer with an output timestamp of %s that is"
+ " after the expiration of window %s",
outputTimestamp,
windowExpiry);
}

TimerHandler<K> consumer = (TimerHandler) timerHandlers.get(timerId);
try {
consumer.accept(
Expand All @@ -1024,7 +1062,7 @@ private void output(Instant scheduledTime) {
dynamicTimerTag,
Collections.singletonList(boundedWindow),
scheduledTime,
holdTimestamp,
outputTimestamp,
paneInfo));
} catch (Throwable t) {
throw UserCodeException.wrap(t);
Expand Down

0 comments on commit 6a49eec

Please sign in to comment.