From a20786d42b8707504bfc1ffc71f8af71843cfcd6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 24 May 2016 13:06:31 -0700 Subject: [PATCH 1/3] Improve the inheritance API for OutputTimeFn Previously, OutputTimeFn.Defaults enforced that any subclass *could not* return true for dependsOnlyOnWindow(), to encourage such subclasses to extend OutputTimeFn.DependsOnlyOnWindow. Unfortunately, this is at odds with compositional style, where all methods need to be forwarded. Further, OutputTimeFn itself was only instantiable via one of these subclasses, so there was no way to implement a proper forwarding. After this change, the API from OutputTimeFn to its subclasses is that of a simpler abstract superclass, and Defaults no longer restricts its subclasses. Note that we intend to remove OutputTimeFn as a userland function anyhow, replacing it with an enum and OutputTimeFn implementations in runners/core, so this is low risk. It is just to unblock some current work. --- .../beam/sdk/transforms/windowing/OutputTimeFn.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java index 7cf870ac1c37..497a6fbd0c98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java @@ -52,10 +52,7 @@ @Experimental(Experimental.Kind.OUTPUT_TIME) public abstract class OutputTimeFn implements Serializable { - /** - * Private constructor to prevent subclassing other than provided base classes. - */ - private OutputTimeFn() { } + protected OutputTimeFn() { } /** * Returns the output timestamp to use for data depending on the given @@ -179,11 +176,11 @@ public Instant merge(W resultWindow, Iterable mergingTimestam /** * {@inheritDoc} * - * @return {@code false}. An {@link OutputTimeFn} that depends only on the window should extend - * {@link OutputTimeFn.DependsOnlyOnWindow}. + * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the + * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}. */ @Override - public final boolean dependsOnlyOnWindow() { + public boolean dependsOnlyOnWindow() { return false; } From d94a6f15cf9f606495f5dfb1723333cfba5217f9 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 24 May 2016 13:10:38 -0700 Subject: [PATCH 2/3] Correctly forward dependsOnlyOnWindow(), etc, in WindowingStrategy Previously, WindowingStrategy contained an implemenation of OutputTimeFn that did not properly forward these important methods. --- .../org/apache/beam/sdk/util/WindowingStrategy.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java index d98793f05aed..19b11cd623e4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java @@ -287,7 +287,7 @@ public int hashCode() { * */ private static class CombineWindowFnOutputTimes - extends OutputTimeFn.Defaults { + extends OutputTimeFn { private final OutputTimeFn outputTimeFn; private final WindowFn windowFn; @@ -313,5 +313,15 @@ public Instant combine(Instant timestamp, Instant otherTimestamp) { public Instant merge(W newWindow, Iterable timestamps) { return outputTimeFn.merge(newWindow, timestamps); } + + @Override + public final boolean dependsOnlyOnWindow() { + return outputTimeFn.dependsOnlyOnWindow(); + } + + @Override + public boolean dependsOnlyOnEarliestInputTimestamp() { + return outputTimeFn.dependsOnlyOnEarliestInputTimestamp(); + } } } From 55aae464530414f6e3ebe1103c32e39e6fc98a6f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 24 May 2016 13:11:40 -0700 Subject: [PATCH 3/3] Fixes to timestamps in GroupAlsoByWindowsProperties These properties had poor test coverage, so their timestamps were not updated alongside the new default for OutputTimeFn. --- .../beam/sdk/util/GroupAlsoByWindowsProperties.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index 4518f9f7a8b5..c4f3c8b16b60 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -191,8 +191,9 @@ public static void combinesElementsInSlidingWindows( CombineFn combineFn) throws Exception { - WindowingStrategy windowingStrategy = WindowingStrategy.of( - SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))); + WindowingStrategy windowingStrategy = + WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); List>> result = runGABW(gabwFactory, windowingStrategy, "k", @@ -360,14 +361,14 @@ public static void combinesElementsPerSession( KvMatcher.isKv( equalTo("k"), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))), - 0, // aggregate timestamp + window(0, 15).maxTimestamp().getMillis(), // aggregate timestamp 0, // window start 15), // window end WindowMatchers.isSingleWindowedValue( KvMatcher.isKv( equalTo("k"), equalTo(combineFn.apply(ImmutableList.of(4L)))), - 15, // aggregate timestamp + window(15, 25).maxTimestamp().getMillis(), // aggregate timestamp 15, // window start 25))); // window end }