From 7e9e6733dcaa7d78a4500ce761398df32590d9f6 Mon Sep 17 00:00:00 2001 From: amethystic Date: Sat, 22 Oct 2016 20:41:42 +0800 Subject: [PATCH 01/24] kafka-4295: kafka-console-consumer.sh does not delete the temporary group in zookeeper Author: huxi Since consumer stop logic and zk node removal code are in separate threads, so when two threads execute in an interleaving manner, persistent node '/consumers/' might not be removed for those console consumer groups which do not specify "group.id". This will pollute Zookeeper with lots of inactive console consumer offset information. --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 361bef29585b0..acdce5e28cb60 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -104,7 +104,10 @@ object ConsoleConsumer extends Logging { Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { consumer.stop() - + // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack + if (!conf.groupIdPassed && conf.options.has(conf.zkConnectOpt)) + ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id")) + shutdownLatch.await() if (conf.enableSystestEventsLogging) { From d2767f8cfe22cbc2f394fffc7e398bb47f5c6680 Mon Sep 17 00:00:00 2001 From: amethystic Date: Thu, 22 Dec 2016 15:37:36 +0800 Subject: [PATCH 02/24] kafka-ConsoleConsumer does not delete the temporary group in zookeeper 1. Remove same cleanup code from within the JVM shutdown hook code block 2. 
Refine ZkUtils.maybeDeletePath to capture ZkException if to-be-delete znode is not empty --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 8 +++----- core/src/main/scala/kafka/utils/ZkUtils.scala | 8 +++++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 03fc9183c47a8..5a0955884e6ce 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -79,8 +79,9 @@ object ConsoleConsumer extends Logging { reportRecordCount() // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack - if (!conf.groupIdPassed) + if (!conf.groupIdPassed && conf.options.has(conf.zkConnectOpt)) { ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id")) + } shutdownLatch.countDown() } @@ -104,10 +105,7 @@ object ConsoleConsumer extends Logging { Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { consumer.stop() - // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack - if (!conf.groupIdPassed && conf.options.has(conf.zkConnectOpt)) - ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id")) - + shutdownLatch.await() if (conf.enableSystestEventsLogging) { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fcb564867e0af..e027db4ba1ff6 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Time import 
org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} -import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.KeeperException.{Code, NotEmptyException} import org.apache.zookeeper.data.{ACL, Stat} import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper} @@ -89,12 +89,14 @@ object ZkUtils { } def maybeDeletePath(zkUrl: String, dir: String) { + val zk = createZkClient(zkUrl, 30*1000, 30*1000) try { - val zk = createZkClient(zkUrl, 30*1000, 30*1000) zk.deleteRecursive(dir) - zk.close() } catch { + case _: ZkException => zk.deleteRecursive(dir) case _: Throwable => // swallow + } finally { + zk.close() } } From 09438bce12c965cbe879cea7c2bbd7b222127357 Mon Sep 17 00:00:00 2001 From: amethystic Date: Thu, 22 Dec 2016 15:40:08 +0800 Subject: [PATCH 03/24] kafka-4295: ConsoleConsumer does not delete the temporary group in zookeeper remove useless imports in ZkUtils.scala --- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index e027db4ba1ff6..7d0909797c034 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} -import org.apache.zookeeper.KeeperException.{Code, NotEmptyException} +import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.{ACL, Stat} import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper} From f6458c8795d4608081bc9304df2aadfa213db719 Mon Sep 17 00:00:00 2001 From: amethystic Date: Wed, 28 Dec 2016 13:13:17 +0000 Subject: [PATCH 04/24] KAFKA-4428; Kafka does not exit if port is already bound During Acceptor 
initialization, if "Address already in use" error is encountered, the shutdown latch in each Processor is never counted down. Consequently, the Kafka server hangs when `Processor.shutdown` is called. Author: huxi Author: amethystic Reviewers: Jun Rao , Ismael Juma Closes #2156 from amethystic/kafka-4428_Kafka_noexit_for_port_already_use --- .../scala/kafka/network/SocketServer.scala | 21 ++++++++++----- .../unit/kafka/server/ServerStartupTest.scala | 26 ++++++++++++++++--- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 55061edb68f2d..95c5fdfd348f7 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -166,10 +166,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging { private val startupLatch = new CountDownLatch(1) - private val shutdownLatch = new CountDownLatch(1) + + // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor + // (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open + // latch and then replace it in `startupComplete()`. 
+ @volatile private var shutdownLatch = new CountDownLatch(0) + private val alive = new AtomicBoolean(true) - def wakeup() + def wakeup(): Unit /** * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete @@ -188,24 +193,26 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ /** * Record that the thread startup is complete */ - protected def startupComplete() = { + protected def startupComplete(): Unit = { + // Replace the open latch with a closed one + shutdownLatch = new CountDownLatch(1) startupLatch.countDown() } /** * Record that the thread shutdown is complete */ - protected def shutdownComplete() = shutdownLatch.countDown() + protected def shutdownComplete(): Unit = shutdownLatch.countDown() /** * Is the server still running? */ - protected def isRunning = alive.get + protected def isRunning: Boolean = alive.get /** * Close the connection identified by `connectionId` and decrement the connection count. */ - def close(selector: KSelector, connectionId: String) { + def close(selector: KSelector, connectionId: String): Unit = { val channel = selector.channel(connectionId) if (channel != null) { debug(s"Closing selector connection $connectionId") @@ -219,7 +226,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ /** * Close `channel` and decrement the connection count. 
*/ - def close(channel: SocketChannel) { + def close(channel: SocketChannel): Unit = { if (channel != null) { debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) connectionQuotas.dec(channel.socket.getInetAddress) diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 86b167ea8d19a..92c6a9be8e55e 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -17,9 +17,7 @@ package kafka.server -import kafka.utils.ZkUtils -import kafka.utils.CoreUtils -import kafka.utils.TestUtils +import kafka.utils.{CoreUtils, TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.easymock.EasyMock import org.junit.Assert._ @@ -43,6 +41,28 @@ class ServerStartupTest extends ZooKeeperTestHarness { CoreUtils.delete(server.config.logDirs) } + @Test + def testConflictBrokerStartupWithSamePort { + // Create and start first broker + val brokerId1 = 0 + val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect) + val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1)) + val port = server1.boundPort() + + // Create a second broker with same port + val brokerId2 = 1 + val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, port = port) + try { + TestUtils.createServer(KafkaConfig.fromProps(props2)) + fail("Starting a broker with the same port should fail") + } catch { + case _: RuntimeException => // expected + } finally { + server1.shutdown() + CoreUtils.delete(server1.config.logDirs) + } + } + @Test def testConflictBrokerRegistration { // Try starting a broker with the a conflicting broker id. 
From f7e092c09241ea63e28d2111aff0ea1a5bc8b6f9 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Thu, 29 Dec 2016 12:03:24 -0500 Subject: [PATCH 05/24] MINOR: Sync up 'kafka-run-class.bat' with 'kafka-run-class.sh' Some of the recent changes to `kafka-run-class.sh` have not been applied to `kafka-run-class.bat`. These recent changes include setting proper streams or connect classpaths. So any streams or connect use case that leverages `kafka-run-class.bat` would fail with an error like ``` Error: Could not find or load main class org.apache.kafka.streams.??? ``` Author: Vahid Hashemian Reviewers: Ewen Cheslack-Postava Closes #2238 from vahidhashemian/minor/sync_up_kafka-run-class.bat --- bin/windows/kafka-run-class.bat | 37 ++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) mode change 100644 => 100755 bin/windows/kafka-run-class.bat diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat old mode 100644 new mode 100755 index 9e4c8652f57f5..4f85301286589 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -47,21 +47,48 @@ for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do ( call :concat %%i ) -rem Classpath addition for kafka-perf dependencies -for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do ( +rem Classpath addition for kafka-examples +for %%i in (%BASE_DIR%\examples\build\libs\kafka-examples*.jar) do ( call :concat %%i ) rem Classpath addition for kafka-clients -for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do ( +for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients*.jar) do ( call :concat %%i ) -rem Classpath addition for kafka-examples -for %%i in (%BASE_DIR%\examples\build\libs\kafka-examples-*.jar) do ( +rem Classpath addition for kafka-streams +for %%i in (%BASE_DIR%\streams\build\libs\kafka-streams*.jar) do ( + call :concat %%i +) + +rem Classpath addition for kafka-streams-examples +for %%i in 
(%BASE_DIR%\streams\examples\build\libs\kafka-streams-examples*.jar) do ( + call :concat %%i +) + +for %%i in (%BASE_DIR%\streams\build\dependant-libs-%SCALA_VERSION%\rocksdb*.jar) do ( + call :concat %%i +) + +rem Classpath addition for kafka tools +for %%i in (%BASE_DIR%\tools\build\libs\kafka-tools*.jar) do ( + call :concat %%i +) + +for %%i in (%BASE_DIR%\tools\build\dependant-libs-%SCALA_VERSION%\*.jar) do ( call :concat %%i ) +for %%p in (api runtime file json tools) do ( + for %%i in (%BASE_DIR%\connect\%%p\build\libs\connect-%%p*.jar) do ( + call :concat %%i + ) + if exist "%BASE_DIR%\connect\%%p\build\dependant-libs\*" ( + call :concat %BASE_DIR%\connect\%%p\build\dependant-libs\* + ) +) + rem Classpath addition for release call :concat %BASE_DIR%\libs\* From 689d5c9c268e1f157230487b9440fde35e8ac65a Mon Sep 17 00:00:00 2001 From: Edward Ribeiro Date: Thu, 29 Dec 2016 21:05:11 -0800 Subject: [PATCH 06/24] MINOR: Mx4jLoader always returns false even if mx4j is loaded & started Mx4jLoader.scala should explicitly `return true` if the class is successfully loaded and started, otherwise it will return false even if the class is loaded. 
Author: Edward Ribeiro Reviewers: Ewen Cheslack-Postava Closes #2295 from eribeiro/mx4jloader-bug --- core/src/main/scala/kafka/utils/Mx4jLoader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala index 5d2549e840d33..a959c4a0011e4 100644 --- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -58,7 +58,7 @@ object Mx4jLoader extends Logging { mbs.registerMBean(xsltProcessor, processorName) httpAdaptorClass.getMethod("start").invoke(httpAdaptor) info("mx4j successfuly loaded") - true + return true } catch { case _: ClassNotFoundException => From d9e72ae3ff88ac9570da479d22ed8b5d70d42c25 Mon Sep 17 00:00:00 2001 From: LoneRifle Date: Thu, 29 Dec 2016 21:14:26 -0800 Subject: [PATCH 07/24] MINOR: Rephrase Javadoc summary for ConsumerRecord The original Javadoc description for `ConsumerRecord` is slightly confusing in that it can be read in a way such that an object is a key value pair received from Kafka, but (only) consists of the metadata associated with the record. This PR makes it clearer that the metadata is _included_ with the record, and moves the comma so that the phrase "topic name and partition number" in the sentence is more closely associated with the phrase "from which the record is being received". 
Author: LoneRifle Reviewers: Ismael Juma , Ewen Cheslack-Postava Closes #2290 from LoneRifle/patch-1 --- .../org/apache/kafka/clients/consumer/ConsumerRecord.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index c8aef5390fbaf..5f1015512285c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -16,8 +16,9 @@ import org.apache.kafka.common.record.TimestampType; /** - * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the - * record is being received and an offset that points to the record in a Kafka partition. + * A key/value pair to be received from Kafka. This also consists of a topic name and + * a partition number from which the record is being received, an offset that points + * to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord. */ public class ConsumerRecord { public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP; From 74449b4d4ef78076291ce00bebdef9d47ea69e4e Mon Sep 17 00:00:00 2001 From: amethystic Date: Sat, 31 Dec 2016 10:45:47 +0000 Subject: [PATCH 08/24] KAFKA-4351; MirrorMaker with new consumer should support comma-separated regex This makes it consistent with MirrorMaker with the old consumer. 
Author: huxi Author: amethystic Reviewers: Vahid Hashemian , Ismael Juma Closes #2072 from amethystic/kafka-4351_Regex_behavior_change_for_new_consumer --- .../main/scala/kafka/tools/MirrorMaker.scala | 17 +++-- .../tools/MirrorMakerIntegrationTest.scala | 73 +++++++++++++++++++ docs/upgrade.html | 22 +++--- 3 files changed, 92 insertions(+), 20 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 19a2570adfa84..42456f76a2e07 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -20,7 +20,7 @@ package kafka.tools import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.regex.{Pattern, PatternSyntaxException} +import java.util.regex.Pattern import java.util.{Collections, Properties} import com.yammer.metrics.core.Gauge @@ -64,7 +64,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig} */ object MirrorMaker extends Logging with KafkaMetricsGroup { - private var producer: MirrorMakerProducer = null + private[tools] var producer: MirrorMakerProducer = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) // Track the messages not successfully sent by mirror maker. 
@@ -574,7 +574,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]], + // Only for testing + private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]], customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener], whitelistOpt: Option[String]) extends MirrorMakerBaseConsumer { @@ -589,12 +590,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def init() { debug("Initiating new consumer") val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener) - if (whitelistOpt.isDefined) { + whitelistOpt.foreach { whitelist => try { - consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) + consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener) } catch { - case pse: PatternSyntaxException => - error("Invalid expression syntax: %s".format(whitelistOpt.get)) + case pse: RuntimeException => + error(s"Invalid expression syntax: $whitelist") throw pse } } @@ -686,7 +687,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - private class MirrorMakerProducer(val producerProps: Properties) { + private[tools] class MirrorMakerProducer(val producerProps: Properties) { val sync = producerProps.getProperty("producer.type", "async").equals("sync") diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala new file mode 100644 index 0000000000000..465e8de0e8bce --- /dev/null +++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.tools + +import java.util.Properties + +import kafka.consumer.ConsumerTimeoutException +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer} +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} +import org.junit.Test + +class MirrorMakerIntegrationTest extends KafkaServerTestHarness { + + override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(KafkaConfig.fromProps(_, new Properties())) + + @Test + def testCommaSeparatedRegex(): Unit = { + val topic = "new-topic" + val msg = "a test message" + val brokerList = TestUtils.getBrokerListStrFromServers(servers) + + val producerProps = new Properties + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put("producer.type", "sync") + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) + 
val producer = new MirrorMakerProducer(producerProps) + MirrorMaker.producer = producer + MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes())) + MirrorMaker.producer.close() + + val consumerProps = new Properties + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group") + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer) + + val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelistOpt = Some("another_topic,new.*,foo")) + mirrorMakerConsumer.init() + try { + TestUtils.waitUntilTrue(() => { + try { + val data = mirrorMakerConsumer.receive() + data.topic == topic && new String(data.value) == msg + } catch { + // this exception is thrown if no record is returned within a short timeout, so safe to ignore + case _: ConsumerTimeoutException => false + } + }, "MirrorMaker consumer should read the expected message from the expected topic within the timeout") + } finally consumer.close() + } + +} diff --git a/docs/upgrade.html b/docs/upgrade.html index 06b53da482e78..3e2c52e27618a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -14,19 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. --> -

Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.0 to 0.10.2.0

-Users upgrading from versions prior to 0.10.1.0 should follow the upgrade guide here. Users upgrading from 0.10.1.0 +

Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0

+Users upgrading from versions prior to 0.10.1.x should follow the upgrade guide here. Users upgrading from 0.10.1.0 can upgrade the brokers one at a time: shut down the broker, update the code, and restart it. -
-0.10.2.0 has Potential breaking changes (Please review before upgrading). - -
Potential breaking changes in 0.10.2.0
-
    -
  • Several methods on the Java consumer may now throw InterruptException if the calling thread is interrupted. - Please refer to the KafkaConsumer Javadoc for a more in-depth explanation of this change.
  • -
- -

Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0

For a rolling upgrade:

@@ -39,6 +29,14 @@

Upgrading from 0.8.x, 0.9.x, 0.10. +
Notable changes in 0.10.2.0
+
    +
  • Several methods on the Java consumer may now throw InterruptException if the calling thread is interrupted. + Please refer to the KafkaConsumer Javadoc for a more in-depth explanation of this change.
  • +
  • Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This + makes the behaviour consistent with MirrorMaker when used the old Scala consumer.
  • +
+

Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0

0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please notice the Potential breaking changes in 0.10.1.0 before upgrade. From fc7141f13b35b67a4c12fe016559592a5eded845 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Sat, 31 Dec 2016 10:49:27 -0800 Subject: [PATCH 09/24] KAFKA-4480: Report an error in 'kafka-configs' command if the config to be removed does not exist Author: Vahid Hashemian Reviewers: Guozhang Wang Closes #2218 from vahidhashemian/KAFKA-4480 --- .../main/scala/kafka/admin/ConfigCommand.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index aa3780e9d6c77..9c4c5ed1c5d88 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -21,13 +21,12 @@ import java.util.Properties import joptsimple._ import kafka.common.Config import kafka.log.LogConfig -import kafka.server.{ConfigEntityName, QuotaId} -import kafka.server.{DynamicConfig, ConfigType} +import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId} import kafka.utils.{CommandLineUtils, ZkUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils -import scala.collection.JavaConverters._ import scala.collection._ +import scala.collection.JavaConverters._ /** @@ -83,8 +82,15 @@ object ConfigCommand extends Config { // compile the final set of configs val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName) + configs.putAll(configsToBeAdded) - configsToBeDeleted.foreach(config => configs.remove(config)) + configsToBeDeleted.foreach { config => + // log an error if the config to be deleted does not exist + if (!configs.containsKey(config)) + System.err.println(s"Deleting config '$config' from entity '$entityName' of type 
'$entityType' failed, because the specified config does not exist.") + else + configs.remove(config) + } entityType match { case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, configs) @@ -93,7 +99,7 @@ object ConfigCommand extends Config { case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs) case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}") } - println(s"Updated config for entity: $entity.") + println(s"Completed Updating config for entity: $entity.") } private def parseBroker(broker: String): Int = { From a6f7e81f990e1d6c790669d0ee7340ea24f8c015 Mon Sep 17 00:00:00 2001 From: Himani Arora <1himani.arora@gmail.com> Date: Tue, 3 Jan 2017 11:07:43 +0000 Subject: [PATCH 10/24] MINOR: Scala code readability improvements Author: Himani Arora <1himani.arora@gmail.com> Reviewers: Ismael Juma Closes #2297 from himani1/refactored_code --- .../scala/unit/kafka/admin/AclCommandTest.scala | 2 +- .../kafka/admin/DescribeConsumerGroupTest.scala | 16 +++++++--------- .../unit/kafka/admin/ListConsumerGroupTest.scala | 2 +- .../unit/kafka/admin/ReplicationQuotaUtils.scala | 2 +- .../test/scala/unit/kafka/common/TopicTest.scala | 4 ++-- .../unit/kafka/consumer/TopicFilterTest.scala | 10 +++++----- .../integration/KafkaServerTestHarness.scala | 2 +- 7 files changed, 18 insertions(+), 20 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 8ea604120eafe..6d89d4fb19aa6 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -45,7 +45,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( TopicResources -> (Set(Read, Write, 
Describe, Delete), Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete")), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operation", "Create", "--operation", "ClusterAction")), - GroupResources -> (Set(Read, Describe).toSet[Operation], Array("--operation", "Read", "--operation", "Describe")) + GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")) ) private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]]( diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index b9c760df439c2..a6ebbe3016d4c 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -35,7 +35,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val overridingProps = new Properties() val topic = "foo" - val topicFilter = new Whitelist(topic) + val topicFilter = Whitelist(topic) val group = "test.group" val props = new Properties @@ -64,9 +64,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { EasyMock.replay(consumerMock) // action/test - TestUtils.waitUntilTrue(() => { - !consumerGroupCommand.describeGroup()._2.isDefined - }, "Expected no rows in describe group results.") + TestUtils.waitUntilTrue(() => consumerGroupCommand.describeGroup()._2.isEmpty, "Expected no rows in describe group results.") // cleanup consumerGroupCommand.close() @@ -89,7 +87,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupCommand.describeGroup() assignments.isDefined && - assignments.get.filter(_.group == group).size == 1 && + assignments.get.count(_.group == group) == 1 && assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.nonEmpty) }, "Expected rows and a 
member id column in describe group results.") @@ -116,7 +114,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupCommand.describeGroup() assignments.isDefined && - assignments.get.filter(_.group == group).size == 1 && + assignments.get.count(_.group == group) == 1 && assignments.get.filter(_.group == group).head.consumerId.isDefined && assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.isEmpty) // the member should be gone }, "Expected no active member in describe group results.") @@ -142,9 +140,9 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupCommand.describeGroup() assignments.isDefined && - assignments.get.filter(_.group == group).size == 2 && - assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 && - assignments.get.filter{ x => x.group == group && !x.partition.isDefined}.size == 1 + assignments.get.count(_.group == group) == 2 && + assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 && + assignments.get.count { x => x.group == group && !x.partition.isDefined } == 1 }, "Expected rows for consumers with no assigned partitions in describe group results.") // cleanup diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala index f4494c7911fe7..8759cfd55df95 100644 --- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala @@ -35,7 +35,7 @@ class ListConsumerGroupTest extends KafkaServerTestHarness { val overridingProps = new Properties() val topic = "foo" - val topicFilter = new Whitelist(topic) + val topicFilter = Whitelist(topic) val group = "test.group" val props = new Properties diff --git 
a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala index fff4ea18f7b3a..9608892f1863e 100644 --- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala +++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala @@ -48,7 +48,7 @@ object ReplicationQuotaUtils { val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic, topic) val leader = topicConfig.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp) val follower = topicConfig.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp) - val topicConfigAvailable = (leader == throttledLeaders && follower == throttledFollowers) + val topicConfigAvailable = leader == throttledLeaders && follower == throttledFollowers brokerConfigAvailable && topicConfigAvailable }, "throttle limit/replicas was not set") } diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala index 39eb31526318a..caab674433edb 100644 --- a/core/src/test/scala/unit/kafka/common/TopicTest.scala +++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala @@ -37,7 +37,7 @@ class TopicTest { invalidTopicNames += "Is" + weirdChar + "illegal" } - for (i <- 0 until invalidTopicNames.size) { + for (i <- invalidTopicNames.indices) { try { Topic.validate(invalidTopicNames(i)) fail("Should throw InvalidTopicException.") @@ -49,7 +49,7 @@ class TopicTest { val validTopicNames = new ArrayBuffer[String]() validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_.", longName.drop(7)) - for (i <- 0 until validTopicNames.size) { + for (i <- validTopicNames.indices) { try { Topic.validate(validTopicNames(i)) } diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index 1e073aaf82c15..b1e127451d2eb 100644 --- 
a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -28,29 +28,29 @@ class TopicFilterTest extends JUnitSuite { @Test def testWhitelists() { - val topicFilter1 = new Whitelist("white1,white2") + val topicFilter1 = Whitelist("white1,white2") assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = true)) assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = false)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false)) - val topicFilter2 = new Whitelist(".+") + val topicFilter2 = Whitelist(".+") assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true)) assertFalse(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true)) assertTrue(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false)) - val topicFilter3 = new Whitelist("white_listed-topic.+") + val topicFilter3 = Whitelist("white_listed-topic.+") assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true)) assertFalse(topicFilter3.isTopicAllowed("black1", excludeInternalTopics = true)) - val topicFilter4 = new Whitelist("test-(?!bad\\b)[\\w]+") + val topicFilter4 = Whitelist("test-(?!bad\\b)[\\w]+") assertTrue(topicFilter4.isTopicAllowed("test-good", excludeInternalTopics = true)) assertFalse(topicFilter4.isTopicAllowed("test-bad", excludeInternalTopics = true)) } @Test def testBlacklists() { - val topicFilter1 = new Blacklist("black1") + val topicFilter1 = Blacklist("black1") assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = true)) assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = false)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true)) diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 7059d177ad476..f418b30c8b520 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -113,7 +113,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { * Restart any dead brokers */ def restartDeadBrokers() { - for(i <- 0 until servers.length if !alive(i)) { + for(i <- servers.indices if !alive(i)) { servers(i).startup() alive(i) = true } From 06bef3ba11bbe30ec5d4ba36c775f4c35cab7dae Mon Sep 17 00:00:00 2001 From: Balint Molnar Date: Tue, 3 Jan 2017 14:13:50 +0000 Subject: [PATCH 11/24] KAFKA-4318; Migrate ProducerSendTest to the new consumer Author: Balint Molnar Reviewers: Ismael Juma Closes #2083 from baluchicken/KAFKA-4318 --- .../kafka/api/BaseProducerSendTest.scala | 94 ++++++++----------- 1 file changed, 40 insertions(+), 54 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index b5aaaf49a0764..82409bbcd88fc 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -20,21 +20,21 @@ package kafka.api import java.util.Properties import java.util.concurrent.TimeUnit +import collection.JavaConverters._ import kafka.admin.AdminUtils -import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig -import kafka.message.Message import kafka.server.KafkaConfig import kafka.utils.TestUtils -import kafka.utils.TestUtils._ +import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer._ -import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.{KafkaException, 
TopicPartition} +import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.record.TimestampType import org.junit.Assert._ import org.junit.{After, Before, Test} -import scala.collection.mutable.Buffer +import scala.collection.mutable.{ArrayBuffer, Buffer} abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -46,8 +46,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { trustStoreFile = trustStoreFile, saslProperties = saslProperties).map(KafkaConfig.fromProps(_, overridingProps)) } - private var consumer1: SimpleConsumer = null - private var consumer2: SimpleConsumer = null + private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _ private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() protected val topic = "topic" @@ -56,16 +55,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @Before override def setUp() { super.setUp() - - // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", servers.head.boundPort(), 100, 1024 * 1024, "") - consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024 * 1024, "") + consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT) } @After override def tearDown() { - consumer1.close() - consumer2.close() + consumer.close() // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused producers.foreach(_.close()) @@ -83,6 +78,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { producer } + private def pollUntilNumRecords(numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = { + val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]() + TestUtils.waitUntilTrue(() => { + records ++= consumer.poll(50).asScala + records.size == numRecords + }, s"Consumed 
${records.size} records until timeout, but expected $numRecords records.") + records + } + /** * testSendOffset checks the basic send API behavior * @@ -260,8 +264,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val producer = createProducer(brokerList) try { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers) + TestUtils.createTopic(zkUtils, topic, 2, 2, servers) val partition = 1 val now = System.currentTimeMillis() @@ -276,21 +279,20 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { assertEquals(partition, recordMetadata.partition) } - val leader1 = leaders(partition) + consumer.assign(List(new TopicPartition(topic, partition)).asJava) + // make sure the fetched messages also respect the partitioning and ordering - val fetchResponse1 = if (leader1.get == configs.head.brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + val records = pollUntilNumRecords(numRecords) + + records.zipWithIndex.foreach { case (record, i) => + assertEquals(topic, record.topic) + assertEquals(partition, record.partition) + assertEquals(i.toLong, record.offset) + assertNull(record.key) + assertEquals(s"value${i + 1}", new String(record.value)) + assertEquals(now, record.timestamp) } - val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer - assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size) - // TODO: also check topic and partition after they are added in the return messageSet - for (i <- 0 until numRecords) { - assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes, now, Message.MagicValue_V1), messageSet1(i).message) - assertEquals(i.toLong, messageSet1(i).offset) - } } finally { producer.close() } @@ -384,12 +386,10 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness { */ @Test def testCloseWithZeroTimeoutFromCallerThread() { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers) - val leader0 = leaders(0) - - // create record - val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + TestUtils.createTopic(zkUtils, topic, 2, 2, servers) + val partition = 0 + consumer.assign(List(new TopicPartition(topic, partition)).asJava) + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes) // Test closing from caller thread. for (_ <- 0 until 50) { @@ -406,12 +406,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage) } } - val fetchResponse = if (leader0.get == configs.head.brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } - assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) + assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count) } } @@ -420,12 +415,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { */ @Test def testCloseWithZeroTimeoutFromSenderThread() { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers) - val leader = leaders(0) - - // create record - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + TestUtils.createTopic(zkUtils, topic, 1, 2, servers) + val partition = 0 + consumer.assign(List(new TopicPartition(topic, partition)).asJava) + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes) // Test closing from sender thread. 
class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback { @@ -450,16 +443,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { assertTrue("No request is complete.", responses.forall(!_.isDone())) // flush the messages. producer.flush() - assertTrue("All request are complete.", responses.forall(_.isDone())) + assertTrue("All requests are complete.", responses.forall(_.isDone())) // Check the messages received by broker. - val fetchResponse = if (leader.get == configs.head.brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } - val expectedNumRecords = (i + 1) * numRecords - assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords), - expectedNumRecords, fetchResponse.messageSet(topic, 0).size) + pollUntilNumRecords(numRecords) } finally { producer.close() } From 25fdb3d4b981e6277a17ea72ab02a1cc26c4f648 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 3 Jan 2017 08:34:15 -0800 Subject: [PATCH 12/24] KAFKA-4561; Ordering of operations in StreamThread.shutdownTasksAndState may void at-least-once guarantees In `shutdownTasksAndState` and `suspendTasksAndState` we commit offsets BEFORE we flush any state. This is wrong as if an exception occurs during a flush, we may violate the at-least-once guarantees, that is we would have committed some offsets but NOT sent the processed data on to other Sinks. Also during suspend and shutdown, we should try and complete all tasks even when exceptions occur. We should just keep track of the exception and rethrow it at the end if necessary. This helps with ensuring that StateStores etc are closed. 
Author: Damian Guy Reviewers: Guozhang Wang Closes #2281 from dguy/kafka-4561 --- .../processor/internals/AbstractTask.java | 5 +- .../internals/ProcessorStateManager.java | 48 ++-- .../processor/internals/RecordCollector.java | 5 + .../internals/RecordCollectorImpl.java | 3 +- .../internals/StandbyContextImpl.java | 7 + .../processor/internals/StreamTask.java | 20 +- .../processor/internals/StreamThread.java | 129 +++++----- .../utils/IntegrationTestUtils.java | 2 + .../internals/ProcessorStateManagerTest.java | 24 ++ .../processor/internals/StandbyTaskTest.java | 4 +- .../processor/internals/StreamTaskTest.java | 34 ++- .../processor/internals/StreamThreadTest.java | 220 +++++++++++++++++- .../StreamThreadStateStoreProviderTest.java | 7 +- .../kafka/test/MockStateStoreSupplier.java | 17 +- .../kafka/test/NoOpRecordCollector.java | 16 +- .../test/ProcessorTopologyTestDriver.java | 18 +- 16 files changed, 431 insertions(+), 128 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 127a64e80e3c6..622426d8bd436 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -122,11 +122,12 @@ public final ThreadCache cache() { /** * @throws ProcessorStateException if there is an error while closing the state manager + * @param writeCheckpoint boolean indicating if a checkpoint file should be written */ - void closeStateManager() { + void closeStateManager(final boolean writeCheckpoint) { log.trace("task [{}] Closing", id()); try { - stateMgr.close(recordCollectorOffsets()); + stateMgr.close(writeCheckpoint ? 
recordCollectorOffsets() : null); } catch (IOException e) { throw new ProcessorStateException("Error while closing the state manager", e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 821b260cb3b67..c81df6c350454 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -357,33 +357,35 @@ public void close(Map ackedOffsets) throws IOException { } } - Map checkpointOffsets = new HashMap<>(); - for (String storeName : stores.keySet()) { - TopicPartition part; - if (loggingEnabled.contains(storeName)) - part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName)); - else - part = new TopicPartition(storeName, getPartition(storeName)); - - // only checkpoint the offset to the offsets file if it is persistent; - if (stores.get(storeName).persistent()) { - Long offset = ackedOffsets.get(part); - - if (offset != null) { - // store the last offset + 1 (the log position after restoration) - checkpointOffsets.put(part, offset + 1); - } else { - // if no record was produced. we need to check the restored offset. 
- offset = restoredOffsets.get(part); - if (offset != null) - checkpointOffsets.put(part, offset); + if (ackedOffsets != null) { + Map checkpointOffsets = new HashMap<>(); + for (String storeName : stores.keySet()) { + TopicPartition part; + if (loggingEnabled.contains(storeName)) + part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName)); + else + part = new TopicPartition(storeName, getPartition(storeName)); + + // only checkpoint the offset to the offsets file if it is persistent; + if (stores.get(storeName).persistent()) { + Long offset = ackedOffsets.get(part); + + if (offset != null) { + // store the last offset + 1 (the log position after restoration) + checkpointOffsets.put(part, offset + 1); + } else { + // if no record was produced. we need to check the restored offset. + offset = restoredOffsets.get(part); + if (offset != null) + checkpointOffsets.put(part, offset); + } } } + // write the checkpoint file before closing, to indicate clean shutdown + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); + checkpoint.write(checkpointOffsets); } - // write the checkpoint file before closing, to indicate clean shutdown - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); - checkpoint.write(checkpointOffsets); } } finally { // release the state directory directoryLock diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index 6d7d56174c892..8b0dcdfe55160 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -17,9 +17,12 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; +import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; +import java.util.Map; + public interface RecordCollector { void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer); @@ -30,6 +33,8 @@ void send(ProducerRecord record, Serializer keySerializer, Seria void close(); + Map offsets(); + /** * A supplier of a {@link RecordCollectorImpl} instance. */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 31596cc0de138..d733e66368e6c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -131,7 +131,8 @@ public void close() { * * @return the map from TopicPartition to offset */ - Map offsets() { + @Override + public Map offsets() { return this.offsets; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 9ce6595364bbc..a660b153f375f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; @@ -28,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; import 
java.io.File; +import java.util.Collections; import java.util.Map; public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier { @@ -52,6 +54,11 @@ public void flush() { public void close() { } + + @Override + public Map offsets() { + return Collections.emptyMap(); + } }; private final TaskId id; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index ae374ce58b3c3..9369c01498a4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamsConfig; @@ -55,7 +54,7 @@ public class StreamTask extends AbstractTask implements Punctuator { private final Map partitionQueues; private final Map consumedOffsets; - private final RecordCollectorImpl recordCollector; + private final RecordCollector recordCollector; private final int maxBufferedSize; private boolean commitRequested = false; @@ -70,23 +69,23 @@ public class StreamTask extends AbstractTask implements Punctuator { * @param partitions the collection of assigned {@link TopicPartition} * @param topology the instance of {@link ProcessorTopology} * @param consumer the instance of {@link Consumer} - * @param producer the instance of {@link Producer} * @param restoreConsumer the instance of {@link Consumer} used when restoring state * @param config the {@link StreamsConfig} specified by the user * @param metrics the {@link StreamsMetrics} created by the thread * @param 
stateDirectory the {@link StateDirectory} created by the thread + * @param recordCollector the instance of {@link RecordCollector} used to produce records */ public StreamTask(TaskId id, String applicationId, Collection partitions, ProcessorTopology topology, Consumer consumer, - Producer producer, Consumer restoreConsumer, StreamsConfig config, StreamsMetrics metrics, StateDirectory stateDirectory, - ThreadCache cache) { + ThreadCache cache, + final RecordCollector recordCollector) { super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache); this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); @@ -110,10 +109,10 @@ public StreamTask(TaskId id, this.consumedOffsets = new HashMap<>(); // create the record recordCollector that maintains the produced offsets - this.recordCollector = new RecordCollectorImpl(producer, id().toString()); + this.recordCollector = recordCollector; // initialize the topology with its own context - this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); + this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache); // initialize the state stores log.info("{} Initializing state stores", logPrefix); @@ -361,7 +360,6 @@ public void close() { log.debug("{} Closing processor topology", logPrefix); this.partitionGroup.close(); - this.consumedOffsets.clear(); closeTopology(); } @@ -387,5 +385,9 @@ public String toString() { return super.toString(); } - + @Override + public void flushState() { + super.flushState(); + recordCollector.flush(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f09bf546612d2..626de7c83789d 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -64,6 +64,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static java.util.Collections.singleton; @@ -252,14 +253,14 @@ public void onPartitionsRevoked(Collection assignment) { try { if (state == State.PENDING_SHUTDOWN) { log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.", - StreamThread.this.getName(), assignment); + StreamThread.this.getName(), assignment); } log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", - StreamThread.this.getName(), assignment); + StreamThread.this.getName(), assignment); setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED); lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned // suspend active tasks - suspendTasksAndState(true); + suspendTasksAndState(); } catch (Throwable t) { rebalanceException = t; throw t; @@ -385,7 +386,7 @@ public Map tasks() { private void shutdown() { log.info("{} Shutting down", logPrefix); - shutdownTasksAndState(false); + shutdownTasksAndState(); // close all embedded clients try { @@ -407,7 +408,9 @@ private void shutdown() { // TODO remove this // hotfix to improve ZK behavior als long as KAFKA-4060 is not fixed (c.f. 
KAFKA-4369) // when removing this, make StreamPartitionAssignor#internalTopicManager "private" again - partitionAssignor.internalTopicManager.zkClient.close(); + if (partitionAssignor != null && partitionAssignor.internalTopicManager != null) { + partitionAssignor.internalTopicManager.zkClient.close(); + } // remove all tasks removeStreamTasks(); @@ -417,120 +420,122 @@ private void shutdown() { setState(State.NOT_RUNNING); } - private void unAssignChangeLogPartitions(final boolean rethrowExceptions) { + private RuntimeException unAssignChangeLogPartitions() { try { // un-assign the change log partitions restoreConsumer.assign(Collections.emptyList()); - } catch (Exception e) { + } catch (RuntimeException e) { log.error("{} Failed to un-assign change log partitions: ", logPrefix, e); - if (rethrowExceptions) { - throw e; - } + return e; } + return null; } - private void shutdownTasksAndState(final boolean rethrowExceptions) { + @SuppressWarnings("ThrowableNotThrown") + private void shutdownTasksAndState() { log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); - // only commit under clean exit - if (cleanRun) { - // Commit first as there may be cached records that have not been flushed yet. - commitOffsets(rethrowExceptions); - } + final AtomicReference firstException = new AtomicReference<>(null); // Close all processors in topology order - closeAllTasks(); + firstException.compareAndSet(null, closeAllTasks()); // flush state - flushAllState(rethrowExceptions); - // flush out any extra data sent during close - producer.flush(); - // Close all task state managers - closeAllStateManagers(rethrowExceptions); + firstException.compareAndSet(null, flushAllState()); + // Close all task state managers. 
Don't need to set exception as all + // state would have been flushed above + closeAllStateManagers(firstException.get() == null); + // only commit under clean exit + if (cleanRun && firstException.get() == null) { + firstException.set(commitOffsets()); + } // remove the changelog partitions from restore consumer - unAssignChangeLogPartitions(rethrowExceptions); + unAssignChangeLogPartitions(); } /** * Similar to shutdownTasksAndState, however does not close the task managers, * in the hope that soon the tasks will be assigned again - * @param rethrowExceptions */ - private void suspendTasksAndState(final boolean rethrowExceptions) { + private void suspendTasksAndState() { log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); - - // Commit first as there may be cached records that have not been flushed yet. - commitOffsets(rethrowExceptions); + final AtomicReference firstException = new AtomicReference<>(null); // Close all topology nodes - closeAllTasksTopologies(); + firstException.compareAndSet(null, closeAllTasksTopologies()); // flush state - flushAllState(rethrowExceptions); - // flush out any extra data sent during close - producer.flush(); + firstException.compareAndSet(null, flushAllState()); + // only commit after all state has been flushed and there hasn't been an exception + if (firstException.get() == null) { + firstException.set(commitOffsets()); + } // remove the changelog partitions from restore consumer - unAssignChangeLogPartitions(rethrowExceptions); + firstException.compareAndSet(null, unAssignChangeLogPartitions()); updateSuspendedTasks(); + if (firstException.get() != null) { + throw new StreamsException(logPrefix + " failed to suspend stream tasks", firstException.get()); + } } interface AbstractTaskAction { void apply(final AbstractTask task); } - private void performOnAllTasks(final AbstractTaskAction action, - final String exceptionMessage, - 
final boolean throwExceptions) { + private RuntimeException performOnAllTasks(final AbstractTaskAction action, + final String exceptionMessage) { + RuntimeException firstException = null; final List allTasks = new ArrayList(activeTasks.values()); allTasks.addAll(standbyTasks.values()); for (final AbstractTask task : allTasks) { try { action.apply(task); - } catch (KafkaException e) { - log.error("{} Failed while executing {} {} duet to {}: ", + } catch (RuntimeException t) { + log.error("{} Failed while executing {} {} due to {}: ", StreamThread.this.logPrefix, task.getClass().getSimpleName(), task.id(), exceptionMessage, - e); - if (throwExceptions) { - throw e; + t); + if (firstException == null) { + firstException = t; } } } + return firstException; } - private void closeAllStateManagers(final boolean throwExceptions) { - performOnAllTasks(new AbstractTaskAction() { + private Throwable closeAllStateManagers(final boolean writeCheckpoint) { + return performOnAllTasks(new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id()); - task.closeStateManager(); + task.closeStateManager(writeCheckpoint); } - }, "close state manager", throwExceptions); + }, "close state manager"); } - private void commitOffsets(final boolean throwExceptions) { + private RuntimeException commitOffsets() { // Exceptions should not prevent this call from going through all shutdown steps - performOnAllTasks(new AbstractTaskAction() { + return performOnAllTasks(new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id()); task.commitOffsets(); } - }, "commit consumer offsets", throwExceptions); + }, "commit consumer offsets"); } - private void flushAllState(final boolean throwExceptions) { - performOnAllTasks(new AbstractTaskAction() { + private RuntimeException 
flushAllState() { + return performOnAllTasks(new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id()); task.flushState(); } - }, "flush state", throwExceptions); + }, "flush state"); } /** @@ -788,9 +793,9 @@ protected StreamTask createStreamTask(TaskId id, Collection part sensors.taskCreationSensor.record(); - ProcessorTopology topology = builder.build(id.topicGroupId); - - return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory, cache); + final ProcessorTopology topology = builder.build(id.topicGroupId); + final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString()); + return new StreamTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory, cache, recordCollector); } private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set partitions) { @@ -824,7 +829,7 @@ private void closeNonAssignedSuspendedTasks() { log.debug("{} closing suspended non-assigned task", logPrefix); try { task.close(); - task.closeStateManager(); + task.closeStateManager(true); } catch (Exception e) { log.error("{} Failed to remove suspended task {}", logPrefix, next.getKey(), e); } finally { @@ -845,7 +850,7 @@ private void closeNonAssignedSuspendedStandbyTasks() { final StandbyTask task = suspendedTask.getValue(); try { task.close(); - task.closeStateManager(); + task.closeStateManager(true); } catch (Exception e) { log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e); } finally { @@ -997,26 +1002,26 @@ private void removeStandbyTasks() { standbyRecords.clear(); } - private void closeAllTasks() { - performOnAllTasks(new AbstractTaskAction() { + private RuntimeException closeAllTasks() { + return performOnAllTasks(new AbstractTaskAction() { @Override public void 
apply(final AbstractTask task) { log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id()); task.close(); sensors.taskDestructionSensor.record(); } - }, "close", false); + }, "close"); } - private void closeAllTasksTopologies() { - performOnAllTasks(new AbstractTaskAction() { + private RuntimeException closeAllTasksTopologies() { + return performOnAllTasks(new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id()); task.closeTopology(); sensors.taskDestructionSensor.record(); } - }, "close", false); + }, "close"); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index fe20225fcce82..aa358ab3322e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -102,7 +102,9 @@ public static List> readKeyValues(final String topic, fina consumedValues.add(new KeyValue<>(record.key(), record.value())); } } + consumer.close(); + return consumedValues; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 9198fa973dd63..7023712871b1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -474,4 +474,28 @@ public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throw assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } + @Test + public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { + final 
TaskId taskId = new TaskId(0, 1); + final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); + // write an empty checkpoint file + final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); + oldCheckpoint.write(Collections.emptyMap()); + + final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + + restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( + new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + + + final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); + final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + + restoreConsumer.reset(); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + stateMgr.close(null); + assertFalse(checkpointFile.exists()); + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index b28b8d2f1c9d6..10a86fe6db8f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -202,7 +202,7 @@ public void testUpdate() throws Exception { assertEquals(Collections.emptyList(), store1.keys); assertEquals(Utils.mkList(1, 2, 3), store2.keys); - task.closeStateManager(); + task.closeStateManager(true); File taskDir = stateDirectory.directoryForTask(taskId); OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); @@ -295,7 +295,7 @@ public void testUpdateKTable() throws Exception { remaining = task.update(ktable, remaining); 
assertNull(remaining); - task.closeStateManager(); + task.closeStateManager(true); File taskDir = stateDirectory.directoryForTask(taskId); OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 15f93d9cff77b..3d50007f46e2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Test; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -88,6 +90,7 @@ public class StreamTaskTest { Collections.emptyMap()); private File baseDir; private StateDirectory stateDirectory; + private RecordCollectorImpl recordCollector; private StreamsConfig createConfig(final File baseDir) throws Exception { return new StreamsConfig(new Properties() { @@ -127,7 +130,8 @@ public void cleanup() { @Test public void testProcessOrder() throws Exception { StreamsConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null); + recordCollector = new RecordCollectorImpl(producer, "taskId"); + StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, 
restoreStateConsumer, config, null, stateDirectory, null, recordCollector); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -174,7 +178,7 @@ public void testProcessOrder() throws Exception { @Test public void testPauseResume() throws Exception { StreamsConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null); + StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -233,7 +237,7 @@ public void testPauseResume() throws Exception { @Test public void testMaybePunctuate() throws Exception { StreamsConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null); + StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -312,7 +316,7 @@ public void process(final Object key, final Object value) { Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap()); - final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0)); + final 
StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector); final int offset = 20; streamTask.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -360,7 +364,7 @@ public void punctuate(final long timestamp) { Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap()); - final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0)); + final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector); try { streamTask.punctuate(punctuator, 1); @@ -372,6 +376,26 @@ public void punctuate(final long timestamp) { } + @Test + public void shouldFlushRecordCollectorOnFlushState() throws Exception { + final ProcessorTopology topology = new ProcessorTopology(Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap()); + final AtomicBoolean flushed = new AtomicBoolean(false); + final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { + @Override + public void flush() { + flushed.set(true); + } + }; + final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "appId", partitions, topology, consumer, restoreStateConsumer, createConfig(baseDir), null, stateDirectory, new ThreadCache(0), recordCollector); + streamTask.flushState(); + assertTrue(flushed.get()); + } + private Iterable> records(ConsumerRecord... 
recs) { return Arrays.asList(recs); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e8c10e4e95b06..442fc3a7c9548 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -27,16 +27,19 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; import org.junit.Assert; @@ -64,6 +67,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class StreamThreadTest { @@ -145,7 +149,17 @@ public TestStreamTask(TaskId id, Consumer restoreConsumer, StreamsConfig config, StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory, null); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, new StreamsMetrics() { + @Override + public Sensor 
addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) { + return null; + } + + @Override + public void recordLatency(final Sensor sensor, final long startNs, final long endNs) { + + } + }, stateDirectory, null, new RecordCollectorImpl(producer, id.toString())); } @Override @@ -172,8 +186,8 @@ public void close() { } @Override - void closeStateManager() { - super.closeStateManager(); + void closeStateManager(boolean writeCheckpoint) { + super.closeStateManager(writeCheckpoint); this.closedStateManager = true; } } @@ -831,6 +845,206 @@ Map> activeTasks() { assertThat(createdTasks.get(task00Partitions).id(), is(taskId)); } + @Test + public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId(applicationId); + builder.stream("t1").groupByKey(); + final StreamsConfig config = new StreamsConfig(configProps()); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.producer, + clientSupplier.restoreConsumer, + config, + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) { + @Override + public void close() { + throw new RuntimeException("KABOOM!"); + } + }; + final StreamsConfig config1 = new StreamsConfig(configProps()); + + final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { + return testStreamTask; + } + }; + + + final Map> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, 
testStreamTask.partitions); + + + thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + + thread.start(); + thread.close(); + thread.join(); + assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed); + + + } + + @Test + public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId(applicationId); + final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false); + builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore)); + final StreamsConfig config = new StreamsConfig(configProps()); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.producer, + clientSupplier.restoreConsumer, + config, + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) { + @Override + public void flushState() { + throw new RuntimeException("KABOOM!"); + } + }; + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { + return testStreamTask; + } + }; + + + final Map> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, testStreamTask.partitions); + + + thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + 
thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + // store should have been opened + assertTrue(stateStore.isOpen()); + + thread.start(); + thread.close(); + thread.join(); + assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed); + // store should be closed even if we had an exception + assertFalse(stateStore.isOpen()); + } + + @Test + public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringCloseTopologyWhenSuspendingState() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId(applicationId); + builder.stream("t1").groupByKey(); + final StreamsConfig config = new StreamsConfig(configProps()); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.producer, + clientSupplier.restoreConsumer, + config, + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) { + @Override + public void closeTopology() { + throw new RuntimeException("KABOOM!"); + } + }; + final StreamsConfig config1 = new StreamsConfig(configProps()); + + final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { + return testStreamTask; + } + }; + + + final Map> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, testStreamTask.partitions); + + + thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + 
thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + try { + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + fail("should have thrown exception"); + } catch (Exception e) { + // expected + } + assertFalse(testStreamTask.committed); + } + + @Test + public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId(applicationId); + builder.stream("t1").groupByKey(); + final StreamsConfig config = new StreamsConfig(configProps()); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.producer, + clientSupplier.restoreConsumer, + config, + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) { + @Override + public void flushState() { + throw new RuntimeException("KABOOM!"); + } + }; + final StreamsConfig config1 = new StreamsConfig(configProps()); + + final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { + return testStreamTask; + } + }; + + + final Map> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, testStreamTask.partitions); + + + thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + try { + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + fail("should have thrown exception"); + } catch (Exception e) { + 
// expected + } + assertFalse(testStreamTask.committed); + + } + + private void initPartitionGrouper(StreamsConfig config, StreamThread thread) { StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 9758d89940b78..fe53ec008a63a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -183,11 +184,10 @@ private StreamTask createStreamsTask(final String applicationId, final ProcessorTopology topology, final TaskId taskId) { return new StreamTask(taskId, applicationId, Collections - .singletonList(new TopicPartition("topic", taskId.partition)), topology, + .singletonList(new TopicPartition("topic", taskId.partition)), topology, clientSupplier.consumer, - clientSupplier.producer, clientSupplier.restoreConsumer, - streamsConfig, new TheStreamMetrics(), stateDirectory, null) { + streamsConfig, new TheStreamMetrics(), stateDirectory, null, new NoOpRecordCollector()) { @Override protected void initializeOffsetLimits() { @@ -237,4 +237,5 @@ public void recordLatency(final Sensor sensor, final long startNs, final long en } } + } diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java index 353262302cbed..d1fe213965d37 100644 --- 
a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java @@ -29,9 +29,10 @@ import java.util.Map; public class MockStateStoreSupplier implements StateStoreSupplier { - private final String name; - private final boolean persistent; - private final boolean loggingEnabled; + private String name; + private boolean persistent; + private boolean loggingEnabled; + private MockStateStore stateStore; public MockStateStoreSupplier(String name, boolean persistent) { this(name, persistent, true); @@ -43,6 +44,10 @@ public MockStateStoreSupplier(String name, boolean persistent, boolean loggingEn this.loggingEnabled = loggingEnabled; } + public MockStateStoreSupplier(final MockStateStore stateStore) { + this.stateStore = stateStore; + } + @Override public String name() { return name; @@ -50,6 +55,9 @@ public String name() { @Override public StateStore get() { + if (stateStore != null) { + return stateStore; + } if (loggingEnabled) { return new MockStateStore(name, persistent).enableLogging(); } else { @@ -74,7 +82,7 @@ public static class MockStateStore implements StateStore { public boolean loggingEnabled = false; public boolean initialized = false; public boolean flushed = false; - public boolean closed = false; + public boolean closed = true; public final ArrayList keys = new ArrayList<>(); public MockStateStore(String name, boolean persistent) { @@ -96,6 +104,7 @@ public String name() { public void init(ProcessorContext context, StateStore root) { context.register(root, loggingEnabled, stateRestoreCallback); initialized = true; + closed = false; } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java index d4368d3a36c22..c32ed080003c8 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java +++ 
b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java @@ -17,14 +17,15 @@ package org.apache.kafka.test; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; +import org.apache.kafka.streams.processor.internals.RecordCollector; -public class NoOpRecordCollector extends RecordCollectorImpl { - public NoOpRecordCollector() { - super(null, "NoOpRecordCollector"); - } +import java.util.Collections; +import java.util.Map; + +public class NoOpRecordCollector implements RecordCollector { @Override public void send(final ProducerRecord record, final Serializer keySerializer, final Serializer valueSerializer) { @@ -45,4 +46,9 @@ public void flush() { public void close() { //no-op } + + @Override + public Map offsets() { + return Collections.emptyMap(); + } } diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 83a9092fa2011..7dad4082c4628 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.state.KeyValueStore; @@ -172,14 +173,13 @@ public List partitionsFor(String topic) { 
consumer.assign(offsetsByTopicPartition.keySet()); task = new StreamTask(id, - applicationId, - partitionsByTopic.values(), - topology, - consumer, - producer, - restoreStateConsumer, - config, - new StreamsMetrics() { + applicationId, + partitionsByTopic.values(), + topology, + consumer, + restoreStateConsumer, + config, + new StreamsMetrics() { @Override public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { return null; @@ -189,7 +189,7 @@ public Sensor addLatencySensor(String scopeName, String entityName, String opera public void recordLatency(Sensor sensor, long startNs, long endNs) { // do nothing } - }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), new ThreadCache(1024 * 1024)); + }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), new ThreadCache(1024 * 1024), new RecordCollectorImpl(producer, "id")); } /** From 2c221afc900baca3a8fbc48fff77b841d91b4858 Mon Sep 17 00:00:00 2001 From: Andrew Olson Date: Tue, 3 Jan 2017 08:57:30 -0800 Subject: [PATCH 13/24] KAFKA-2434: Remove identical topic subscription constraint for roundrobin strategy in old consumer API Author: Andrew Olson Reviewers: Onur Karaman Closes #145 from noslowerdna/KAFKA-2434 --- .../kafka/consumer/PartitionAssignor.scala | 31 ++++------- .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../consumer/PartitionAssignorTest.scala | 55 +++++++++++++++++++ 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 900a4b63e1988..f02df352acf8d 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -47,12 +47,13 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo myTopicCount.getConsumerThreadIdsPerTopic } - val partitionsForTopic: collection.Map[String, 
Seq[Int]] = - zkUtils.getPartitionsForTopics(myTopicThreadIds.keySet.toSeq) - val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] = zkUtils.getConsumersPerTopic(group, excludeInternalTopics) + // Some assignment strategies require knowledge of all topics consumed by any member of the group + val partitionsForTopic: collection.Map[String, Seq[Int]] = + zkUtils.getPartitionsForTopics(consumersForTopic.keySet.toSeq) + val consumers: Seq[String] = zkUtils.getConsumersInGroup(group).sorted } @@ -61,13 +62,7 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts * will be within a delta of exactly one across all consumer threads.) - * - * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance - * and thread-id within that instance. Therefore, round-robin assignment is allowed only if: - * a) Every topic has the same number of streams within a consumer instance - * b) The set of subscribed topics is identical for every consumer instance within the group. 
*/ - class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { @@ -77,18 +72,12 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) if (ctx.consumersForTopic.nonEmpty) { - // check conditions (a) and (b) - val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) - ctx.consumersForTopic.foreach { case (topic, threadIds) => - val threadIdSet = threadIds.toSet - require(threadIdSet == headThreadIdSet, - "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + - "AND if the stream counts across topics are identical for a given consumer instance.\n" + - "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + - "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) - } + // Collect consumer thread ids across all topics, remove duplicates, and sort to ensure determinism + val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) => + threadIds + }.toSet.toSeq.sorted - val threadAssignor = CoreUtils.circularIterator(headThreadIdSet.toSeq.sorted) + val threadAssignor = CoreUtils.circularIterator(allThreadIds) info("Starting round-robin assignment with consumers " + ctx.consumers) val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) => @@ -106,7 +95,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { }) allTopicPartitions.foreach(topicPartition => { - val threadId = threadAssignor.next() + val threadId = threadAssignor.dropWhile(threadId => !ctx.consumersForTopic(topicPartition.topic).contains(threadId)).next // record the partition ownership decision val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer) assignmentForConsumer += (topicPartition -> threadId) 
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 4ef32e9c2057b..9bf0d200f93fc 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -853,7 +853,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } catch { case _: ZkNodeExistsException => // The node hasn't been deleted by the original owner. So wait a bit and retry. - info("waiting for the partition ownership to be deleted: " + partition) + info("waiting for the partition ownership to be deleted: " + partition + " for topic " + topic) false } } diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 9e568f843bd67..1e45bfbf6e998 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -54,6 +54,61 @@ class PartitionAssignorTest extends Logging { } } + @Test + def testRoundRobinPartitionAssignorStaticSubscriptions() { + val assignor = new RoundRobinAssignor + + /** test static subscription scenarios */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) + }).toSeq:_*) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + val streamCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, 1) + }).toSeq:_*) + ("g1c" + consumer, 
StaticSubscriptionInfo(streamCounts)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkUtils.zkClient) + PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true) + }) + } + + @Test + def testRoundRobinPartitionAssignorUnbalancedStaticSubscriptions() { + val assignor = new RoundRobinAssignor + val minConsumerCount = 5 + + /** test unbalanced static subscription scenarios */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = minConsumerCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val topicCount = 10 + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, 10) + }).toSeq:_*) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + // Exclude some topics from some consumers + val topicRange = (1 to topicCount - consumer % minConsumerCount) + val streamCounts = Map(topicRange.map(topic => { + ("topic-" + topic, 3) + }).toSeq:_*) + ("g1c" + consumer, StaticSubscriptionInfo(streamCounts)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkUtils.zkClient) + PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils) + }) + } + @Test def testRangePartitionAssignor() { val assignor = new RangeAssignor From f38d1893984a0e2da7754a143bcfe25112b45871 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 3 Jan 2017 09:11:07 -0800 Subject: [PATCH 14/24] KAFKA-4528: Fix failure in ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout I was able to reproduce the failure in less than 10 runs before the change. With the change, the test passed 70 times consecutively. 
Author: Ismael Juma Reviewers: Guozhang Wang Closes #2298 from ijuma/kafka-4528-fix-test-async-send-timeout --- .../unit/kafka/producer/ProducerTest.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index ec51e207cf25d..769ea33d2d615 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -25,7 +25,7 @@ import kafka.admin.AdminUtils import kafka.api.FetchRequestBuilder import kafka.common.FailedToSendMessageException import kafka.consumer.SimpleConsumer -import kafka.message.Message +import kafka.message.{Message, MessageAndOffset} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} import kafka.utils._ @@ -77,10 +77,6 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ server2 = TestUtils.createServer(config2) servers = List(server1,server2) - val props = new Properties() - props.put("host", "localhost") - props.put("port", server1.boundPort().toString) - // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) } @@ -288,11 +284,11 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ def testAsyncSendCanCorrectlyFailWithTimeout() { val topic = "new-topic" // create topics in ZK - TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) + TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0, 1)), servers = servers) val timeoutMs = 500 val props = new Properties() - props.put("request.timeout.ms", String.valueOf(timeoutMs)) + props.put("request.timeout.ms", timeoutMs.toString) props.put("request.required.acks", "1") props.put("message.send.max.retries", "0") 
props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") @@ -306,11 +302,16 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ // do a simple test to make sure plumbing is okay try { // this message should be assigned to partition 0 whose leader is on broker 0 - producer.send(new KeyedMessage[String, String](topic, "test", "test")) - // cross check if brokers got the messages - val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) - val messageSet1 = response1.messageSet("new-topic", 0).iterator - assertTrue("Message set should have 1 message", messageSet1.hasNext) + producer.send(new KeyedMessage(topic, "test", "test")) + // cross check if the broker received the messages + // we need the loop because the broker won't return the message until it has been replicated and the producer is + // using acks=1 + var messageSet1: Iterator[MessageAndOffset] = null + TestUtils.waitUntilTrue(() => { + val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + messageSet1 = response1.messageSet(topic, 0).iterator + messageSet1.hasNext + }, "Message set should have 1 message") assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload) // stop IO threads and request handling, but leave networking operational @@ -320,8 +321,9 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ val t1 = Time.SYSTEM.milliseconds try { // this message should be assigned to partition 0 whose leader is on broker 0, but - // broker 0 will not response within timeoutMs millis. - producer.send(new KeyedMessage[String, String](topic, "test", "test")) + // broker 0 will not respond within timeoutMs millis. 
+ producer.send(new KeyedMessage(topic, "test", "test")) + fail("Exception should have been thrown") } catch { case _: FailedToSendMessageException => /* success */ } From 657fb49d8eace91ddb1f81697ba01354c0dc8ee3 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 3 Jan 2017 11:02:20 -0800 Subject: [PATCH 15/24] KAFKA-4404; Add javadocs to document core Connect types, especially that integer types are signed Author: Ewen Cheslack-Postava Reviewers: Konstantine Karantasis , Jason Gustafson Closes #2296 from ewencp/kafka-4404-document-connect-signed-integer-types --- .../org/apache/kafka/connect/data/Schema.java | 61 ++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java index ae2eeb5447d19..3313182762150 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java @@ -44,7 +44,66 @@ public interface Schema { * The type of a schema. These only include the core types; logical types must be determined by checking the schema name. 
*/ enum Type { - INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT; + /** + * 8-bit signed integer + * + * Note that if you have an unsigned 8-bit data source, {@link Type#INT16} will be required to safely capture all valid values + */ + INT8, + /** + * 16-bit signed integer + * + * Note that if you have an unsigned 16-bit data source, {@link Type#INT32} will be required to safely capture all valid values + */ + INT16, + /** + * 32-bit signed integer + * + * Note that if you have an unsigned 32-bit data source, {@link Type#INT64} will be required to safely capture all valid values + */ + INT32, + /** + * 64-bit signed integer + * + * Note that if you have an unsigned 64-bit data source, the {@link Decimal} logical type (encoded as {@link Type#BYTES}) + * will be required to safely capture all valid values + */ + INT64, + /** + * 32-bit IEEE 754 floating point number + */ + FLOAT32, + /** + * 64-bit IEEE 754 floating point number + */ + FLOAT64, + /** + * Boolean value (true or false) + */ + BOOLEAN, + /** + * Character string that supports all Unicode characters. + * + * Note that this does not imply any specific encoding (e.g. UTF-8) as this is an in-memory representation. + */ + STRING, + /** + * Sequence of unsigned 8-bit bytes + */ + BYTES, + /** + * An ordered sequence of elements, each of which shares the same type. + */ + ARRAY, + /** + * A mapping from keys to values. Both keys and values can be arbitrarily complex types, including complex types + * such as {@link Struct}. + */ + MAP, + /** + * A structured record containing a set of named fields, each field using a fixed, independent {@link Schema}. 
+ */ + STRUCT; private String name; From 410c3edeb5a4c3c02ac64ef6497135686409ab72 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 3 Jan 2017 13:54:40 -0800 Subject: [PATCH 16/24] KAFKA-4429; Consumer lag metric should be zero if FetchResponse is empty Author: Dong Lin Reviewers: Ewen Cheslack-Postava , Jason Gustafson Closes #2155 from lindong28/KAFKA-4429 --- .../clients/consumer/internals/Fetcher.java | 3 ++ .../consumer/internals/FetcherTest.java | 49 ++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 526b0a925af2b..22588a875af09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -701,6 +701,9 @@ private PartitionRecords parseFetchedData(CompletedFetch completedFetch) { parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed); ConsumerRecord record = parsed.get(parsed.size() - 1); this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); + } else if (partition.highWatermark >= 0) { + log.trace("Received empty fetch response for partition {} with offset {}", tp, position); + this.sensors.recordsFetchLag.record(partition.highWatermark - fetchOffset); } } else if (error == Errors.NOT_LEADER_FOR_PARTITION) { log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 272a5ee406bd2..0095697f0d643 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -607,7 +607,6 @@ public void 
testGetTopicMetadataLeaderNotAvailable() { */ @Test public void testQuotaMetrics() throws Exception { - List> records; subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); @@ -615,17 +614,10 @@ public void testQuotaMetrics() throws Exception { for (int i = 1; i < 4; i++) { // We need to make sure the message offset grows. Otherwise they will be considered as already consumed // and filtered out by consumer. - if (i > 1) { - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - for (int v = 0; v < 3; v++) { - builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); - } - this.records = builder.build(); - } - assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 100 * i)); - consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + for (int v = 0; v < 3; v++) + builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + List> records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp); assertEquals(3, records.size()); } @@ -636,6 +628,39 @@ public void testQuotaMetrics() throws Exception { assertEquals(300, maxMetric.value(), EPSILON); } + /* + * Send multiple requests. 
Verify that the client side quota metrics have the right values + */ + @Test + public void testFetcherMetrics() { + subscriptions.assignFromUser(singleton(tp)); + subscriptions.seek(tp, 0); + + Map allMetrics = metrics.metrics(); + KafkaMetric recordsFetchLagMax = allMetrics.get(metrics.metricName("records-lag-max", metricGroup, "")); + + // recordsFetchLagMax should be initialized to negative infinity + assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON); + + // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse + fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0); + assertEquals(100, recordsFetchLagMax.value(), EPSILON); + + // recordsFetchLagMax should be hw - offset of the last message after receiving a non-empty FetchResponse + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + for (int v = 0; v < 3; v++) + builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0); + assertEquals(198, recordsFetchLagMax.value(), EPSILON); + } + + private Map>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) { + assertEquals(1, fetcher.sendFetches()); + client.prepareResponse(fetchResponse(records, error, hw, throttleTime)); + consumerClient.poll(0); + return fetcher.fetchedRecords(); + } + @Test public void testGetOffsetsForTimesTimeout() { try { From 4d6af9bae10efbc498e9609b5950de7ba34df1ce Mon Sep 17 00:00:00 2001 From: fpj Date: Tue, 3 Jan 2017 14:49:33 -0800 Subject: [PATCH 17/24] KAFKA-4261: Provide debug option in vagrant-up.sh Author: fpj Reviewers: Ewen Cheslack-Postava Closes #1981 from fpj/vagrant-debug-option --- vagrant/vagrant-up.sh | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh 
index 1d8a648ec076a..b01c10db006c3 100755 --- a/vagrant/vagrant-up.sh +++ b/vagrant/vagrant-up.sh @@ -26,6 +26,7 @@ readonly ARGS="$@" AWS=false PARALLEL=true MAX_PARALLEL=5 +DEBUG=false readonly USAGE="Usage: $PROG_NAME [-h | --help] [--aws [--no-parallel] [--max-parallel MAX]]" readonly HELP="$(cat < 0 ]]; do MAX_PARALLEL="$2" shift ;; + --debug) + DEBUG=true + ;; *) # unknown option echo "Unknown option $1" @@ -200,7 +204,14 @@ function bring_up_aws { local parallel="$1" local max_parallel="$2" local machines="$(read_vagrant_machines)" - + case "$3" in + true) + local debug="--debug" + ;; + false) + local debug="" + ;; + esac zk_broker_machines=$(zk_broker "$machines") worker_machines=$(worker "$machines") @@ -208,18 +219,18 @@ function bring_up_aws { if [[ ! -z "$zk_broker_machines" ]]; then # We still have to bring up zookeeper/broker nodes serially echo "Bringing up zookeeper/broker machines serially" - vagrant up --provider=aws --no-parallel --no-provision $zk_broker_machines + vagrant up --provider=aws --no-parallel --no-provision $zk_broker_machines $debug vagrant hostmanager vagrant provision fi if [[ ! 
-z "$worker_machines" ]]; then echo "Bringing up test worker machines in parallel" - vagrant_batch_command "vagrant up --provider=aws" "$worker_machines" "$max_parallel" + vagrant_batch_command "vagrant up $debug --provider=aws" "$worker_machines" "$max_parallel" vagrant hostmanager fi else - vagrant up --provider=aws --no-parallel --no-provision + vagrant up --provider=aws --no-parallel --no-provision $debug vagrant hostmanager vagrant provision fi @@ -227,7 +238,7 @@ function bring_up_aws { function main { if [[ "$AWS" == "true" ]]; then - bring_up_aws "$PARALLEL" "$MAX_PARALLEL" + bring_up_aws "$PARALLEL" "$MAX_PARALLEL" "$DEBUG" else bring_up_local fi From 194a2f2fa88d20672fee1c377e90f0ac50ee4fd7 Mon Sep 17 00:00:00 2001 From: amethystic Date: Wed, 4 Jan 2017 11:54:29 +0800 Subject: [PATCH 18/24] kafka-4295: ConsoleConsumer does not delete the temporary group in zookeeper added some comments to explain why we capture ZkException when invoking deleteRecursive(dir) in maybeDeletePath --- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 7d0909797c034..424b27ca8ba97 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -93,7 +93,7 @@ object ZkUtils { try { zk.deleteRecursive(dir) } catch { - case _: ZkException => zk.deleteRecursive(dir) + case _: ZkException => zk.deleteRecursive(dir) // Occasionally, child node list is not empty before deleting parent ZkNode, give a second chance to try deleting. 
case _: Throwable => // swallow } finally { zk.close() From 57eab4b76696f4f3c06cab77fee22c4656d734e2 Mon Sep 17 00:00:00 2001 From: amethystic Date: Thu, 5 Jan 2017 15:50:48 +0800 Subject: [PATCH 19/24] kafka-4295: ConsoleConsumer does not delete the temporary group in zookeeper added a unit test case to ensure no zknode left after consoleconsumer got exited. --- .../kafka/tools/ConsoleConsumerTest.scala | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 013ed3e424508..e753467f2bf0e 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -17,17 +17,41 @@ package kafka.tools -import java.io.{PrintStream, FileOutputStream} +import java.io.{FileOutputStream, PrintStream} +import java.util.Properties import kafka.common.MessageFormatter -import kafka.consumer.{BaseConsumer, BaseConsumerRecord} -import kafka.utils.TestUtils +import kafka.consumer.{BaseConsumer, BaseConsumerRecord, OldConsumer, Whitelist} +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestUtils, ZkUtils} import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test -import org.scalatest.junit.JUnitSuite -class ConsoleConsumerTest extends JUnitSuite { +class ConsoleConsumerTest extends KafkaServerTestHarness { + + override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(KafkaConfig.fromProps(_, new Properties())) + + @Test + def testZkNodeDeleteForOldConsumerWithUnspecifiedGroupID() { + val topic = "test-topic" + val consumerArgs: Array[String] = Array( + "--zookeeper", this.zkConnect, + "--topic", topic, + "--from-beginning") + val conf = new ConsoleConsumer.ConsumerConfig(consumerArgs) + val consumer = new 
OldConsumer(Whitelist(topic), ConsoleConsumer.getOldConsumerProps(conf)) + val groupID = conf.consumerProps.get("group.id") + try { + assertTrue(s"Consumer group should be created.", zkUtils.getChildren(ZkUtils.ConsumersPath).head == groupID) + } finally { + consumer.stop() + ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), ZkUtils.ConsumersPath + "/" + groupID) + } + assertTrue("The zk node for this group should be deleted.", zkUtils.getChildren(ZkUtils.ConsumersPath).isEmpty) + } @Test def shouldLimitReadsToMaxMessageLimit() { @@ -190,4 +214,5 @@ class ConsoleConsumerTest extends JUnitSuite { assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms")) } + } From d03325f89b3323048e08c4eccc21c1b98ba40dc4 Mon Sep 17 00:00:00 2001 From: amethystic Date: Sat, 7 Jan 2017 12:10:17 +0800 Subject: [PATCH 20/24] kafka-4295: ConsoleConsumer does not delete the temporary group in zookeeper Addressed Ijuma's comments 1. Restored ZkUtils to trunk code 2. Restored ConsoleConsumerTest to trunk code 3. Restored ZkUtils.maybeDeletePath to trunk code 4. 
Replaced ZkUtils.maybeDeletePath with AdminUtils.deleteConsumerGroupInZK --- .../scala/kafka/tools/ConsoleConsumer.scala | 25 +++++++++---- core/src/main/scala/kafka/utils/ZkUtils.scala | 6 ++-- .../ConsoleConsumerIntegrationTest.scala | 36 +++++++++++++++++++ .../kafka/tools/ConsoleConsumerTest.scala | 35 +++--------------- 4 files changed, 62 insertions(+), 40 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 5a0955884e6ce..48abaf3042821 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -20,19 +20,23 @@ package kafka.tools import java.io.PrintStream import java.util.concurrent.CountDownLatch import java.util.{Locale, Properties, Random} + import joptsimple._ +import kafka.admin.AdminUtils import kafka.api.OffsetRequest import kafka.common.{MessageFormatter, StreamEndException} import kafka.consumer._ import kafka.message._ import kafka.metrics.KafkaMetricsReporter -import kafka.utils._ +import kafka.utils.{ZkUtils, _} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.utils.Utils import org.apache.log4j.Logger + import scala.collection.JavaConverters._ /** @@ -79,10 +83,9 @@ object ConsoleConsumer extends Logging { reportRecordCount() // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack - if (!conf.groupIdPassed && conf.options.has(conf.zkConnectOpt)) { - ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + 
conf.consumerProps.get("group.id")) + if (!conf.groupIdPassed && conf.useOldConsumer) { + deleteZkPathForConsumerGroup(conf.options.valueOf(conf.zkConnectOpt), conf.consumerProps.getProperty("group.id")) } - shutdownLatch.countDown() } } @@ -181,8 +184,9 @@ object ConsoleConsumer extends Logging { System.exit(1) } - if (config.options.has(config.deleteConsumerOffsetsOpt)) - ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id")) + if (config.options.has(config.deleteConsumerOffsetsOpt)) { + deleteZkPathForConsumerGroup(config.options.valueOf(config.zkConnectOpt), config.consumerProps.getProperty("group.id")) + } if (config.timeoutMs >= 0) props.put("consumer.timeout.ms", config.timeoutMs.toString) @@ -409,6 +413,15 @@ object ConsoleConsumer extends Logging { case _: Throwable => false } } + + def deleteZkPathForConsumerGroup(zkUrl: String, path: String) { + val zkUtils = ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled()) + try { + AdminUtils.deleteConsumerGroupInZK(zkUtils, path) + } finally { + zkUtils.close() + } + } } class DefaultMessageFormatter extends MessageFormatter { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 424b27ca8ba97..fcb564867e0af 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -89,14 +89,12 @@ object ZkUtils { } def maybeDeletePath(zkUrl: String, dir: String) { - val zk = createZkClient(zkUrl, 30*1000, 30*1000) try { + val zk = createZkClient(zkUrl, 30*1000, 30*1000) zk.deleteRecursive(dir) + zk.close() } catch { - case _: ZkException => zk.deleteRecursive(dir) // Occasionally, child node list is not empty before deleting parent ZkNode, give a second chance to try deleting. 
case _: Throwable => // swallow - } finally { - zk.close() } } diff --git a/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala new file mode 100644 index 0000000000000..f7e8031443278 --- /dev/null +++ b/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala @@ -0,0 +1,36 @@ +package integration.kafka.tools + +import java.util.Properties + +import kafka.consumer.{OldConsumer, Whitelist} +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.tools.ConsoleConsumer +import kafka.utils.{TestUtils, ZkUtils} +import org.junit.Assert.assertTrue +import org.junit.Test + + +class ConsoleConsumerIntegrationTest extends KafkaServerTestHarness { + override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(KafkaConfig.fromProps(_, new Properties())) + + @Test + def testZkNodeDeleteForOldConsumerWithUnspecifiedGroupID() { + val topic = "test-topic" + val consumerArgs: Array[String] = Array( + "--zookeeper", this.zkConnect, + "--topic", topic, + "--from-beginning") + val conf = new ConsoleConsumer.ConsumerConfig(consumerArgs) + val consumer = new OldConsumer(Whitelist(topic), ConsoleConsumer.getOldConsumerProps(conf)) + val groupID = conf.consumerProps.get("group.id") + try { + assertTrue("Consumer group should be created.", zkUtils.getChildren(ZkUtils.ConsumersPath).head == groupID) + } finally { + consumer.stop() + ConsoleConsumer.deleteZkPathForConsumerGroup(conf.options.valueOf(conf.zkConnectOpt), conf.consumerProps.getProperty("group.id")) + } + assertTrue("The zk node for this group should be deleted.", zkUtils.getChildren(ZkUtils.ConsumersPath).isEmpty) + } +} diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index e753467f2bf0e..013ed3e424508 100644 --- 
a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -17,41 +17,17 @@ package kafka.tools -import java.io.{FileOutputStream, PrintStream} -import java.util.Properties +import java.io.{PrintStream, FileOutputStream} import kafka.common.MessageFormatter -import kafka.consumer.{BaseConsumer, BaseConsumerRecord, OldConsumer, Whitelist} -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, ZkUtils} +import kafka.consumer.{BaseConsumer, BaseConsumerRecord} +import kafka.utils.TestUtils import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test +import org.scalatest.junit.JUnitSuite -class ConsoleConsumerTest extends KafkaServerTestHarness { - - override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) - .map(KafkaConfig.fromProps(_, new Properties())) - - @Test - def testZkNodeDeleteForOldConsumerWithUnspecifiedGroupID() { - val topic = "test-topic" - val consumerArgs: Array[String] = Array( - "--zookeeper", this.zkConnect, - "--topic", topic, - "--from-beginning") - val conf = new ConsoleConsumer.ConsumerConfig(consumerArgs) - val consumer = new OldConsumer(Whitelist(topic), ConsoleConsumer.getOldConsumerProps(conf)) - val groupID = conf.consumerProps.get("group.id") - try { - assertTrue(s"Consumer group should be created.", zkUtils.getChildren(ZkUtils.ConsumersPath).head == groupID) - } finally { - consumer.stop() - ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), ZkUtils.ConsumersPath + "/" + groupID) - } - assertTrue("The zk node for this group should be deleted.", zkUtils.getChildren(ZkUtils.ConsumersPath).isEmpty) - } +class ConsoleConsumerTest extends JUnitSuite { @Test def shouldLimitReadsToMaxMessageLimit() { @@ -214,5 +190,4 @@ class ConsoleConsumerTest extends KafkaServerTestHarness { assertEquals("1000", 
config.consumerProps.getProperty("request.timeout.ms")) } - } From 7a9cfb859e722e9fa15ab2506adcbc838255559a Mon Sep 17 00:00:00 2001 From: amethystic Date: Sat, 7 Jan 2017 12:15:47 +0800 Subject: [PATCH 21/24] tweaked some imports to keep same as trunk --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 48abaf3042821..6cebe1a97e909 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -20,7 +20,6 @@ package kafka.tools import java.io.PrintStream import java.util.concurrent.CountDownLatch import java.util.{Locale, Properties, Random} - import joptsimple._ import kafka.admin.AdminUtils import kafka.api.OffsetRequest @@ -28,7 +27,7 @@ import kafka.common.{MessageFormatter, StreamEndException} import kafka.consumer._ import kafka.message._ import kafka.metrics.KafkaMetricsReporter -import kafka.utils.{ZkUtils, _} +import kafka.utils._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.record.TimestampType @@ -36,7 +35,6 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.utils.Utils import org.apache.log4j.Logger - import scala.collection.JavaConverters._ /** From 09699a1f7c3e72e4197e277439470bd66567880e Mon Sep 17 00:00:00 2001 From: amethystic Date: Mon, 9 Jan 2017 11:53:42 +0800 Subject: [PATCH 22/24] change access level for ConsoleConsumer.deleteZkPathForConsumerGroup to private --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
index 6cebe1a97e909..fad8cc84e62f5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -412,7 +412,7 @@ object ConsoleConsumer extends Logging { } } - def deleteZkPathForConsumerGroup(zkUrl: String, path: String) { + private def deleteZkPathForConsumerGroup(zkUrl: String, path: String) { val zkUtils = ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled()) try { AdminUtils.deleteConsumerGroupInZK(zkUtils, path) From f52cfa0a71c381d2f1308a63e1b4348b0ca603bb Mon Sep 17 00:00:00 2001 From: amethystic Date: Mon, 9 Jan 2017 11:58:06 +0800 Subject: [PATCH 23/24] changed access level for ConsoleConsumer.deleteZkPathForConsumerGroup to private[tools] --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 +- .../kafka/tools/ConsoleConsumerIntegrationTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index fad8cc84e62f5..25674c64d934b 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -412,7 +412,7 @@ object ConsoleConsumer extends Logging { } } - private def deleteZkPathForConsumerGroup(zkUrl: String, path: String) { + private[tools] def deleteZkPathForConsumerGroup(zkUrl: String, path: String) { val zkUtils = ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled()) try { AdminUtils.deleteConsumerGroupInZK(zkUtils, path) diff --git a/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala index f7e8031443278..9feab523a6a33 100644 --- a/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala @@ -1,4 +1,4 @@ -package integration.kafka.tools +package
kafka.tools import java.util.Properties From c4bf1d9dfc22421cee10a2da45def8a99a0b5f7b Mon Sep 17 00:00:00 2001 From: amethystic Date: Mon, 9 Jan 2017 12:00:18 +0800 Subject: [PATCH 24/24] removed useless import --- .../integration/kafka/tools/ConsoleConsumerIntegrationTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala index 9feab523a6a33..24fb6dada58e7 100644 --- a/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/ConsoleConsumerIntegrationTest.scala @@ -5,7 +5,6 @@ import java.util.Properties import kafka.consumer.{OldConsumer, Whitelist} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.tools.ConsoleConsumer import kafka.utils.{TestUtils, ZkUtils} import org.junit.Assert.assertTrue import org.junit.Test