From 40c4a5cb6eaa0350a26fe1f215eb812541a7b105 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sun, 12 Feb 2017 15:03:48 -0800 Subject: [PATCH 1/8] Reify delay and alignment in AfterProcessingTime transform --- ...fterDelayFromFirstElementStateMachine.java | 4 +- .../AfterProcessingTimeStateMachine.java | 2 + ...ynchronizedProcessingTimeStateMachine.java | 1 + .../core/triggers/TriggerStateMachines.java | 36 ++- .../triggers/TriggerStateMachinesTest.java | 7 +- .../windowing/AfterDelayFromFirstElement.java | 240 ------------------ .../windowing/AfterProcessingTime.java | 105 ++++++-- .../AfterSynchronizedProcessingTime.java | 25 +- .../windowing/TimestampTransform.java | 64 +++++ .../windowing/AfterProcessingTimeTest.java | 2 +- 10 files changed, 192 insertions(+), 294 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index b72064419564..4444c227a1fa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -47,6 +47,8 @@ * *

This class is for internal use only and may change at any time. */ +// This class should be inlined to subclasses and deleted, simplifying them too +// https://issues.apache.org/jira/browse/BEAM-1486 @Experimental(Experimental.Kind.TRIGGER) public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine { @@ -250,7 +252,7 @@ protected Instant computeTargetTimestamp(Instant time) { /** * A {@link SerializableFunction} to delay the timestamp at which this triggers fires. */ - private static final class DelayFn implements SerializableFunction { + static final class DelayFn implements SerializableFunction { private final Duration delay; public DelayFn(Duration delay) { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java index 2490463eaa40..eaf5613cc795 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java @@ -34,6 +34,8 @@ * AfterDelayFromFirstElementStateMachine#plusDelayOf} or {@link * AfterDelayFromFirstElementStateMachine#alignedTo}. 
*/ +// The superclass should be inlined here, its only real use +// https://issues.apache.org/jira/browse/BEAM-1486 @Experimental(Experimental.Kind.TRIGGER) public class AfterProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java index 000f6e7407c1..1319a130c7bd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; +// This should not really have the superclass https://issues.apache.org/jira/browse/BEAM-1486 class AfterSynchronizedProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java index 1be79814265c..f0e9d213766d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -18,14 +18,16 @@ package org.apache.beam.runners.core.triggers; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; +import javax.annotation.Nonnull; import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; import org.apache.beam.sdk.transforms.windowing.AfterEach; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -36,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.TimestampTransform; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ReshuffleTrigger; @@ -171,18 +174,37 @@ private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) { } private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) { - return evaluateSpecific((AfterDelayFromFirstElement) v); - } - - private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) { return new AfterDelayFromFirstElementStateMachineAdapter(v); } private static class AfterDelayFromFirstElementStateMachineAdapter extends AfterDelayFromFirstElementStateMachine { - public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) { - this(v.getTimeDomain(), v.getTimestampMappers()); + private static final Function> + CONVERT_TIMESTAMP_TRANSFORM = + new Function>() { + @Override + public SerializableFunction apply( + @Nonnull TimestampTransform transform) { + if (transform instanceof TimestampTransform.Delay) { + return new DelayFn(((TimestampTransform.Delay) transform).getDelay()); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + 
return new AlignFn(alignTo.getPeriod(), alignTo.getOffset()); + } else { + throw new IllegalArgumentException( + String.format( + "Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); + } + } + }; + + public AfterDelayFromFirstElementStateMachineAdapter(AfterProcessingTime v) { + this( + TimeDomain.PROCESSING_TIME, + FluentIterable.from(v.getTimestampTransforms()) + .transform(CONVERT_TIMESTAMP_TRANSFORM) + .toList()); } private AfterDelayFromFirstElementStateMachineAdapter( diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java index 37f8f10a4e5e..26c0597e9d20 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java @@ -23,7 +23,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; import org.apache.beam.sdk.transforms.windowing.AfterEach; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -64,7 +63,7 @@ public void testStateMachineForAfterProcessingTime() { Duration minutes = Duration.standardMinutes(94); Duration hours = Duration.standardHours(13); - AfterDelayFromFirstElement trigger = + AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours); AfterDelayFromFirstElementStateMachine machine = @@ -72,10 +71,6 @@ public void testStateMachineForAfterProcessingTime() { TriggerStateMachines.stateMachineForOnceTrigger(trigger); assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME)); - - // This equality is function 
equality, but due to the structure of the code (no serialization) - // it is OK to check - assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers())); } @Test diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java deleted file mode 100644 index da6d74ac77ff..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.collect.ImmutableList; -import java.util.List; -import java.util.Locale; -import java.util.Objects; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TimeDomain; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; -import org.joda.time.format.PeriodFormatter; - -/** - * A base class for triggers that happen after a processing time delay from the arrival - * of the first element in a pane. - * - *

This class is for internal use only and may change at any time. - */ -@Experimental(Experimental.Kind.TRIGGER) -public abstract class AfterDelayFromFirstElement extends OnceTrigger { - - protected static final List> IDENTITY = - ImmutableList.>of(); - - private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); - - /** - * To complete an implementation, return a new instance like this one, but incorporating - * the provided timestamp mapping functions. Generally should be used by calling the - * constructor of this class from the constructor of the subclass. - */ - protected abstract AfterDelayFromFirstElement newWith( - List> transform); - - /** - * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The - * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`, - * implemented via #computeTargetTimestamp - */ - protected final List> timestampMappers; - - private final TimeDomain timeDomain; - - public AfterDelayFromFirstElement( - TimeDomain timeDomain, - List> timestampMappers) { - super(null); - this.timestampMappers = timestampMappers; - this.timeDomain = timeDomain; - } - - /** - * The time domain according for which this trigger sets timers. - */ - public TimeDomain getTimeDomain() { - return timeDomain; - } - - /** - * The mapping functions applied to the arrival time of an element to determine when to - * set a wake-up timer for triggering. - */ - public List> getTimestampMappers() { - return timestampMappers; - } - - /** - * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater - * than the timestamp. - * - *

TODO: Consider sharing this with FixedWindows, and bring over the equivalent of - * CalendarWindows. - */ - public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) { - return newWith(new AlignFn(size, offset)); - } - - /** - * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp - * since the epoch. - */ - public AfterDelayFromFirstElement alignedTo(final Duration size) { - return alignedTo(size, new Instant(0)); - } - - /** - * Adds some delay to the original target time. - * - * @param delay the delay to add - * @return An updated time trigger that will wait the additional time before firing. - */ - public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { - return newWith(new DelayFn(delay)); - } - - /** - * @deprecated This will be removed in the next major version. Please use only - * {@link #plusDelayOf} and {@link #alignedTo}. - */ - @Deprecated - public OnceTrigger mappedTo(SerializableFunction timestampMapper) { - return newWith(timestampMapper); - } - - @Override - public boolean isCompatible(Trigger other) { - if (!getClass().equals(other.getClass())) { - return false; - } - - AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other; - return this.timestampMappers.equals(that.timestampMappers); - } - - - private AfterDelayFromFirstElement newWith( - SerializableFunction timestampMapper) { - return newWith( - ImmutableList.>builder() - .addAll(timestampMappers) - .add(timestampMapper) - .build()); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - protected Instant computeTargetTimestamp(Instant time) { - Instant result = time; - for (SerializableFunction timestampMapper : timestampMappers) { - result = timestampMapper.apply(result); - } - return result; - } - - /** - * A {@link SerializableFunction} to delay the timestamp at which this triggers fires. 
- */ - private static final class DelayFn implements SerializableFunction { - private final Duration delay; - - public DelayFn(Duration delay) { - this.delay = delay; - } - - @Override - public Instant apply(Instant input) { - return input.plus(delay); - } - - @Override - public boolean equals(Object object) { - if (object == this) { - return true; - } - - if (!(object instanceof DelayFn)) { - return false; - } - - return this.delay.equals(((DelayFn) object).delay); - } - - @Override - public int hashCode() { - return Objects.hash(delay); - } - - @Override - public String toString() { - return PERIOD_FORMATTER.print(delay.toPeriod()); - } - } - - /** - * A {@link SerializableFunction} to align an instant to the nearest interval boundary. - */ - static final class AlignFn implements SerializableFunction { - private final Duration size; - private final Instant offset; - - - /** - * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater - * than the timestamp. - */ - public AlignFn(Duration size, Instant offset) { - this.size = size; - this.offset = offset; - } - - @Override - public Instant apply(Instant point) { - long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis(); - return millisSinceStart == 0 ? 
point : point.plus(size).minus(millisSinceStart); - } - - @Override - public boolean equals(Object object) { - if (object == this) { - return true; - } - - if (!(object instanceof AlignFn)) { - return false; - } - - AlignFn other = (AlignFn) object; - return other.size.equals(this.size) - && other.offset.equals(this.offset); - } - - @Override - public int hashCode() { - return Objects.hash(size, offset); - } - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index 09f288e63949..eda269adf248 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -17,40 +17,87 @@ */ package org.apache.beam.sdk.transforms.windowing; +import com.google.common.collect.ImmutableList; +import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Duration; import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; +import org.joda.time.format.PeriodFormatter; /** * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in * the real-time domain. - * - *

The time at which to fire the timer can be adjusted via the methods in - * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or - * {@link AfterDelayFromFirstElement#alignedTo}. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterProcessingTime extends AfterDelayFromFirstElement { +public class AfterProcessingTime extends OnceTrigger { + + private static final PeriodFormatter DURATION_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); + + private final List timestampTransforms; - private AfterProcessingTime(List> transforms) { - super(TimeDomain.PROCESSING_TIME, transforms); + public AfterProcessingTime(List timestampTransforms) { + super(null); + this.timestampTransforms = timestampTransforms; } /** - * Creates a trigger that fires when the current processing time passes the processing time - * at which this trigger saw the first element in a pane. + * Creates a trigger that fires when the current processing time passes the processing time at + * which this trigger saw the first element in a pane. */ public static AfterProcessingTime pastFirstElementInPane() { - return new AfterProcessingTime(IDENTITY); + return new AfterProcessingTime(Collections.emptyList()); + } + + /** + * The transforms applied to the arrival time of an element to determine when this trigger allows + * output. + */ + public List getTimestampTransforms() { + return timestampTransforms; + } + + /** + * Adds some delay to the original target time. + * + * @param delay the delay to add + * @return An updated time trigger that will wait the additional time before firing. + */ + public AfterProcessingTime plusDelayOf(final Duration delay) { + return new AfterProcessingTime( + ImmutableList.builder() + .addAll(timestampTransforms) + .add(TimestampTransform.delay(delay)) + .build()); + } + + /** + * Aligns timestamps to the smallest multiple of {@code period} since the {@code offset} greater + * than the timestamp. 
+ */ + public AfterProcessingTime alignedTo(final Duration period, final Instant offset) { + return new AfterProcessingTime( + ImmutableList.builder() + .addAll(timestampTransforms) + .add(TimestampTransform.alignTo(period, offset)) + .build()); + } + + /** + * Aligns the time to be the smallest multiple of {@code period} greater than the epoch + * boundary (aka {@code new Instant(0)}). + */ + public AfterProcessingTime alignedTo(final Duration period) { + return alignedTo(period, new Instant(0)); } @Override - protected AfterProcessingTime newWith( - List> transforms) { - return new AfterProcessingTime(transforms); + public boolean isCompatible(Trigger other) { + return this.equals(other); } @Override @@ -60,17 +107,28 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { @Override protected Trigger getContinuationTrigger(List continuationTriggers) { - return new AfterSynchronizedProcessingTime(); + return AfterSynchronizedProcessingTime.ofFirstElement(); } @Override public String toString() { StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); - for (SerializableFunction delayFn : timestampMappers) { - builder - .append(".plusDelayOf(") - .append(delayFn) - .append(")"); + for (TimestampTransform transform : getTimestampTransforms()) { + if (transform instanceof TimestampTransform.Delay) { + TimestampTransform.Delay delay = (TimestampTransform.Delay) transform; + builder + .append(".plusDelayOf(") + .append(DURATION_FORMATTER.print(delay.getDelay().toPeriod())) + .append(")"); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + builder + .append(".alignedTo(") + .append(DURATION_FORMATTER.print(alignTo.getPeriod().toPeriod())) + .append(", ") + .append(alignTo.getOffset()) + .append(")"); + } } return builder.toString(); @@ -84,12 +142,13 @@ public boolean equals(Object obj) { if (!(obj instanceof 
AfterProcessingTime)) { return false; } + AfterProcessingTime that = (AfterProcessingTime) obj; - return Objects.equals(this.timestampMappers, that.timestampMappers); + return getTimestampTransforms().equals(that.getTimestampTransforms()); } @Override public int hashCode() { - return Objects.hash(getClass(), this.timestampMappers); + return Objects.hash(getTimestampTransforms()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index b6258f892936..624995437197 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -18,21 +18,22 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.common.base.Objects; -import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; /** - * A trigger that fires after synchronized processing time has reached a shared - * threshold between upstream workers. + * A trigger that fires after synchronized processing time has reached the processing time of the + * first element's arrival. + * + *

This is for internal, primarily as a "continuation trigger" for {@link AfterProcessingTime} + * triggers. In that use, this trigger is ready as soon as all upstream workers processing time + * clocks have caught up to the moment that input arrived. */ -public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { +public class AfterSynchronizedProcessingTime extends OnceTrigger { public AfterSynchronizedProcessingTime() { - super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - Collections.>emptyList()); + super(null); } @Override @@ -59,12 +60,4 @@ public boolean equals(Object obj) { public int hashCode() { return Objects.hashCode(AfterSynchronizedProcessingTime.class); } - - @Override - protected AfterSynchronizedProcessingTime - newWith(List> transforms) { - // ignore transforms - return this; - } - } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java new file mode 100644 index 000000000000..b16e968e3d14 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.auto.value.AutoValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** An abstract description of a standardized transformation on timestamps. */ +public abstract class TimestampTransform { + + /** Returns a transform that shifts a timestamp later by {@code delay}. */ + public static TimestampTransform delay(Duration delay) { + return new AutoValue_TimestampTransform_Delay(delay); + } + + /** + * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * from {@code offset}. + */ + public static TimestampTransform alignTo(Duration period, Instant offset) { + return new AutoValue_TimestampTransform_AlignTo(period, offset); + } + + /** + * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * from the start of the epoch. + */ + public static TimestampTransform alignTo(Duration period) { + return alignTo(period, new Instant(0)); + } + + /** + * Represents the transform that aligns a timestamp to the next boundary of {@link #getPeriod()} + * start at {@link #getOffset()}. + */ + @AutoValue + public abstract static class AlignTo extends TimestampTransform { + public abstract Duration getPeriod(); + + public abstract Instant getOffset(); + } + + /** Represents the transform that delays a timestamp by {@link #getDelay()}. 
*/ + @AutoValue + public abstract static class Delay extends TimestampTransform { + public abstract Duration getDelay(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 4984d7ce75a1..006ff0053597 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -45,7 +45,7 @@ public void testContinuation() throws Exception { OnceTrigger firstElementPlus1 = AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)); assertEquals( - new AfterSynchronizedProcessingTime(), + AfterSynchronizedProcessingTime.ofFirstElement(), firstElementPlus1.getContinuationTrigger()); } From 5d5602dbff41ef48add2ea763527f8c0901f0bc0 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sun, 12 Feb 2017 15:53:17 -0800 Subject: [PATCH 2/8] Upgrade Java triggers to support runner API deserialization --- .../AfterSynchronizedProcessingTimeStateMachine.java | 6 +++++- .../runners/core/triggers/TriggerStateMachines.java | 2 +- ...fterSynchronizedProcessingTimeStateMachineTest.java | 3 ++- .../apache/beam/sdk/transforms/windowing/AfterAll.java | 7 +++++++ .../beam/sdk/transforms/windowing/AfterEach.java | 7 +++++++ .../beam/sdk/transforms/windowing/AfterFirst.java | 7 +++++++ .../windowing/AfterSynchronizedProcessingTime.java | 10 +++++++--- .../windowing/AfterSynchronizedProcessingTimeTest.java | 2 +- 8 files changed, 37 insertions(+), 7 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java index 1319a130c7bd..07fab223739c 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java @@ -28,13 +28,17 @@ // This should not really have the superclass https://issues.apache.org/jira/browse/BEAM-1486 class AfterSynchronizedProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { + public static AfterSynchronizedProcessingTimeStateMachine ofFirstElement() { + return new AfterSynchronizedProcessingTimeStateMachine(); + } + @Override @Nullable public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) { return context.currentSynchronizedProcessingTime(); } - public AfterSynchronizedProcessingTimeStateMachine() { + private AfterSynchronizedProcessingTimeStateMachine() { super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Collections.>emptyList()); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java index f0e9d213766d..b13ac4016307 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -116,7 +116,7 @@ private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) { } private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) { - return new AfterSynchronizedProcessingTimeStateMachine(); + return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement(); } private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) { diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java index 140bd62ecd1c..7bfd48d2d9b4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java @@ -36,7 +36,8 @@ @RunWith(JUnit4.class) public class AfterSynchronizedProcessingTimeStateMachineTest { - private TriggerStateMachine underTest = new AfterSynchronizedProcessingTimeStateMachine(); + private TriggerStateMachine underTest = + AfterSynchronizedProcessingTimeStateMachine.ofFirstElement(); @Test public void testAfterProcessingTimeWithFixedWindows() throws Exception { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index c3f084893a4c..2747311dff98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -44,6 +44,13 @@ public static AfterAll of(OnceTrigger... triggers) { return new AfterAll(Arrays.asList(triggers)); } + /** + * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. + */ + public static AfterAll of(List triggers) { + return new AfterAll(triggers); + } + @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the latest of its sub-triggers. 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 872ad46e16d5..56a9d14a2487 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -57,6 +57,13 @@ public static AfterEach inOrder(Trigger... triggers) { return new AfterEach(Arrays.asList(triggers)); } + /** + * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. + */ + public static AfterEach inOrder(List triggers) { + return new AfterEach(triggers); + } + @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire at least once when the first trigger in the sequence diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index a742b43d7253..79fd6391d60a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -45,6 +45,13 @@ public static AfterFirst of(OnceTrigger... triggers) { return new AfterFirst(Arrays.asList(triggers)); } + /** + * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. + */ + public static AfterFirst of(List triggers) { + return new AfterFirst(triggers); + } + @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the earliest of its sub-triggers. 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index 624995437197..f9826996ec47 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -23,8 +23,8 @@ import org.joda.time.Instant; /** - * A trigger that fires after synchronized processing time has reached the processing time of the - * first element's arrival. + * FOR INTERNAL USE ONLY. A trigger that fires after synchronized processing time has + * reached the processing time of the first element's arrival. * *

This is for internal, primarily as a "continuation trigger" for {@link AfterProcessingTime} * triggers. In that use, this trigger is ready as soon as all upstream workers processing time @@ -32,7 +32,11 @@ */ public class AfterSynchronizedProcessingTime extends OnceTrigger { - public AfterSynchronizedProcessingTime() { + public static AfterSynchronizedProcessingTime ofFirstElement() { + return new AfterSynchronizedProcessingTime(); + } + + private AfterSynchronizedProcessingTime() { super(null); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java index 49d44c55dfe4..e2cfdd2d85ff 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java @@ -30,7 +30,7 @@ @RunWith(JUnit4.class) public class AfterSynchronizedProcessingTimeTest { - private Trigger underTest = new AfterSynchronizedProcessingTime(); + private Trigger underTest = AfterSynchronizedProcessingTime.ofFirstElement(); @Test public void testFireDeadline() throws Exception { From 67854e66e97889b3f3e03de297e5af5b73c3fab1 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sat, 11 Feb 2017 16:48:05 -0800 Subject: [PATCH 3/8] Fix typo in runner API generated Java class --- sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 195ce01dad87..370b57c7b52e 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -25,7 +25,7 @@ syntax = "proto3"; package 
org.apache.beam.runner_api.v1; -option java_package = "org.apache.beam.sdks.common.runner_api.v1"; +option java_package = "org.apache.beam.sdk.common.runner_api.v1"; option java_outer_classname = "RunnerApi"; import "google/protobuf/any.proto"; From 661cd8d7407d5f414d5d94badacdeadb519107b7 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sat, 11 Feb 2017 17:32:21 -0800 Subject: [PATCH 4/8] Flesh out triggers in Runner API proto --- .../src/main/proto/beam_runner_api.proto | 109 +++++++++++++----- 1 file changed, 83 insertions(+), 26 deletions(-) diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 370b57c7b52e..91f155800e31 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -29,7 +29,6 @@ option java_package = "org.apache.beam.sdk.common.runner_api.v1"; option java_outer_classname = "RunnerApi"; import "google/protobuf/any.proto"; -import "google/protobuf/timestamp.proto"; // A Pipeline is a hierarchical graph of PTransforms, linked // by PCollections. @@ -402,6 +401,24 @@ enum OutputTime { EARLIEST_IN_PANE = 2; } +// The different time domains in the Beam model. +enum TimeDomain { + + // Event time is time from the perspective of the data + EVENT_TIME = 0; + + // Processing time is time from the perspective of the + // execution of your pipeline + PROCESSING_TIME = 1; + + // Synchronized processing time is the minimum of the + // processing time of all pending elements. + // + // The "processing time" of an element refers to + // the local processing time at which it was emitted + SYNCHRONIZED_PROCESSING_TIME = 2; +} + // A small DSL for expressing when to emit new aggregations // from a GroupByKey or CombinePerKey // @@ -439,27 +456,31 @@ message Trigger { } // After input arrives, ready when the specified delay has passed. 
- message AfterProcessingTimeDelay { - // (Required) The delay, in milliseconds. - int64 delay_millis = 1; + message AfterProcessingTime { + + // (Required) The transforms to apply to an arriving element's timestamp, + // in order + repeated TimestampTransform timestamp_transforms = 1; } - // After input arrives, ready when the synchronized processing time - // progresses as far as the given delay. - message AfterSynchronizedProcessingTimeDelay { - // (Required) The delay, in milliseconds. - int64 delay_millis = 1; + // Ready whenever upstream processing time has all caught up with + // the arrival time of an input element + message AfterSynchronizedProcessingTime { + } + + // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but + // specially denoted to indicate the user did not alter the triggering. + message Default { + } + + // Ready whenever the requisite number of input elements have arrived + message ElementCount { + int32 element_count = 1; } // Never ready. There will only be an ON_TIME output and a final // output at window expiration. - message Never { } - - // Ready whenever the subtrigger is ready; resets state when the subtrigger - // completes. - message Repeat { - // (Require) Trigger that is run repeatedly. - Trigger subtrigger = 1; + message Never { } // Ready whenever either of its subtriggers are ready, but finishes output @@ -473,9 +494,12 @@ message Trigger { Trigger finally = 2; } - // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but - // specially denoted to indicate the user did not alter the triggering. - message Default { } + // Ready whenever the subtrigger is ready; resets state when the subtrigger + // completes. + message Repeat { + // (Required) Trigger that is run repeatedly. + Trigger subtrigger = 1; + } // The full disjoint union of possible triggers. 
oneof trigger { @@ -483,12 +507,39 @@ message Trigger { AfterAny after_any = 2; AfterEach after_each = 3; AfterEndOfWindow after_end_of_widow = 4; - AfterProcessingTimeDelay after_processing_time_delay = 5; - AfterSynchronizedProcessingTimeDelay after_synchronized_processing_time_delay = 6; - Never never = 7; - Repeat repeat = 8; - OrFinally or_finally = 9; - Default default = 10; + AfterProcessingTime after_processing_time = 5; + AfterSynchronizedProcessingTime after_synchronized_processing_time = 6; + Default default = 7; + ElementCount element_count = 8; + Never never = 9; + OrFinally or_finally = 10; + Repeat repeat = 11; + } +} + +// A specification for a transformation on a timestamp. +// +// Primarily used by AfterProcessingTime triggers to transform +// the arrival time of input to a target time for firing. +message TimestampTransform { + oneof timestamp_transform { + Delay delay = 1; + AlignTo align_to = 2; + } + + message Delay { + // (Required) The delay, in milliseconds. + int64 delay_millis = 1; + } + + message AlignTo { + // (Required) A duration to which delays should be quantized + // in milliseconds. 
+ int64 period = 3; + + // (Required) An offset from 0 for the quantization specified by + // period, in milliseconds + int64 offset = 4; } } @@ -633,6 +684,12 @@ message DisplayData { } enum Type { - STRING = 0; INTEGER = 1; FLOAT = 2; BOOLEAN = 3; TIMESTAMP = 4; DURATION = 5; JAVA_CLASS = 6; + STRING = 0; + INTEGER = 1; + FLOAT = 2; + BOOLEAN = 3; + TIMESTAMP = 4; + DURATION = 5; + JAVA_CLASS = 6; } } From 2803864ba689df92c993b0c4afc392df4558b6bf Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sat, 11 Feb 2017 16:47:15 -0800 Subject: [PATCH 5/8] Add runner API config to poms --- pom.xml | 8 ++++++++ sdks/java/core/pom.xml | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/pom.xml b/pom.xml index be756592e562..d53502e1e400 100644 --- a/pom.xml +++ b/pom.xml @@ -320,6 +320,7 @@ beam-sdks-common-fn-api ${project.version} + org.apache.beam beam-sdks-common-fn-api @@ -327,6 +328,13 @@ test-jar + + org.apache.beam + beam-sdks-common-runner-api + ${project.version} + + + org.apache.beam beam-runners-flink_2.10-examples diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 266e14495dc9..4f8955096325 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -182,6 +182,11 @@ + + org.apache.beam + beam-sdks-common-runner-api + + io.grpc grpc-auth From f4ceaeefe9e8e9d069b760e166c7057a00465360 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sat, 11 Feb 2017 17:50:27 -0800 Subject: [PATCH 6/8] Add conversion to/from Java SDK trigger to runner API proto --- .../sdk/transforms/windowing/Triggers.java | 313 ++++++++++++++++++ .../transforms/windowing/TriggersTest.java | 100 ++++++ 2 files changed, 413 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java new file mode 100644 index 000000000000..8ac904cc55ac --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** Utilities for working with {@link Trigger Triggers}. 
*/ +@Experimental(Experimental.Kind.TRIGGER) +public class Triggers implements Serializable { + + @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter(); + + public static RunnerApi.Trigger toProto(Trigger trigger) { + return CONVERTER.convertTrigger(trigger); + } + + @VisibleForTesting + static class ProtoConverter { + + public RunnerApi.Trigger convertTrigger(Trigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return tryConvert(evaluationMethod, trigger); + } + + private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) { + try { + return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger); + } catch (InvocationTargetException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new RuntimeException(exc.getCause()); + } + } catch (IllegalAccessException exc) { + throw new IllegalStateException( + String.format("Internal error: could not invoke %s", evaluationMethod)); + } + } + + private Method getEvaluationMethod(Class clazz) { + try { + return getClass().getDeclaredMethod("convertSpecific", clazz); + } catch (NoSuchMethodException exc) { + throw new IllegalArgumentException( + String.format( + "Cannot translate trigger class %s to a runner-API proto.", + clazz.getCanonicalName()), + exc); + } + } + + private RunnerApi.Trigger convertSpecific(DefaultTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setDefault(RunnerApi.Trigger.Default.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterWatermark.FromEndOfWindow v) { + return RunnerApi.Trigger.newBuilder() + .setAfterEndOfWidow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(NeverTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setNever(RunnerApi.Trigger.Never.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger 
convertSpecific(AfterSynchronizedProcessingTime v) { + return RunnerApi.Trigger.newBuilder() + .setAfterSynchronizedProcessingTime( + RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance()) + .build(); + } + + private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) { + switch (timeDomain) { + case EVENT_TIME: + return RunnerApi.TimeDomain.EVENT_TIME; + case PROCESSING_TIME: + return RunnerApi.TimeDomain.PROCESSING_TIME; + case SYNCHRONIZED_PROCESSING_TIME: + return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + default: + throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); + } + } + + private RunnerApi.Trigger convertSpecific(AfterFirst v) { + RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterAll v) { + RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterPane v) { + return RunnerApi.Trigger.newBuilder() + .setElementCount( + RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount())) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) { + RunnerApi.Trigger.AfterEndOfWindow.Builder builder = + RunnerApi.Trigger.AfterEndOfWindow.newBuilder(); + + builder.setEarlyFirings(toProto(v.getEarlyTrigger())); + if (v.getLateTrigger() != null) { + builder.setLateFirings(toProto(v.getLateTrigger())); + } + + return RunnerApi.Trigger.newBuilder().setAfterEndOfWidow(builder).build(); + } + + private RunnerApi.Trigger 
convertSpecific(AfterEach v) { + RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(Repeatedly v) { + return RunnerApi.Trigger.newBuilder() + .setRepeat( + RunnerApi.Trigger.Repeat.newBuilder() + .setSubtrigger(toProto(v.getRepeatedTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setOrFinally( + RunnerApi.Trigger.OrFinally.newBuilder() + .setMain(toProto(v.getMainTrigger())) + .setFinally(toProto(v.getUntilTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) { + RunnerApi.Trigger.AfterProcessingTime.Builder builder = + RunnerApi.Trigger.AfterProcessingTime.newBuilder(); + + for (TimestampTransform transform : v.getTimestampTransforms()) { + builder.addTimestampTransforms(convertTimestampTransform(transform)); + } + + return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build(); + } + + private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) { + if (transform instanceof TimestampTransform.Delay) { + return RunnerApi.TimestampTransform.newBuilder() + .setDelay( + RunnerApi.TimestampTransform.Delay.newBuilder() + .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis())) + .build(); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + return RunnerApi.TimestampTransform.newBuilder() + .setAlignTo( + RunnerApi.TimestampTransform.AlignTo.newBuilder() + .setPeriod(alignTo.getPeriod().getMillis()) + .setOffset(alignTo.getOffset().getMillis())) + .build(); + + } else { + throw new IllegalArgumentException( + 
String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); + } + } + } + + public static Trigger fromProto(RunnerApi.Trigger triggerProto) { + switch (triggerProto.getTriggerCase()) { + case AFTER_ALL: + return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList())); + case AFTER_ANY: + return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList())); + case AFTER_EACH: + return AfterEach.inOrder( + protosToTriggers(triggerProto.getAfterEach().getSubtriggersList())); + case AFTER_END_OF_WIDOW: + RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWidow(); + + if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) { + return AfterWatermark.pastEndOfWindow(); + } + + // It either has early or late firings or both; our typing in Java makes this a smidge + // annoying + if (triggerProto.getAfterEndOfWidow().hasEarlyFirings()) { + AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWidow().getEarlyFirings())); + + if (triggerProto.getAfterEndOfWidow().hasLateFirings()) { + trigger = + trigger.withLateFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWidow().getLateFirings())); + } + return trigger; + } else { + // only late firings, so return directly + return AfterWatermark.pastEndOfWindow() + .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings())); + } + case AFTER_PROCESSING_TIME: + AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane(); + for (RunnerApi.TimestampTransform transform : + triggerProto.getAfterProcessingTime().getTimestampTransformsList()) { + switch (transform.getTimestampTransformCase()) { + case ALIGN_TO: + trigger = + trigger.alignedTo( + Duration.millis(transform.getAlignTo().getPeriod()), + new Instant(transform.getAlignTo().getOffset())); + break; + case DELAY: + trigger = 
trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis())); + break; + case TIMESTAMPTRANSFORM_NOT_SET: + throw new IllegalArgumentException( + String.format( + "Required field 'timestamp_transform' not set in %s", transform)); + default: + throw new IllegalArgumentException( + String.format( + "Unknown timestamp transform case: %s", + transform.getTimestampTransformCase())); + } + } + return trigger; + case AFTER_SYNCHRONIZED_PROCESSING_TIME: + return AfterSynchronizedProcessingTime.ofFirstElement(); + case ELEMENT_COUNT: + return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount()); + case NEVER: + return Never.ever(); + case OR_FINALLY: + return fromProto(triggerProto.getOrFinally().getMain()) + .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally())); + case REPEAT: + return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger())); + case DEFAULT: + return DefaultTrigger.of(); + case TRIGGER_NOT_SET: + throw new IllegalArgumentException( + String.format("Required field 'trigger' not set in %s", triggerProto)); + default: + throw new IllegalArgumentException( + String.format("Unknown trigger case: %s", triggerProto.getTriggerCase())); + } + } + + private static List protosToTriggers(List triggers) { + List result = Lists.newArrayList(); + for (RunnerApi.Trigger trigger : triggers) { + result.add(fromProto(trigger)); + } + return result; + } + + // Do not instantiate + private Triggers() {} +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java new file mode 100644 index 000000000000..0ac59660a2b0 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for utilities in {@link Triggers}. 
*/ +@RunWith(Parameterized.class) +public class TriggersTest { + + @AutoValue + abstract static class ToProtoAndBackSpec { + abstract Trigger getTrigger(); + } + + private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) { + return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger); + } + + @Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.of( + // Atomic triggers + toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()), + toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)), + toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()), + toProtoAndBackSpec(Never.ever()), + toProtoAndBackSpec(DefaultTrigger.of()), + toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .alignedTo(Duration.millis(5), new Instant(27))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardSeconds(3)) + .alignedTo(Duration.millis(5), new Instant(27)) + .plusDelayOf(Duration.millis(13))), + + // Composite triggers + + toProtoAndBackSpec( + AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42))) + .withLateFirings(AfterPane.elementCountAtLeast(3))), + 
toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterWatermark.pastEndOfWindow()))); + } + + @Parameter(0) + public ToProtoAndBackSpec toProtoAndBackSpec; + + @Test + public void testToProtoAndBack() throws Exception { + Trigger trigger = toProtoAndBackSpec.getTrigger(); + Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger)); + + assertThat(toProtoAndBackTrigger, equalTo(trigger)); + } +} From b6e9f73ae00813167c34b55459b9832e20d9aa41 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 14 Feb 2017 12:33:43 -0800 Subject: [PATCH 7/8] Remove underscore from Runner API proto Java package --- sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 91f155800e31..a9133abe3c5e 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -25,7 +25,7 @@ syntax = "proto3"; package org.apache.beam.runner_api.v1; -option java_package = "org.apache.beam.sdk.common.runner_api.v1"; +option java_package = "org.apache.beam.sdk.common.runner.v1"; option java_outer_classname = "RunnerApi"; import "google/protobuf/any.proto"; From e097b7575c0be830cf966bd179b3268eb463659e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 14 Feb 2017 19:30:56 -0800 Subject: [PATCH 8/8] Upgrade Dataflow container version to beam-master-20170214 --- runners/google-cloud-dataflow-java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index d366ddc7e031..ec1a92778072 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ 
-33,7 +33,7 @@ jar - beam-master-20170208 + beam-master-20170214 6