From 5e42157f94e93f29bea367f6fb2605c890b64d46 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 30 Jun 2016 01:53:06 +0200 Subject: [PATCH 1/2] hotfix: check join window boundaries --- .../kafka/streams/kstream/JoinWindows.java | 21 +++++++----- .../streams/kstream/JoinWindowsTest.java | 32 ++++++++++++------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 53ddf3ec46288..936bcd28cee72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -42,7 +42,8 @@ * A join is symmetric in the sense, that a join specification on the first stream returns the same result record as * a join specification on the second stream with flipped before and after values. *

- * Both values (before and after) must not be negative and not zero at the same time. + * Both values (before and after) must not result in an "inverse" window, + * i.e., lower-interval-bound must not be larger than upper-interval.bound. */ public class JoinWindows extends Windows { @@ -54,14 +55,17 @@ public class JoinWindows extends Windows { private JoinWindows(String name, long before, long after) { super(name); - if (before < 0) { - throw new IllegalArgumentException("window size must be > 0 (you provided before as " + before + ")"); + if (before < 0) { // shift lower bound to right + if (after < -before) { + throw new IllegalArgumentException("Upper interval bound smaller than lower interval bound." + + " must be at least " + (-before)); + } } - if (after < 0) { - throw new IllegalArgumentException("window size must be > 0 (you provided after as " + after + ")"); - } - if (before == 0 && after == 0) { - throw new IllegalArgumentException("window size must be > 0 (you provided 0)"); + if (after < 0) { // shift upper bound to left + if (before < -after) { + throw new IllegalArgumentException("Lower interval bound greater than upper interval bound." + + " must be at least " + (-after)); + } } this.after = after; @@ -70,6 +74,7 @@ private JoinWindows(String name, long before, long after) { /** * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}. + * ({@code timeDifference} must not be negative) * * @param timeDifference join window interval */ diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index d8fa7b49c7c13..d80342a7e94ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -29,7 +29,7 @@ public class JoinWindowsTest { private static String anyName = "window"; private static long anySize = 123L; - private static long anyOtherSize = 456L; + private static long anyOtherSize = 456L; // should be larger than anySize @Test public void shouldHaveSaneEqualsAndHashCode() { @@ -66,6 +66,21 @@ public void shouldHaveSaneEqualsAndHashCode() { assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1); } + @Test + public void validWindows() { + JoinWindows.of(anyName, anyOtherSize) // [ -anyOtherSize ; anyOtherSize ] + .before(anySize) // [ -anySize ; anyOtherSize ] + .before(0) // [ 0 ; anyOtherSize ] + .before(-anySize) // [ anySize ; anyOtherSize ] + .before(-anyOtherSize); // [ anyOtherSize ; anyOtherSize ] + + JoinWindows.of(anyName, anyOtherSize) // [ -anyOtherSize ; anyOtherSize ] + .after(anySize) // [ -anyOtherSize ; anySize ] + .after(0) // [ -anyOtherSize ; 0 ] + .after(-anySize) // [ -anyOtherSize ; -anySize ] + .after(-anyOtherSize); // [ -anyOtherSize ; -anyOtherSize ] + } + @Test(expected = IllegalArgumentException.class) public void nameMustNotBeEmpty() { JoinWindows.of("", anySize); @@ -77,23 +92,18 @@ public void nameMustNotBeNull() { } @Test(expected = IllegalArgumentException.class) - public void windowSizeMustNotBeNegative() { + public void timeDifferenceMustNotBeNegative() { JoinWindows.of(anyName, -1); } @Test(expected = IllegalArgumentException.class) - public void beforeMustNotBeNegative() { - JoinWindows.of(anyName, anySize).before(-1); - } - - @Test(expected = IllegalArgumentException.class) - public void afterSizeMustNotBeNegative() { - JoinWindows.of(anyName, anySize).after(-1); + public void afterBelowLower() { + JoinWindows.of(anyName, anySize).after(-anySize-1); } @Test(expected = IllegalArgumentException.class) - public void windowSizeMustNotBeZero() { - JoinWindows.of(anyName, 0); + public void beforeOverUpper() { + JoinWindows.of(anyName, anySize).before(-anySize-1); } } \ No newline at end of file From ad3d07f84f80cffd7537e1d28a993e66664037bc Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 30 Jun 2016 14:59:11 +0200 Subject: [PATCH 2/2] fixed checkstyle simplified boundary check --- .../apache/kafka/streams/kstream/JoinWindows.java | 13 ++----------- .../kafka/streams/kstream/JoinWindowsTest.java | 4 ++-- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 936bcd28cee72..309a9e6a8a930 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -55,17 +55,8 @@ public class JoinWindows extends Windows { private JoinWindows(String name, long before, long after) { super(name); - if (before < 0) { // shift lower bound to right - if (after < -before) { - throw new IllegalArgumentException("Upper interval bound smaller than lower interval bound." - + " must be at least " + (-before)); - } - } - if (after < 0) { // shift upper bound to left - if (before < -after) { - throw new IllegalArgumentException("Lower interval bound greater than upper interval bound." - + " must be at least " + (-after)); - } + if (before + after < 0) { + throw new IllegalArgumentException("Window interval (ie, before+after) must not be negative"); } this.after = after; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index d80342a7e94ce..20efd4580c199 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -98,12 +98,12 @@ public void timeDifferenceMustNotBeNegative() { @Test(expected = IllegalArgumentException.class) public void afterBelowLower() { - JoinWindows.of(anyName, anySize).after(-anySize-1); + JoinWindows.of(anyName, anySize).after(-anySize - 1); } @Test(expected = IllegalArgumentException.class) public void beforeOverUpper() { - JoinWindows.of(anyName, anySize).before(-anySize-1); + JoinWindows.of(anyName, anySize).before(-anySize - 1); } } \ No newline at end of file