From 75e01c87a5de8a64b9d4b3baf2e0dd061e138b25 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 11 Jun 2016 14:12:42 +0200 Subject: [PATCH 1/2] KAFKA-3185: [Streams] Added Kafka Streams Application Reset Tool cherry-picked from trunk Conflicts: checkstyle/import-control.xml streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java --- bin/kafka-streams-application-reset.sh | 21 ++ build.gradle | 2 + checkstyle/import-control.xml | 3 + .../apache/kafka/streams/KafkaStreams.java | 96 ++++--- .../kafka/streams/KafkaStreamsTest.java | 129 +++++++-- .../integration/ResetIntegrationTest.java | 255 +++++++++++++++++ .../utils/EmbeddedSingleNodeKafkaCluster.java | 9 +- .../apache/kafka/tools/StreamsResetter.java | 260 ++++++++++++++++++ 8 files changed, 715 insertions(+), 60 deletions(-) create mode 100755 bin/kafka-streams-application-reset.sh create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java diff --git a/bin/kafka-streams-application-reset.sh b/bin/kafka-streams-application-reset.sh new file mode 100755 index 0000000000000..26ab7667137fe --- /dev/null +++ b/bin/kafka-streams-application-reset.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xmx512M" +fi + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@" diff --git a/build.gradle b/build.gradle index 36647b3468dc4..e2f4da6ee56f9 100644 --- a/build.gradle +++ b/build.gradle @@ -634,6 +634,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { + compile project(':core') compile project(':clients') compile project(':log4j-appender') compile libs.argparse4j @@ -682,6 +683,7 @@ project(':streams') { testCompile project(':clients').sourceSets.test.output testCompile project(':core') testCompile project(':core').sourceSets.test.output + testCompile project(':tools') testCompile libs.junit testRuntime libs.slf4jlog4j diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 5f52cced89d8f..72247acf59294 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -123,6 +123,8 @@ + + @@ -149,6 +151,7 @@ + diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index af6d973e3dc21..17c760e33c7eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -24,12 +24,14 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -43,7 +45,7 @@ * The computational logic can be specified either by using the {@link TopologyBuilder} class to define the a DAG topology of * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder} * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL to define the transformation. - * + *
+ * <p>
 * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
 * more threads specified in the configs for the processing work.
 * <p>
@@ -56,7 +58,7 @@
 * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
 * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
 * <p>
- *
+ * <p>
 * A simple example might look like this:
 * <pre>
  *    Map<String, Object> props = new HashMap<>();
@@ -74,7 +76,6 @@
  *    KafkaStreams streams = new KafkaStreams(builder, config);
  *    streams.start();
  * </pre>
- * */ @InterfaceStability.Unstable @@ -99,52 +100,56 @@ public class KafkaStreams { // usage only and should not be exposed to users at all. private final UUID processId; + private final StreamsConfig config; + /** * Construct the stream instance. * - * @param builder the processor topology builder specifying the computational logic - * @param props properties for the {@link StreamsConfig} + * @param builder the processor topology builder specifying the computational logic + * @param props properties for the {@link StreamsConfig} */ - public KafkaStreams(TopologyBuilder builder, Properties props) { + public KafkaStreams(final TopologyBuilder builder, final Properties props) { this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier()); } /** * Construct the stream instance. * - * @param builder the processor topology builder specifying the computational logic - * @param config the stream configs + * @param builder the processor topology builder specifying the computational logic + * @param config the stream configs */ - public KafkaStreams(TopologyBuilder builder, StreamsConfig config) { + public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) { this(builder, config, new DefaultKafkaClientSupplier()); } /** * Construct the stream instance. * - * @param builder the processor topology builder specifying the computational logic - * @param config the stream configs - * @param clientSupplier the kafka clients supplier which provides underlying producer and consumer clients - * for this {@link KafkaStreams} instance + * @param builder the processor topology builder specifying the computational logic + * @param config the stream configs + * @param clientSupplier the kafka clients supplier which provides underlying producer and consumer clients + * for this {@link KafkaStreams} instance */ - public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) { + public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier) { // create the metrics - Time time = new SystemTime(); + final Time time = new SystemTime(); this.processId = UUID.randomUUID(); + this.config = config; + // The application ID is a required config and hence should always have value - String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement(); - List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); + final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); @@ -152,25 +157,26 @@ public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSu this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; for (int i = 0; i < this.threads.length; i++) { - this.threads[i] = new 
StreamThread(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time); + this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time); } } /** * Start the stream instance by starting all its threads. + * * @throws IllegalStateException if process was already started */ public synchronized void start() { log.debug("Starting Kafka Stream process"); - if (state == CREATED) { - for (StreamThread thread : threads) + if (this.state == CREATED) { + for (final StreamThread thread : this.threads) thread.start(); - state = RUNNING; + this.state = RUNNING; log.info("Started Kafka Stream process"); - } else if (state == RUNNING) { + } else if (this.state == RUNNING) { throw new IllegalStateException("This process was already started."); } else { throw new IllegalStateException("Cannot restart after closing."); @@ -180,40 +186,64 @@ public synchronized void start() { /** * Shutdown this stream instance by signaling all the threads to stop, * and then wait for them to join. + * * @throws IllegalStateException if process has not started yet */ public synchronized void close() { log.debug("Stopping Kafka Stream process"); - if (state == RUNNING) { + if (this.state == RUNNING) { // signal the threads to stop and wait - for (StreamThread thread : threads) + for (final StreamThread thread : this.threads) thread.close(); - for (StreamThread thread : threads) { + for (final StreamThread thread : this.threads) { try { thread.join(); - } catch (InterruptedException ex) { + } catch (final InterruptedException ex) { Thread.interrupted(); } } } - if (state != STOPPED) { - metrics.close(); - state = STOPPED; + if (this.state != STOPPED) { + this.metrics.close(); + this.state = STOPPED; log.info("Stopped Kafka Stream process"); } } + /** + * Cleans up local state store directory ({@code state.dir}), by deleting all data with regard to the application-id. + *

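+ * For example, a minimal sketch ({@code builder} and {@code props} as in the class-level usage example; the
+ * commented-out call would throw an {@link IllegalStateException}):
+ * <pre>
+ *    KafkaStreams streams = new KafkaStreams(builder, props);
+ *    streams.cleanUp();     // allowed: instance not started yet
+ *    streams.start();
+ *    // streams.cleanUp(); // not allowed: instance is running
+ *    streams.close();
+ *    streams.cleanUp();     // allowed: instance is closed
+ * </pre>
+ * <p>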
+ * May only be called either before instance is started or after instance is closed. + * + * @throws IllegalStateException if instance is currently running + */ + public void cleanUp() { + if (this.state == RUNNING) { + throw new IllegalStateException("Cannot clean up while running."); + } + + final String localApplicationDir = this.config.getString(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + + log.debug("Clean up local Kafka Streams data in {}", localApplicationDir); + log.debug("Removing local Kafka Streams application data in {} for application {}", + localApplicationDir, + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); + Utils.delete(new File(localApplicationDir)); + } + /** * Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. * * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler. */ - public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) { - for (StreamThread thread : threads) + public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { + for (final StreamThread thread : this.threads) thread.setUncaughtExceptionHandler(eh); } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 22d8bf2dd168b..f33cb3a569dfb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,16 +19,21 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.Properties; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class KafkaStreamsTest { @Test public void testStartAndClose() throws Exception { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); @@ -36,71 +41,143 @@ public void testStartAndClose() throws Exception { final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); - KStreamBuilder builder = new KStreamBuilder(); - KafkaStreams streams = new KafkaStreams(builder, props); + final KStreamBuilder builder = new KStreamBuilder(); + final KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); final int newInitCount = MockMetricsReporter.INIT_COUNT.get(); final int initCountDifference = newInitCount - oldInitCount; - Assert.assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0); + assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0); streams.close(); Assert.assertEquals("each reporter initialized should also be closed", - oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get()); + oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get()); } @Test public void testCloseIsIdempotent() throws Exception { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - KStreamBuilder builder = new KStreamBuilder(); - KafkaStreams streams = new KafkaStreams(builder, props); + final KStreamBuilder builder = new KStreamBuilder(); + final KafkaStreams streams = new KafkaStreams(builder, props); streams.close(); final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); streams.close(); Assert.assertEquals("subsequent close() calls should do nothing", - closeCount, MockMetricsReporter.CLOSE_COUNT.get()); + closeCount, MockMetricsReporter.CLOSE_COUNT.get()); } - @Test + @Test(expected = IllegalStateException.class) public void testCannotStartOnceClosed() throws Exception { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KStreamBuilder builder = new KStreamBuilder(); - KafkaStreams streams = new KafkaStreams(builder, props); + final KStreamBuilder builder = new KStreamBuilder(); + final KafkaStreams streams = new KafkaStreams(builder, props); streams.close(); try { streams.start(); - 
} catch (IllegalStateException e) { + } catch (final IllegalStateException e) { Assert.assertEquals("Cannot restart after closing.", e.getMessage()); - return; + throw e; + } finally { + streams.close(); } - Assert.fail("should have caught an exception and returned"); } - @Test + @Test(expected = IllegalStateException.class) public void testCannotStartTwice() throws Exception { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KStreamBuilder builder = new KStreamBuilder(); - KafkaStreams streams = new KafkaStreams(builder, props); + final KStreamBuilder builder = new KStreamBuilder(); + final KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); try { streams.start(); - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { Assert.assertEquals("This process was already started.", e.getMessage()); - return; + throw e; + } finally { + streams.close(); } - Assert.fail("should have caught an exception and returned"); } + + @Test + public void testCleanup() throws Exception { + final Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + + final KStreamBuilder builder = new KStreamBuilder(); + final KafkaStreams streams = new KafkaStreams(builder, props); + + streams.cleanUp(); + streams.start(); + streams.close(); + streams.cleanUp(); + } + + @Test + public void testCleanupIsolation() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + + final String appId1 = "testIsolation-1"; + final String appId2 = "testIsolation-2"; + final String stateDir = TestUtils.tempDirectory().getPath(); + final File stateDirApp1 = new File(stateDir + File.separator + appId1); + final File stateDirApp2 = new File(stateDir + File.separator + appId2); + + final Properties props = new Properties(); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir); + + assertFalse(stateDirApp1.exists()); + assertFalse(stateDirApp2.exists()); + + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId1); + final KafkaStreams streams1 = new KafkaStreams(builder, props); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId2); + final KafkaStreams streams2 = new KafkaStreams(builder, props); + + assertTrue(stateDirApp1.exists()); + assertTrue(stateDirApp2.exists()); + + streams1.cleanUp(); + assertFalse(stateDirApp1.exists()); + assertTrue(stateDirApp2.exists()); + + streams2.cleanUp(); + assertFalse(stateDirApp1.exists()); + assertFalse(stateDirApp2.exists()); + } + + @Test(expected = IllegalStateException.class) + public void testCannotCleanupWhileRunning() throws Exception { + final Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + + final KStreamBuilder builder = new KStreamBuilder(); + final KafkaStreams streams = new KafkaStreams(builder, props); + + streams.start(); + try { + streams.cleanUp(); + } catch (final IllegalStateException e) { + Assert.assertEquals("Cannot clean up while running.", e.getMessage()); + throw e; + } finally { + streams.close(); + } + } + } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java new file mode 100644 index 0000000000000..bffdfd937a867 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tools.StreamsResetter; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + + +/** + * Tests local state store and global application cleanup. 
+ */ +public class ResetIntegrationTest { + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + + private static final String APP_ID = "cleanup-integration-test"; + private static final String INPUT_TOPIC = "inputTopic"; + private static final String OUTPUT_TOPIC = "outputTopic"; + private static final String OUTPUT_TOPIC_2 = "outputTopic2"; + private static final String INTERMEDIATE_USER_TOPIC = "userTopic"; + + private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; + private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; + + @BeforeClass + public static void startKafkaCluster() throws Exception { + CLUSTER.createTopic(INPUT_TOPIC); + CLUSTER.createTopic(OUTPUT_TOPIC); + CLUSTER.createTopic(OUTPUT_TOPIC_2); + CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); + } + + @Test + public void testReprocessingFromScratchAfterCleanUp() throws Exception { + final Properties streamsConfiguration = prepareTest(); + final Properties resultTopicConsumerConfig = prepareResultConsumer(); + + prepareInputData(); + final KStreamBuilder builder = setupTopology(); + + // RUN + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10); + // receive only first values to make sure intermediate user topic is not consumed completely + // => required to test "seekToEnd" for intermediate topics + final KeyValue result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0); + + streams.close(); + + // RESET + Utils.sleep(STREAMS_CONSUMER_TIMEOUT); + streams.cleanUp(); + cleanGlobal(); + assertInternalTopicsGotDeleted(); + Utils.sleep(CLEANUP_CONSUMER_TIMEOUT); + + // RE-RUN + streams = new KafkaStreams(setupTopology(), streamsConfiguration); + streams.start(); + final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10); + final KeyValue resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0); + streams.close(); + + assertThat(resultRerun, equalTo(result)); + assertThat(resultRerun2, equalTo(result2)); + } + + private Properties prepareTest() throws Exception { + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1); + streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + + return streamsConfiguration; + } + + private Properties 
prepareResultConsumer() { + final Properties resultTopicConsumerConfig = new Properties(); + resultTopicConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + resultTopicConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-standard-consumer-" + OUTPUT_TOPIC); + resultTopicConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + resultTopicConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + resultTopicConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + + return resultTopicConsumerConfig; + } + + private void prepareInputData() throws Exception { + final Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, 10L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, 20L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, 30L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, 40L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, 50L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, 60L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, 61L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, 62L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, 63L); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L); + } + + private KStreamBuilder setupTopology() { + final KStreamBuilder builder = new KStreamBuilder(); + + final KStream input = builder.stream(INPUT_TOPIC); + + // use map to trigger internal re-partitioning before groupByKey + final KTable globalCounts = input + .map(new KeyValueMapper>() { + @Override + public KeyValue apply(final Long key, final String value) { + return new KeyValue<>(key, value); + } + }) + .groupByKey() + .count("global-count"); + globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); + + final KStream windowedCounts = input + .through(INTERMEDIATE_USER_TOPIC) + .map(new KeyValueMapper>() { + @Override + public KeyValue apply(final Long key, final String value) { + // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped + // => want to test "skip over" unprocessed records + // increasing the sleep time only has 
disadvantage that test run time is increased + Utils.sleep(1000); + return new KeyValue<>(key, value); + } + }) + .groupByKey() + .count(TimeWindows.of(35).advanceBy(10), "count") + .toStream() + .map(new KeyValueMapper, Long, KeyValue>() { + @Override + public KeyValue apply(final Windowed key, final Long value) { + return new KeyValue<>(key.window().start() + key.window().end(), value); + } + }); + windowedCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC_2); + + return builder; + } + + private void cleanGlobal() { + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + + final int exitCode = new StreamsResetter().run( + new String[]{ + "--application-id", APP_ID, + "--bootstrap-server", CLUSTER.bootstrapServers(), + "--zookeeper", CLUSTER.zKConnectString(), + "--input-topics", INPUT_TOPIC, + "--intermediate-topics", INTERMEDIATE_USER_TOPIC + }, + cleanUpConfig); + Assert.assertEquals(0, exitCode); + } + + private void assertInternalTopicsGotDeleted() { + final Set expectedRemainingTopicsAfterCleanup = new HashSet<>(); + expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); + expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC); + expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC); + expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2); + expectedRemainingTopicsAfterCleanup.add("__consumer_offsets"); + + Set allTopics; + ZkUtils zkUtils = null; + try { + zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + + do { + Utils.sleep(100); + allTopics = new HashSet<>(); + allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size()); + } finally { + if (zkUtils != null) { + zkUtils.close(); + } + } + assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index 34753ae2e672f..b293a02730692 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -48,6 +48,8 @@ public void start() throws IOException, InterruptedException { log.debug("ZooKeeper instance is running at {}", zKConnectString()); brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); + brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); + brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); broker = new KafkaEmbedded(brokerConfig); @@ -125,4 +127,9 @@ public void createTopic(String topic, Properties topicConfig) { broker.createTopic(topic, partitions, replication, topicConfig); } -} \ No newline at end of file + + public void deleteTopic(String topic) { + broker.deleteTopic(topic); + } + +} diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java new file mode 100644 index 
0000000000000..734c15b0f2784 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import kafka.admin.TopicCommand; +import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. + *

+ * Resetting the processing state of an application includes the following actions:
+ * <ol>
+ * <li> setting the application's consumer offsets for input and internal topics to zero </li>
+ * <li> skip over all intermediate user topics (i.e., "seekToEnd" for consumers of intermediate topics) </li>
+ * <li> deleting any topics created internally by Kafka Streams for this application </li>
+ * </ol>
+ * <p>

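+ * For example, the tool can be driven programmatically, as this patch's ResetIntegrationTest does in its
+ * cleanGlobal() helper; a minimal sketch (the application id, server addresses, and topic names below are placeholders):
+ * <pre>
+ *    final int exitCode = new StreamsResetter().run(new String[]{
+ *        "--application-id", "my-app",
+ *        "--bootstrap-servers", "localhost:9092",
+ *        "--zookeeper", "localhost:2181",
+ *        "--input-topics", "inputTopic",
+ *        "--intermediate-topics", "userTopic"});
+ *    // exitCode == 0 (EXIT_CODE_SUCCESS) on success
+ * </pre>
+ * <p>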
+ * Use this tool only if no application instance is running. Otherwise, the application will get into an invalid state and crash or produce wrong results.
+ * <p>
+ * If you run multiple application instances, running this tool once is sufficient.
+ * However, you must call {@code KafkaStreams#cleanUp()} before restarting any instance, to wipe the local state store directory.
+ * Otherwise, your application will be in an invalid state.
+ * <p>
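+ * Put together, a full reset cycle follows the pattern of this patch's ResetIntegrationTest (a sketch;
+ * {@code resetArgs}, {@code builder}, and {@code config} are placeholders):
+ * <pre>
+ *    streams.close();                              // 1. stop all running instances
+ *    new StreamsResetter().run(resetArgs);         // 2. run this tool once
+ *    streams.cleanUp();                            // 3. wipe local state on every instance
+ *    new KafkaStreams(builder, config).start();    // 4. a fresh instance reprocesses from scratch
+ * </pre>
+ * <p>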
+ * User output topics will not be deleted or modified by this tool. + * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required. + */ +public class StreamsResetter { + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_ERROR = 1; + + private static OptionSpec bootstrapServerOption; + private static OptionSpec zookeeperOption; + private static OptionSpec applicationIdOption; + private static OptionSpec inputTopicsOption; + private static OptionSpec intermediateTopicsOption; + + private OptionSet options = null; + private final Properties consumerConfig = new Properties(); + private final List allTopics = new LinkedList<>(); + + public int run(final String[] args) { + return run(args, new Properties()); + } + + public int run(final String[] args, final Properties config) { + this.consumerConfig.clear(); + this.consumerConfig.putAll(config); + + int exitCode = EXIT_CODE_SUCCESS; + + ZkUtils zkUtils = null; + try { + parseArguments(args); + + zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + + this.allTopics.clear(); + this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + + resetInputAndInternalTopicOffsets(); + seekToEndIntermediateTopics(); + deleteInternalTopics(zkUtils); + } catch (final Exception e) { + exitCode = EXIT_CODE_ERROR; + System.err.println("ERROR: " + e.getMessage()); + } finally { + if (zkUtils != null) { + zkUtils.close(); + } + } + + return exitCode; + } + + private void parseArguments(final String[] args) throws IOException { + final OptionParser optionParser = new OptionParser(); + applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)") + .withRequiredArg() + .ofType(String.class) + .describedAs("id") + .required(); + bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("localhost:9092") + .describedAs("urls"); + zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("localhost:2181") + .describedAs("url"); + inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + + try { + this.options = optionParser.parse(args); + } catch (final OptionException e) { + optionParser.printHelpOn(System.err); + throw e; + } + } + + private void resetInputAndInternalTopicOffsets() { + final List inputTopics = this.options.valuesOf(inputTopicsOption); + + if (inputTopics.size() == 0) { + System.out.println("No input topics specified."); + } else { + System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics."); + } + + final Properties config = new Properties(); + config.putAll(this.consumerConfig); + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption)); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
this.options.valueOf(applicationIdOption)); + config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + for (final String inTopic : inputTopics) { + if (!this.allTopics.contains(inTopic)) { + System.out.println("Input topic " + inTopic + " not found. Skipping."); + } + } + + for (final String topic : this.allTopics) { + if (isInputTopic(topic) || isInternalTopic(topic)) { + System.out.println("Topic: " + topic); + + try (final KafkaConsumer client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + client.subscribe(Collections.singleton(topic)); + client.poll(1); + + final Set partitions = client.assignment(); + client.seekToBeginning(partitions); + for (final TopicPartition p : partitions) { + client.position(p); + } + client.commitSync(); + } catch (final RuntimeException e) { + System.err.println("ERROR: Resetting offsets for topic " + topic + " failed."); + throw e; + } + } + } + + System.out.println("Done."); + } + + private boolean isInputTopic(final String topic) { + return this.options.valuesOf(inputTopicsOption).contains(topic); + } + + private void seekToEndIntermediateTopics() { + final List intermediateTopics = this.options.valuesOf(intermediateTopicsOption); + + if (intermediateTopics.size() == 0) { + System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets."); + return; + } + + System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics); + + final Properties config = new Properties(); + config.putAll(this.consumerConfig); + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption)); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption)); + config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + for (final String topic : intermediateTopics) { + if (this.allTopics.contains(topic)) { + System.out.println("Topic: " + topic); + + try (final KafkaConsumer client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + client.subscribe(Collections.singleton(topic)); + client.poll(1); + + final Set partitions = client.assignment(); + client.seekToEnd(partitions); + for (final TopicPartition p : partitions) { + client.position(p); + } + client.commitSync(); + } catch (final RuntimeException e) { + System.err.println("ERROR: Seek-to-end for topic " + topic + " failed."); + throw e; + } + } else { + System.out.println("Topic " + topic + " not found. 
Skipping."); + } + } + + System.out.println("Done."); + } + + private void deleteInternalTopics(final ZkUtils zkUtils) { + System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption)); + + for (final String topic : this.allTopics) { + if (isInternalTopic(topic)) { + final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ + "--zookeeper", this.options.valueOf(zookeeperOption), + "--delete", "--topic", topic}); + try { + TopicCommand.deleteTopic(zkUtils, commandOptions); + } catch (final RuntimeException e) { + System.err.println("ERROR: Deleting topic " + topic + " failed."); + throw e; + } + } + } + + System.out.println("Done."); + } + + private boolean isInternalTopic(final String topicName) { + return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-") + && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")); + } + + public static void main(final String[] args) { + System.exit(new StreamsResetter().run(args)); + } + +} From 1478653f08e3cfefe7f080572a41c38b3941775c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 28 Jul 2016 00:28:28 +0200 Subject: [PATCH 2/2] fixed cherry-pick issues --- checkstyle/import-control.xml | 2 +- .../kafka/streams/KafkaStreamsTest.java | 2 +- .../integration/ResetIntegrationTest.java | 8 +- .../utils/IntegrationTestUtils.java | 133 ++++++++++-------- .../integration/utils/KafkaEmbedded.java | 105 ++++++++------ 5 files changed, 140 insertions(+), 110 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 72247acf59294..1052d8e43ed72 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -150,7 +150,7 @@ - + diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index f33cb3a569dfb..af7e681b0ddc3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -132,7 +132,7 @@ public void testCleanupIsolation() throws Exception { final String appId1 = "testIsolation-1"; final String appId2 = "testIsolation-2"; - final String stateDir = TestUtils.tempDirectory().getPath(); + final String stateDir = TestUtils.tempDirectory("kafka-test").getPath(); final File stateDirApp1 = new File(stateDir + File.separator + appId1); final File stateDirApp2 = new File(stateDir + File.separator + appId2); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index bffdfd937a867..28be86861f2e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -118,7 +118,7 @@ private Properties prepareTest() throws Exception { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getPath()); 
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8); @@ -176,8 +176,7 @@ public KeyValue apply(final Long key, final String value) { return new KeyValue<>(key, value); } }) - .groupByKey() - .count("global-count"); + .countByKey("global-count"); globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); final KStream windowedCounts = input @@ -192,8 +191,7 @@ public KeyValue apply(final Long key, final String value) { return new KeyValue<>(key, value); } }) - .groupByKey() - .count(TimeWindows.of(35).advanceBy(10), "count") + .countByKey(TimeWindows.of("count", 35).advanceBy(10)) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index c3f90897fcc2c..14b24aac4b32a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -55,10 +55,10 @@ public class IntegrationTestUtils { * @param maxMessages Maximum number of messages to read via the consumer. * @return The values retrieved via the consumer. */ - public static List readValues(String topic, Properties consumerConfig, int maxMessages) { - List returnList = new ArrayList<>(); - List> kvs = readKeyValues(topic, consumerConfig, maxMessages); - for (KeyValue kv : kvs) { + public static List readValues(final String topic, final Properties consumerConfig, final int maxMessages) { + final List returnList = new ArrayList<>(); + final List> kvs = readKeyValues(topic, consumerConfig, maxMessages); + for (final KeyValue kv : kvs) { returnList.add(kv.value); } return returnList; @@ -72,7 +72,7 @@ public static List readValues(String topic, Properties consumerConfig, in * @param consumerConfig Kafka consumer configuration * @return The KeyValue elements retrieved via the consumer. */ - public static List> readKeyValues(String topic, Properties consumerConfig) { + public static List> readKeyValues(final String topic, final Properties consumerConfig) { return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES); } @@ -85,17 +85,17 @@ public static List> readKeyValues(String topic, Properties * @param maxMessages Maximum number of messages to read via the consumer * @return The KeyValue elements retrieved via the consumer */ - public static List> readKeyValues(String topic, Properties consumerConfig, int maxMessages) { - KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); + public static List> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) { + final KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); consumer.subscribe(Collections.singletonList(topic)); - int pollIntervalMs = 100; - int maxTotalPollTimeMs = 2000; + final int pollIntervalMs = 100; + final int maxTotalPollTimeMs = 2000; int totalPollTimeMs = 0; - List> consumedValues = new ArrayList<>(); + final List> consumedValues = new ArrayList<>(); while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) { totalPollTimeMs += pollIntervalMs; - ConsumerRecords records = consumer.poll(pollIntervalMs); - for (ConsumerRecord record : records) { + final ConsumerRecords records = consumer.poll(pollIntervalMs); + for (final ConsumerRecord record : records) { consumedValues.add(new KeyValue<>(record.key(), record.value())); } } @@ -103,7 +103,7 @@ public static List> readKeyValues(String topic, Properties return consumedValues; } - private static boolean continueConsuming(int messagesConsumed, int maxMessages) { + private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) { return maxMessages <= 0 || messagesConsumed < maxMessages; } @@ -112,10 +112,10 @@ private static boolean continueConsuming(int messagesConsumed, int maxMessages) * * @param streamsConfiguration Streams configuration settings */ - public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException { - String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); + public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException { + final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); if (path 
!= null) { - File node = Paths.get(path).normalize().toFile(); + final File node = Paths.get(path).normalize().toFile(); // Only purge state when it's under /tmp. This is a safety net to prevent accidentally // deleting important local directory trees. if (node.getAbsolutePath().startsWith("/tmp")) { @@ -132,11 +132,11 @@ public static void purgeLocalStreamsState(Properties streamsConfiguration) throw * @param Value type of the data records */ public static void produceKeyValuesSynchronously( - String topic, Collection> records, Properties producerConfig) + final String topic, final Collection> records, final Properties producerConfig) throws ExecutionException, InterruptedException { - Producer producer = new KafkaProducer<>(producerConfig); - for (KeyValue record : records) { - Future f = producer.send( + final Producer producer = new KafkaProducer<>(producerConfig); + for (final KeyValue record : records) { + final Future f = producer.send( new ProducerRecord<>(topic, record.key, record.value)); f.get(); } @@ -144,86 +144,103 @@ public static void produceKeyValuesSynchronously( producer.close(); } + public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, + final Collection> records, + final Properties producerConfig, + final Long timestamp) + throws ExecutionException, InterruptedException { + final Producer producer = new KafkaProducer<>(producerConfig); + for (final KeyValue record : records) { + final Future f = producer.send( + new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); + f.get(); + } + producer.flush(); + producer.close(); + } + public static void produceValuesSynchronously( - String topic, Collection records, Properties producerConfig) + final String topic, final Collection records, final Properties producerConfig) throws ExecutionException, InterruptedException { - Collection> keyedRecords = new ArrayList<>(); - for (V value : records) { - KeyValue kv = new KeyValue<>(null, value); + final Collection> keyedRecords = new ArrayList<>(); + for (final V value : records) { + final KeyValue kv = new KeyValue<>(null, value); keyedRecords.add(kv); } produceKeyValuesSynchronously(topic, keyedRecords, producerConfig); } - public static List> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, - String topic, - int expectedNumRecords) throws InterruptedException { + public static List> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords) throws InterruptedException { return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } /** * Wait until enough data (key-value records) has been consumed. 
- * @param consumerConfig Kafka Consumer configuration - * @param topic Topic to consume from + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from * @param expectedNumRecords Minimum number of expected records - * @param waitTime Upper bound in waiting time in milliseconds + * @param waitTime Upper bound in waiting time in milliseconds * @return All the records consumed, or null if no records are consumed * @throws InterruptedException - * @throws AssertionError if the given wait time elapses + * @throws AssertionError if the given wait time elapses */ - public static List> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, - String topic, - int expectedNumRecords, - long waitTime) throws InterruptedException { - List> accumData = new ArrayList<>(); - long startTime = System.currentTimeMillis(); + public static List> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime) throws InterruptedException { + final List> accumData = new ArrayList<>(); + final long startTime = System.currentTimeMillis(); while (true) { - List> readData = readKeyValues(topic, consumerConfig); + final List> readData = readKeyValues(topic, consumerConfig); accumData.addAll(readData); if (accumData.size() >= expectedNumRecords) return accumData; if (System.currentTimeMillis() > startTime + waitTime) - throw new AssertionError("Expected " + expectedNumRecords + + throw new AssertionError("Expected " + expectedNumRecords + " but received only " + accumData.size() + " records before timeout " + waitTime + " ms"); Thread.sleep(Math.min(waitTime, 100L)); } } - public static List waitUntilMinValuesRecordsReceived(Properties consumerConfig, - String topic, - int expectedNumRecords) throws InterruptedException { + public static List waitUntilMinValuesRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords) throws InterruptedException { return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } /** * Wait until enough data (value records) has been consumed. 
- * @param consumerConfig Kafka Consumer configuration - * @param topic Topic to consume from + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from * @param expectedNumRecords Minimum number of expected records - * @param waitTime Upper bound in waiting time in milliseconds + * @param waitTime Upper bound in waiting time in milliseconds * @return All the records consumed, or null if no records are consumed * @throws InterruptedException - * @throws AssertionError if the given wait time elapses + * @throws AssertionError if the given wait time elapses */ - public static List waitUntilMinValuesRecordsReceived(Properties consumerConfig, - String topic, - int expectedNumRecords, - long waitTime) throws InterruptedException { - List accumData = new ArrayList<>(); - long startTime = System.currentTimeMillis(); + public static List waitUntilMinValuesRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime) throws InterruptedException { + final List accumData = new ArrayList<>(); + final long startTime = System.currentTimeMillis(); while (true) { - List readData = readValues(topic, consumerConfig, expectedNumRecords); + final List readData = readValues(topic, consumerConfig, expectedNumRecords); accumData.addAll(readData); if (accumData.size() >= expectedNumRecords) return accumData; if (System.currentTimeMillis() > startTime + waitTime) - throw new AssertionError("Expected " + expectedNumRecords + + throw new AssertionError("Expected " + expectedNumRecords + " but received only " + accumData.size() + " records before timeout " + waitTime + " ms"); Thread.sleep(Math.min(waitTime, 100L)); } } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 348b46b5c7112..8e0d11c07c881 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -18,20 +18,6 @@ package org.apache.kafka.streams.integration.utils; -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Properties; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; - -import java.io.File; -import java.util.Collections; -import java.util.List; - import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; @@ -42,11 +28,23 @@ import kafka.utils.TestUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + /** * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by * default. - * + *
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 348b46b5c7112..8e0d11c07c881 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -18,20 +18,6 @@
 package org.apache.kafka.streams.integration.utils;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
@@ -42,11 +28,23 @@
 import kafka.utils.TestUtils;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 
 /**
  * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
  * default.
- *
+ * <p>
  * Requires a running ZooKeeper instance to connect to.
  */
 public class KafkaEmbedded {
@@ -63,20 +61,21 @@ public class KafkaEmbedded {
 
     /**
      * Creates and starts an embedded Kafka broker.
+     *
      * @param config Broker configuration settings. Used to modify, for example, on which port the
      *               broker should listen to. Note that you cannot change the `log.dirs` setting
      *               currently.
      */
-    public KafkaEmbedded(Properties config) throws IOException {
-        tmpFolder = new TemporaryFolder();
-        tmpFolder.create();
-        logDir = tmpFolder.newFolder();
-        effectiveConfig = effectiveConfigFrom(config);
-        boolean loggingEnabled = true;
-        KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+    public KafkaEmbedded(final Properties config) throws IOException {
+        this.tmpFolder = new TemporaryFolder();
+        this.tmpFolder.create();
+        this.logDir = this.tmpFolder.newFolder();
+        this.effectiveConfig = effectiveConfigFrom(config);
+        final boolean loggingEnabled = true;
+        final KafkaConfig kafkaConfig = new KafkaConfig(this.effectiveConfig, loggingEnabled);
         log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
-            logDir, zookeeperConnect());
-        kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
+            this.logDir, zookeeperConnect());
+        this.kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
         log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
     }
@@ -85,12 +84,13 @@ public KafkaEmbedded(Properties config) throws IOException {
     /**
      * Creates the configuration for starting the Kafka broker by merging default values with
      * overwrites.
+     *
      * @param initialConfig Broker configuration settings that override the default config.
      * @return
     * @throws IOException
      */
-    private Properties effectiveConfigFrom(Properties initialConfig) throws IOException {
-        Properties effectiveConfig = new Properties();
+    private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
+        final Properties effectiveConfig = new Properties();
         effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
         effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
         effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
@@ -100,17 +100,17 @@ private Properties effectiveConfigFrom(Properties initialConfig) throws IOExcept
         effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
 
         effectiveConfig.putAll(initialConfig);
-        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
+        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), this.logDir.getAbsolutePath());
         return effectiveConfig;
     }
 
     /**
      * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
-     *
+     * <p>
      * You can use this to tell Kafka producers and consumers how to connect to this instance.
      */
     public String brokerList() {
-        return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT);
+        return this.kafka.config().hostName() + ":" + this.kafka.boundPort(SecurityProtocol.PLAINTEXT);
     }
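As the javadoc above says, the value returned by `brokerList()` can be dropped straight into client configs. A minimal sketch of that wiring (the producer settings are illustrative and not part of this patch; the default config assumes a ZooKeeper instance is already running at the default `zookeeper.connect`):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    final KafkaEmbedded broker = new KafkaEmbedded(new Properties()); // throws IOException

    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.brokerList()); // e.g. "127.0.0.1:9092"
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
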
@@ -118,7 +118,7 @@ public String brokerList() {
      * The ZooKeeper connection string aka `zookeeper.connect`.
      */
     public String zookeeperConnect() {
-        return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
+        return this.effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
     }
 
     /**
@@ -127,12 +127,12 @@ public void stop() {
         log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
-        kafka.shutdown();
-        kafka.awaitShutdown();
-        log.debug("Removing logs.dir at {} ...", logDir);
-        List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
+        this.kafka.shutdown();
+        this.kafka.awaitShutdown();
+        log.debug("Removing logs.dir at {} ...", this.logDir);
+        final List<String> logDirs = Collections.singletonList(this.logDir.getAbsolutePath());
         CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
-        tmpFolder.delete();
+        this.tmpFolder.delete();
         log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
     }
@@ -142,7 +142,7 @@ public void stop() {
      *
      * @param topic The name of the topic.
      */
-    public void createTopic(String topic) {
+    public void createTopic(final String topic) {
         createTopic(topic, 1, 1, new Properties());
     }
 
@@ -153,7 +153,7 @@ public void createTopic(String topic) {
      * @param partitions  The number of partitions for this topic.
      * @param replication The replication factor for (the partitions of) this topic.
      */
-    public void createTopic(String topic, int partitions, int replication) {
+    public void createTopic(final String topic, final int partitions, final int replication) {
         createTopic(topic, partitions, replication, new Properties());
     }
 
@@ -165,10 +165,10 @@ public void createTopic(String topic, int partitions, int replication) {
      * @param replication The replication factor for (partitions of) this topic.
      * @param topicConfig Additional topic-level configuration settings.
      */
-    public void createTopic(String topic,
-                            int partitions,
-                            int replication,
-                            Properties topicConfig) {
+    public void createTopic(final String topic,
+                            final int partitions,
+                            final int replication,
+                            final Properties topicConfig) {
         log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
             topic, partitions, replication, topicConfig);
 
@@ -176,14 +176,29 @@ public void createTopic(String topic,
         // createTopic() will only seem to work (it will return without error). The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
-        ZkClient zkClient = new ZkClient(
+        final ZkClient zkClient = new ZkClient(
             zookeeperConnect(),
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
         AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
         zkClient.close();
     }
-}
\ No newline at end of file
+
+    public void deleteTopic(final String topic) {
+        log.debug("Deleting topic { name: {} }", topic);
+
+        final ZkClient zkClient = new ZkClient(
+            zookeeperConnect(),
+            DEFAULT_ZK_SESSION_TIMEOUT_MS,
+            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+            ZKStringSerializer$.MODULE$);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        AdminUtils.deleteTopic(zkUtils, topic);
+        zkClient.close();
+    }
+
+}
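The new `deleteTopic()` mirrors the existing `createTopic()` helpers, so a test can tear down and recreate a clean topic between application runs. A rough sketch of that pattern (topic name and settings are illustrative; topic deletion also assumes the broker is configured with `delete.topic.enable=true`, which is not shown in this excerpt):

    import java.util.Properties;

    // In test setup; the constructor throws IOException.
    final KafkaEmbedded broker = new KafkaEmbedded(new Properties());

    broker.createTopic("inputTopic", 2, 1, new Properties()); // 2 partitions, replication factor 1
    // ... run the application under test, then reset its state ...
    broker.deleteTopic("inputTopic"); // new in this patch
    broker.createTopic("inputTopic"); // recreate with defaults: 1 partition, replication factor 1
    broker.stop();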