From 660d46cde18346facdde16d63555383d6c9ff60f Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 27 Jun 2017 14:23:22 -0700 Subject: [PATCH] Add WindowFn#assignsToOneWindow --- .../beam/sdk/testing/StaticWindows.java | 5 ++++ .../transforms/windowing/GlobalWindows.java | 5 ++++ .../windowing/PartitioningWindowFn.java | 5 ++++ .../transforms/windowing/SlidingWindows.java | 5 ++++ .../sdk/transforms/windowing/WindowFn.java | 11 +++++++ .../beam/sdk/util/IdentityWindowFn.java | 5 ++++ .../windowing/SlidingWindowsTest.java | 30 +++++++++++++++---- 7 files changed, 61 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java index c11057a5c623..eba6978744c1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java @@ -126,4 +126,9 @@ public BoundedWindow getSideInputWindow(BoundedWindow mainWindow) { } }; } + + @Override + public boolean assignsToOneWindow() { + return true; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index d48d26b1807c..c68c497deb09 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -78,6 +78,11 @@ public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) { return inputTimestamp; } + @Override + public boolean assignsToOneWindow() { + return true; + } + @Override public boolean equals(Object other) { return other instanceof GlobalWindows; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java index 40ee68aae7f4..341ba27ec8cd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java @@ -58,4 +58,9 @@ public W getSideInputWindow(BoundedWindow mainWindow) { public Instant getOutputTime(Instant inputTimestamp, W window) { return inputTimestamp; } + + @Override + public final boolean assignsToOneWindow() { + return true; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index f65788438021..150b95633efa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -147,6 +147,11 @@ public boolean isCompatible(WindowFn other) { return equals(other); } + @Override + public boolean assignsToOneWindow() { + return !this.period.isShorterThan(this.size); + } + @Override public void verifyCompatibility(WindowFn other) throws IncompatibleWindowException { if (!this.isCompatible(other)) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 001d63014ec9..ffe85f3cf6c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -179,6 +179,17 @@ public boolean isNonMerging() { return false; } + /** + * Returns true if this {@link WindowFn} always assigns an element to exactly one window. + * + *

If this varies per-element, or cannot be determined, conservatively return false. + * + *

By default, returns false. + */ + public boolean assignsToOneWindow() { + return false; + } + /** * Returns a {@link TypeDescriptor} capturing what is known statically about the window type of * this {@link WindowFn} instance's most-derived class. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index a4bfdda4f813..ef6d83397f45 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -116,4 +116,9 @@ public WindowMappingFn getDefaultWindowMappingFn() { public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { return inputTimestamp; } + + @Override + public boolean assignsToOneWindow() { + return true; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java index b14e2215173d..bfd01f02ed26 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -55,11 +56,12 @@ public void testSimple() throws Exception { expected.put(new IntervalWindow(new Instant(0), new Instant(10)), set(1, 2, 5, 9)); expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11)); expected.put(new IntervalWindow(new Instant(10), new Instant(20)), set(10, 11)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(10)).every(new Duration(5)); assertEquals( expected, - runWindowFn( - SlidingWindows.of(new Duration(10)).every(new Duration(5)), + runWindowFn(windowFn, Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); + assertThat(windowFn.assignsToOneWindow(), is(false)); } @Test @@ -69,11 +71,27 @@ public void testSlightlyOverlapping() throws Exception { expected.put(new IntervalWindow(new Instant(0), new Instant(7)), set(1, 2, 5)); expected.put(new IntervalWindow(new Instant(5), new Instant(12)), set(5, 9, 10, 11)); expected.put(new IntervalWindow(new Instant(10), new Instant(17)), set(10, 11)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(7)).every(new Duration(5)); assertEquals( expected, - runWindowFn( - SlidingWindows.of(new Duration(7)).every(new Duration(5)), + runWindowFn(windowFn, Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); + assertThat(windowFn.assignsToOneWindow(), is(false)); + } + + @Test + public void testEqualSize() throws Exception { + Map> expected = new HashMap<>(); + expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2)); + expected.put(new IntervalWindow(new Instant(3), new Instant(6)), set(3, 4, 5)); + expected.put(new IntervalWindow(new Instant(6), new Instant(9)), set(6, 7)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(3)); + assertEquals( + expected, + runWindowFn( + windowFn, + Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L))); + assertThat(windowFn.assignsToOneWindow(), is(true)); } @Test @@ -82,12 +100,14 @@ public void testElidings() throws Exception { expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2)); expected.put(new IntervalWindow(new Instant(10), new Instant(13)), set(10, 11)); expected.put(new IntervalWindow(new Instant(100), new Instant(103)), set(100)); + SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(10)); assertEquals( expected, runWindowFn( // Only look at the first 3 millisecs of every 10-millisec interval. - SlidingWindows.of(new Duration(3)).every(new Duration(10)), + windowFn, Arrays.asList(1L, 2L, 3L, 5L, 9L, 10L, 11L, 100L))); + assertThat(windowFn.assignsToOneWindow(), is(true)); } @Test