From 069328fc0f60d13170496a730d5c7e7e28ebc95f Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 20 Jun 2016 10:30:53 +0100 Subject: [PATCH 1/2] MINOR: speed up streams integration tests by a) merging some for startup/shutdown efficiency. b) use independent state dirs. c) remove some tests that are covered elsewhere --- .../integration/JoinIntegrationTest.java | 3 +- .../KStreamRepartitionJoinTest.java | 345 ++++++------------ .../RegexSourceIntegrationTest.java | 2 +- .../integration/WordCountIntegrationTest.java | 2 +- 4 files changed, 115 insertions(+), 237 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index ea216f371886d..d46abbb9e3930 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -147,7 +147,8 @@ public void shouldCountClicksPerRegion() throws Exception { // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState` // accordingly. - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, + "/tmp/kstreams-join-integration-test"); // Remove any state from previous test runs IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index 221d34966e9a9..072110dbf8274 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -23,12 +23,9 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.TestUtils; @@ -53,15 +50,11 @@ public class KStreamRepartitionJoinTest { public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - private static volatile int testNo = 0; - private KStreamBuilder builder; private Properties streamsConfiguration; private KStream streamOne; private KStream streamTwo; - private KStream streamThree; private KStream streamFour; - private KTable kTable; private ValueJoiner valueJoiner; private KeyValueMapper> keyMapper; @@ -72,16 +65,12 @@ public class KStreamRepartitionJoinTest { private String streamOneInput; private String streamTwoInput; private String streamFourInput; - private String tableInput; - private String outputTopic; - private String streamThreeInput; @Before public void before() { - testNo++; - String applicationId = "kstream-repartition-join-test" + testNo; + String applicationId = "kstream-repartition-join-test"; builder = new KStreamBuilder(); createTopics(); streamsConfiguration = new Properties(); @@ -92,14 +81,13 @@ public void before() { streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstream-repartition-test"); + streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); + streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput); streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput); - streamThree = builder.stream(Serdes.Integer(), Serdes.Integer(), streamThreeInput); streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput); - kTable = builder.table(Serdes.Integer(), Serdes.String(), tableInput); - valueJoiner = new ValueJoiner() { @Override public String apply(final Integer value1, final String value2) { @@ -124,16 +112,37 @@ public void whenShuttingDown() throws IOException { } @Test - public void shouldMapStreamOneAndJoin() throws ExecutionException, InterruptedException { + public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception { produceMessages(); - doJoin(streamOne.map(keyMapper), streamTwo); + + final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin(); + final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin(); + final ExpectedOutputOnTopic mapMapJoin = mapMapJoin(); + final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin(); + final ExpectedOutputOnTopic flatMapJoin = flatMapJoin(); + final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream(); + final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined(); + final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin(); + startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); + + verifyCorrectOutput(mapOne); + verifyCorrectOutput(mapBoth); + verifyCorrectOutput(mapMapJoin); + verifyCorrectOutput(selectKeyJoin); + verifyCorrectOutput(flatMapJoin); + verifyCorrectOutput(mapRhs); + verifyCorrectOutput(mapJoinJoin); + verifyLeftJoin(leftJoin); } - @Test - public void shouldMapBothStreamsAndJoin() throws Exception { - produceMessages(); + private ExpectedOutputOnTopic mapStreamOneAndJoin() { + String mapOneStreamAndJoinOutput = "map-one-join-output"; + doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput, "map-one-join"); + return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput); + } + + private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception { final KStream map1 = @@ -148,33 +157,30 @@ public KeyValue apply(Integer key, } }); - doJoin(map1, map2); - startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); - + doJoin(map1, map2, "map-both-streams-and-join", "map-both-join"); + return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join"); } - @Test - public void shouldMapMapJoin() throws Exception { - produceMessages(); + private ExpectedOutputOnTopic mapMapJoin() throws Exception { final KStream mapMapStream = streamOne.map( new KeyValueMapper>() { @Override public KeyValue apply(Long key, Integer value) { + if (value == null) { + return new KeyValue<>(null, null); + } return new KeyValue<>(key + value, value); } }).map(keyMapper); - doJoin(mapMapStream, streamTwo); - startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); + String outputTopic = "map-map-join"; + doJoin(mapMapStream, streamTwo, outputTopic, outputTopic); + return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); } - @Test - public void shouldSelectKeyAndJoin() throws ExecutionException, InterruptedException { - produceMessages(); + public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException { final KStream keySelected = @@ -185,16 +191,13 @@ public Integer apply(final Long key, final Integer value) { } }); - doJoin(keySelected, streamTwo); - startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); + String outputTopic = "select-key-join"; + doJoin(keySelected, streamTwo, outputTopic, outputTopic); + return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); } - @Test - public void shouldFlatMapJoin() throws Exception { - produceMessages(); - + private ExpectedOutputOnTopic flatMapJoin() throws Exception { final KStream flatMapped = streamOne.flatMap( new KeyValueMapper>>() { @Override @@ -204,22 +207,13 @@ public Iterable> apply(Long key, } }); - doJoin(flatMapped, streamTwo); - startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); - } + String outputTopic = "flat-map-join"; + doJoin(flatMapped, streamTwo, outputTopic, outputTopic); - @Test - public void shouldJoinTwoStreamsPartitionedTheSame() throws Exception { - produceMessages(); - doJoin(streamThree, streamTwo); - startStreams(); - verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E")); + return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); } - @Test - public void shouldJoinWithRhsStreamMapped() throws Exception { - produceMessages(); + private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception { ValueJoiner joiner = new ValueJoiner() { @Override @@ -227,39 +221,21 @@ public String apply(String value1, Integer value2) { return value1 + ":" + value2; } }; + String output = "join-rhs-stream-mapped"; streamTwo .join(streamOne.map(keyMapper), joiner, - JoinWindows.of("the-join").within(60 * 1000), + JoinWindows.of(output).within(60 * 1000), Serdes.Integer(), Serdes.String(), Serdes.Integer()) - .to(Serdes.Integer(), Serdes.String(), outputTopic); + .to(Serdes.Integer(), Serdes.String(), output); - startStreams(); - verifyCorrectOutput(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5")); + return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), + output); } - @Test - public void shouldLeftJoinTwoStreamsPartitionedTheSame() throws Exception { - produceMessages(); - doLeftJoin(streamThree, streamTwo); - startStreams(); - verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E")); - } - - @Test - public void shouldMapStreamOneAndLeftJoin() throws ExecutionException, InterruptedException { - produceMessages(); - doLeftJoin(streamOne.map(keyMapper), streamTwo); - startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); - } - - @Test - public void shouldMapBothStreamsAndLeftJoin() throws Exception { - produceMessages(); - + public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception { final KStream map1 = streamOne.map(keyMapper); @@ -273,102 +249,20 @@ public KeyValue apply(Integer key, } }); - doLeftJoin(map1, map2); - startStreams(); - - List received = receiveMessages(new StringDeserializer(), 5); - - if (!received.equals(expectedStreamOneTwoJoin)) { - produceToStreamOne(); - verifyCorrectOutput(expectedStreamOneTwoJoin); - } - - } - - @Test - public void shouldLeftJoinWithRhsStreamMapped() throws Exception { - produceMessages(); - - ValueJoiner joiner = new ValueJoiner() { - @Override - public String apply(String value1, Integer value2) { - return value1 + ":" + value2; - } - }; - streamTwo - .leftJoin(streamOne.map(keyMapper), - joiner, - JoinWindows.of("the-join").within(60 * 1000), + String outputTopic = "left-join"; + map1.leftJoin(map2, + valueJoiner, + JoinWindows.of("the-left-join").within(60 * 1000), Serdes.Integer(), - null, - Serdes.Integer()) - .to(Serdes.Integer(), Serdes.String(), outputTopic); - - startStreams(); - List received = receiveMessages(new StringDeserializer(), 5); - - List expectedMessages = Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"); - if (!received.equals(expectedMessages)) { - produceStreamTwoInputTo(streamTwoInput); - verifyCorrectOutput(expectedMessages); - } - } - - @Test - public void shouldLeftJoinWithKTableAfterMap() throws Exception { - produceMessages(); - streamOne.map(keyMapper) - .leftJoin(kTable, valueJoiner, Serdes.Integer(), Serdes.Integer()) - .to(Serdes.Integer(), Serdes.String(), outputTopic); - - startStreams(); - - List received = receiveMessages(new StringDeserializer(), 5); - assertThat(received, is(expectedStreamOneTwoJoin)); - } - - @Test - public void shouldLeftJoinWithTableProducedFromGroupBy() throws Exception { - produceMessages(); - KTable aggTable = - streamOne.map(keyMapper) - .groupByKey(Serdes.Integer(), Serdes.Integer()) - .aggregate(new Initializer() { - @Override - public String apply() { - return ""; - } - }, new Aggregator() { - @Override - public String apply(final Integer aggKey, final Integer value, - final String aggregate) { - return aggregate + ":" + value; - } - }, Serdes.String(), "agg-by-key"); - - streamTwo.leftJoin(aggTable, new ValueJoiner() { - @Override - public String apply(final String value1, final String value2) { - return value1 + "@" + value2; - } - }, Serdes.Integer(), Serdes.String()) + Serdes.Integer(), + Serdes.String()) .to(Serdes.Integer(), Serdes.String(), outputTopic); - startStreams(); - - receiveMessages(new StringDeserializer(), 5); - produceStreamTwoInputTo(streamTwoInput); - List received = receiveMessages(new StringDeserializer(), 5); - - assertThat(received, is(Arrays.asList("A@:1", "B@:2", "C@:3", "D@:4", "E@:5"))); - + return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); } - - @Test - public void shouldJoinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception { - produceMessages(); - + private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws + Exception { final KStream map1 = streamOne.map(keyMapper); @@ -387,7 +281,7 @@ public KeyValue apply(Integer key, final KStream join = map1.join(map2, valueJoiner, - JoinWindows.of("the-join") + JoinWindows.of("join-one") .within(60 * 1000), Serdes.Integer(), Serdes.Integer(), @@ -399,6 +293,7 @@ public String apply(final String value1, final String value2) { return value1 + ":" + value2; } }; + String topic = "map-join-join"; join.map(kvMapper) .join(streamFour.map(kvMapper), joiner, @@ -406,56 +301,50 @@ public String apply(final String value1, final String value2) { Serdes.Integer(), Serdes.String(), Serdes.String()) - .to(Serdes.Integer(), Serdes.String(), outputTopic); + .to(Serdes.Integer(), Serdes.String(), topic); - startStreams(); - verifyCorrectOutput(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E")); + + return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), + topic); } - @Test - public void shouldFilterNullKeysWhenRepartionedOnJoin() throws Exception { - produceMessages(); - IntegrationTestUtils.produceKeyValuesSynchronously( - streamOneInput, - Collections.singleton( - new KeyValue(70L, null)), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - LongSerializer.class, - IntegerSerializer.class, - new Properties())); - doJoin(streamOne.map(keyMapper), streamTwo); - startStreams(); - verifyCorrectOutput(expectedStreamOneTwoJoin); + private class ExpectedOutputOnTopic { + private final List expectedOutput; + private final String outputTopic; + + ExpectedOutputOnTopic(final List expectedOutput, final String outputTopic) { + this.expectedOutput = expectedOutput; + this.outputTopic = outputTopic; + } + } + + + private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic) + throws InterruptedException { + assertThat(receiveMessages(new StringDeserializer(), + expectedOutputOnTopic.expectedOutput.size(), + expectedOutputOnTopic.outputTopic), + is(expectedOutputOnTopic.expectedOutput)); + } + private void verifyLeftJoin(ExpectedOutputOnTopic expectedOutputOnTopic) + throws InterruptedException, ExecutionException { + List received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic + .expectedOutput.size(), expectedOutputOnTopic.outputTopic); + if (!received.equals(expectedOutputOnTopic.expectedOutput)) { + produceToStreamOne(); + verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic); + } } private void produceMessages() throws ExecutionException, InterruptedException { produceToStreamOne(); produceStreamTwoInputTo(streamTwoInput); - produceToStreamThree(); - produceStreamTwoInputTo(tableInput); produceStreamTwoInputTo(streamFourInput); } - private void produceToStreamThree() - throws ExecutionException, InterruptedException { - IntegrationTestUtils.produceKeyValuesSynchronously( - streamThreeInput, - Arrays.asList( - new KeyValue<>(1, 10), - new KeyValue<>(2, 20), - new KeyValue<>(3, 30), - new KeyValue<>(4, 40), - new KeyValue<>(5, 50)), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - IntegerSerializer.class, - IntegerSerializer.class, - new Properties())); - } private void produceStreamTwoInputTo(final String streamTwoInput) throws ExecutionException, InterruptedException { @@ -483,7 +372,8 @@ private void produceToStreamOne() new KeyValue<>(5L, 2), new KeyValue<>(12L, 3), new KeyValue<>(15L, 4), - new KeyValue<>(20L, 5)), + new KeyValue<>(20L, 5), + new KeyValue(70L, null)), // nulls should be filtered TestUtils.producerConfig( CLUSTER.bootstrapServers(), LongSerializer.class, @@ -492,18 +382,12 @@ private void produceToStreamOne() } private void createTopics() { - streamOneInput = "stream-one-" + testNo; - streamTwoInput = "stream-two-" + testNo; - streamThreeInput = "stream-three-" + testNo; - streamFourInput = "stream-four-" + testNo; - tableInput = "table-stream-two-" + testNo; - outputTopic = "output-" + testNo; + streamOneInput = "stream-one"; + streamTwoInput = "stream-two"; + streamFourInput = "stream-four"; CLUSTER.createTopic(streamOneInput); CLUSTER.createTopic(streamTwoInput, 2, 1); - CLUSTER.createTopic(streamThreeInput, 2, 1); CLUSTER.createTopic(streamFourInput); - CLUSTER.createTopic(tableInput, 2, 1); - CLUSTER.createTopic(outputTopic); } @@ -514,20 +398,20 @@ private void startStreams() { private List receiveMessages(final Deserializer valueDeserializer, - final int numMessages) throws InterruptedException { + final int numMessages, final String topic) throws InterruptedException { final Properties config = new Properties(); config .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test-" + testNo); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test"); config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); List received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, - outputTopic, + topic, numMessages, 60 * 1000); @@ -535,31 +419,24 @@ private List receiveMessages(final Deserializer valueDeserializer, return received; } - private void verifyCorrectOutput(List expectedMessages) throws InterruptedException { - assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size()), + private void verifyCorrectOutput(List expectedMessages, + final String topic) throws InterruptedException { + assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic), is(expectedMessages)); } private void doJoin(KStream lhs, - KStream rhs) { + KStream rhs, + String outputTopic, + final String joinName) { + CLUSTER.createTopic(outputTopic); lhs.join(rhs, valueJoiner, - JoinWindows.of("the-join").within(60 * 1000), + JoinWindows.of(joinName).within(60 * 1000), Serdes.Integer(), Serdes.Integer(), Serdes.String()) .to(Serdes.Integer(), Serdes.String(), outputTopic); } - private void doLeftJoin(KStream lhs, - KStream rhs) { - lhs.leftJoin(rhs, - valueJoiner, - JoinWindows.of("the-join").within(60 * 1000), - Serdes.Integer(), - Serdes.Integer(), - Serdes.String()) - .to(Serdes.Integer(), Serdes.String(), outputTopic); - } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 7e18cff90845d..5634c646eca32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -313,7 +313,7 @@ private Properties getStreamsConfig() { streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstreams-regex-test"); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java index 2966590f3fedb..9692cda1a996a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java @@ -91,7 +91,7 @@ public void shouldCountWords() throws Exception { // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState` // accordingly. - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstreams-word-count"); KStreamBuilder builder = new KStreamBuilder(); From afcc89f7da20a4e52bb06d413beb067659c00316 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 20 Jun 2016 12:49:17 +0100 Subject: [PATCH 2/2] use random temp directory for streams state dir --- .../streams/integration/InternalTopicIntegrationTest.java | 3 ++- .../apache/kafka/streams/integration/JoinIntegrationTest.java | 3 ++- .../streams/integration/KGroupedStreamIntegrationTest.java | 3 ++- .../kafka/streams/integration/KStreamRepartitionJoinTest.java | 3 ++- .../kafka/streams/integration/RegexSourceIntegrationTest.java | 1 - .../java/org/apache/kafka/streams/state/StateTestUtils.java | 2 +- 6 files changed, 9 insertions(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index b642b2a6a01b0..addebaebd2683 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.state.StateTestUtils; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -122,7 +123,7 @@ public void shouldCompactTopicsForStateChangelogs() throws Exception { streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index d46abbb9e3930..f251a8586a336 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.StateTestUtils; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -148,7 +149,7 @@ public void shouldCountClicksPerRegion() throws Exception { // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState` // accordingly. streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, - "/tmp/kstreams-join-integration-test"); + StateTestUtils.tempDir().getPath()); // Remove any state from previous test runs IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java index 44e92f7a0b2f8..1ec657386d4ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.StateTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -80,7 +81,7 @@ public void before() { .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kgrouped-stream-test"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath()); KeyValueMapper mapper = diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index 072110dbf8274..caf326dae55d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.StateTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -80,7 +81,7 @@ public void before() { .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstream-repartition-test"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 5634c646eca32..cf4839182e933 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -313,7 +313,6 @@ private Properties getStreamsConfig() { streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstreams-regex-test"); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java index 70e6cf67f9560..f348fc98ebd02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java @@ -38,7 +38,7 @@ public class StateTestUtils { */ public static File tempDir() { try { - final File dir = Files.createTempDirectory("test").toFile(); + final File dir = Files.createTempDirectory(new File("/tmp").toPath(), "test").toFile(); dir.mkdirs(); dir.deleteOnExit();