From 0ce35012c6d03754c4476b82fd4e62e899dfa32f Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 20 Jun 2016 22:16:02 +0200 Subject: [PATCH 1/2] Kafka-3880: Disallow Join Window with size zero --- .../kafka/streams/kstream/JoinWindows.java | 18 ++-- .../KStreamRepartitionJoinTest.java | 11 +-- .../streams/kstream/JoinWindowsTest.java | 99 +++++++++++++++++++ .../streams/kstream/TimeWindowsTest.java | 30 +++--- .../kstream/internals/KStreamImplTest.java | 4 +- .../internals/KStreamKStreamJoinTest.java | 9 +- .../internals/KStreamKStreamLeftJoinTest.java | 8 +- 7 files changed, 142 insertions(+), 37 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index f45c0640dff56..81eddad387d42 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -52,21 +52,27 @@ public class JoinWindows extends Windows { private JoinWindows(String name, long before, long after) { super(name); + if (before < 0) { + throw new IllegalArgumentException("window size must be > 0 (you provided before as " + before + ")"); + } + if (after < 0) { + throw new IllegalArgumentException("window size must be > 0 (you provided after as " + after + ")"); + } + if (before == 0 && after == 0) { + throw new IllegalArgumentException("window size must be > 0 (you provided 0)"); + } + this.after = after; this.before = before; } - public static JoinWindows of(String name) { - return new JoinWindows(name, 0L, 0L); - } - /** * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}. * * @param timeDifference join window interval */ - public JoinWindows within(long timeDifference) { - return new JoinWindows(this.name, timeDifference, timeDifference); + public static JoinWindows of(String name, long timeDifference) { + return new JoinWindows(name, timeDifference, timeDifference); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index caf326dae55d2..c8525139ee691 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -226,7 +226,7 @@ public String apply(String value1, Integer value2) { streamTwo .join(streamOne.map(keyMapper), joiner, - JoinWindows.of(output).within(60 * 1000), + JoinWindows.of(output, 60 * 1000), Serdes.Integer(), Serdes.String(), Serdes.Integer()) @@ -253,7 +253,7 @@ public KeyValue apply(Integer key, String outputTopic = "left-join"; map1.leftJoin(map2, valueJoiner, - JoinWindows.of("the-left-join").within(60 * 1000), + JoinWindows.of("the-left-join", 60 * 1000), Serdes.Integer(), Serdes.Integer(), Serdes.String()) @@ -282,8 +282,7 @@ public KeyValue apply(Integer key, final KStream join = map1.join(map2, valueJoiner, - JoinWindows.of("join-one") - .within(60 * 1000), + JoinWindows.of("join-one", 60 * 1000), Serdes.Integer(), Serdes.Integer(), Serdes.String()); @@ -298,7 +297,7 @@ public String apply(final String value1, final String value2) { join.map(kvMapper) .join(streamFour.map(kvMapper), joiner, - JoinWindows.of("the-other-join").within(60 * 1000), + JoinWindows.of("the-other-join", 60 * 1000), Serdes.Integer(), Serdes.String(), Serdes.String()) @@ -433,7 +432,7 @@ private void doJoin(KStream lhs, CLUSTER.createTopic(outputTopic); lhs.join(rhs, valueJoiner, - JoinWindows.of(joinName).within(60 * 1000), + JoinWindows.of(joinName, 60 * 1000), Serdes.Integer(), Serdes.Integer(), Serdes.String()) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java new file mode 100644 index 0000000000000..d8fa7b49c7c13 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + + +public class JoinWindowsTest { + + private static String anyName = "window"; + private static long anySize = 123L; + private static long anyOtherSize = 456L; + + @Test + public void shouldHaveSaneEqualsAndHashCode() { + JoinWindows w1 = JoinWindows.of("w1", anySize); + JoinWindows w2 = JoinWindows.of("w2", anySize); + + // Reflexive + assertEquals(w1, w1); + assertEquals(w1.hashCode(), w1.hashCode()); + + // Symmetric + assertEquals(w1, w2); + assertEquals(w2, w1); + assertEquals(w1.hashCode(), w2.hashCode()); + + JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize); + JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after); + assertEquals(w3, w4); + assertEquals(w4, w3); + assertEquals(w3.hashCode(), w4.hashCode()); + + // Inequality scenarios + assertNotEquals("must be false for null", null, w1); + assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1); + assertNotEquals("must be false for different types", new Object(), w1); + + JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after + 1); + assertNotEquals("must be false when window sizes are different", differentWindowSize, w1); + + JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after + 1); + assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1); + + JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before + 1); + assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeEmpty() { + JoinWindows.of("", anySize); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeNull() { + JoinWindows.of(null, anySize); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeNegative() { + JoinWindows.of(anyName, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void beforeMustNotBeNegative() { + JoinWindows.of(anyName, anySize).before(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void afterSizeMustNotBeNegative() { + JoinWindows.of(anyName, anySize).after(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeZero() { + JoinWindows.of(anyName, 0); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 62b12a9ff4094..5acd6e22c6918 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -25,8 +25,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotEquals; public class TimeWindowsTest { @@ -39,31 +38,30 @@ public void shouldHaveSaneEqualsAndHashCode() { TimeWindows w2 = TimeWindows.of("w2", w1.size); // Reflexive - assertTrue(w1.equals(w1)); - assertTrue(w1.hashCode() == w1.hashCode()); + assertEquals(w1, w1); + assertEquals(w1.hashCode(), w1.hashCode()); // Symmetric - assertTrue(w1.equals(w2)); - assertTrue(w1.hashCode() == w2.hashCode()); - assertTrue(w2.hashCode() == w1.hashCode()); + assertEquals(w1, w2); + assertEquals(w2, w1); + assertEquals(w1.hashCode(), w2.hashCode()); // Transitive TimeWindows w3 = TimeWindows.of("w3", w2.size); - assertTrue(w2.equals(w3)); - assertTrue(w2.hashCode() == w3.hashCode()); - assertTrue(w1.equals(w3)); - assertTrue(w1.hashCode() == w3.hashCode()); + assertEquals(w2, w3); + assertEquals(w1, w3); + assertEquals(w1.hashCode(), w3.hashCode()); // Inequality scenarios - assertFalse("must be false for null", w1.equals(null)); - assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant"))); - assertFalse("must be false for different types", w1.equals(new Object())); + assertNotEquals("must be false for null", null, w1); + assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1); + assertNotEquals("must be false for different types", new Object(), w1); TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1); - assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize)); + assertNotEquals("must be false when window sizes are different", differentWindowSize, w1); TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1); - assertFalse("must be false when advance intervals are different", w1.equals(differentAdvanceInterval)); + assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval, w1); } @Test(expected = IllegalArgumentException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 3d45d1dcc8a29..84c9167bd704a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -108,14 +108,14 @@ public boolean test(String key, Integer value) { public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of("join-0"), stringSerde, intSerde, intSerde); + }, JoinWindows.of("join-0", 1), stringSerde, intSerde, intSerde); KStream stream5 = streams2[1].join(streams3[1], new ValueJoiner() { @Override public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of("join-1"), stringSerde, intSerde, intSerde); + }, JoinWindows.of("join-1", 1), stringSerde, intSerde, intSerde); stream4.to("topic-5"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 6b0828a62aa78..aa7d117685a6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -77,7 +77,8 @@ public void testJoin() throws Exception { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), + intSerde, stringSerde, stringSerde); joined.process(processor); Collection> copartitionGroups = builder.copartitionGroups(); @@ -175,7 +176,8 @@ public void testOuterJoin() throws Exception { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); + joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), + intSerde, stringSerde, stringSerde); joined.process(processor); Collection> copartitionGroups = builder.copartitionGroups(); @@ -275,7 +277,8 @@ public void testWindowing() throws Exception { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), + intSerde, stringSerde, stringSerde); joined.process(processor); Collection> copartitionGroups = builder.copartitionGroups(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 1a608a737cbd7..8e05da9f4427e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -78,8 +78,8 @@ public void testLeftJoin() throws Exception { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test") - .within(100), intSerde, stringSerde, stringSerde); + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), + intSerde, stringSerde, stringSerde); joined.process(processor); Collection> copartitionGroups = builder.copartitionGroups(); @@ -157,8 +157,8 @@ public void testWindowing() throws Exception { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test") - .within(100), intSerde, stringSerde, stringSerde); + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), + intSerde, stringSerde, stringSerde); joined.process(processor); Collection> copartitionGroups = builder.copartitionGroups(); From 05da0f6ecdf1530064d7c878a2f90d6fb8f2c7cc Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 21 Jun 2016 11:49:40 +0200 Subject: [PATCH 2/2] minor updates --- .../java/org/apache/kafka/streams/kstream/JoinWindows.java | 2 ++ .../kafka/streams/kstream/internals/KStreamImplTest.java | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 81eddad387d42..53ddf3ec46288 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -41,6 +41,8 @@ * * A join is symmetric in the sense, that a join specification on the first stream returns the same result record as * a join specification on the second stream with flipped before and after values. + *

+ * Both values (before and after) must not be negative and not zero at the same time. */ public class JoinWindows extends Windows { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 84c9167bd704a..6242702d7cb8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -103,19 +103,20 @@ public boolean test(String key, Integer value) { } ); + final int anyWindowSize = 1; KStream stream4 = streams2[0].join(streams3[0], new ValueJoiner() { @Override public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of("join-0", 1), stringSerde, intSerde, intSerde); + }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde); KStream stream5 = streams2[1].join(streams3[1], new ValueJoiner() { @Override public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of("join-1", 1), stringSerde, intSerde, intSerde); + }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde); stream4.to("topic-5");