From 9af5ffe7f06ce88f36fb6057ae689faf578b16b2 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Fri, 23 Sep 2016 16:09:15 -0700 Subject: [PATCH] KAFKA-3831: Prepare for updating new-consumer-based Mirror Maker's default partition assignment strategy to round robin This patch adds proper warning message for updating the default partition assignment strategy of Mirror Maker from range to round robin. The actual switch would occur as part of a major release cycle (to be scheduled. --- .../main/scala/kafka/tools/MirrorMaker.scala | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 434607425e1e..1373f51d631c 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -25,7 +25,7 @@ import java.util.{Collections, Properties} import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} +import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig => OldConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder @@ -43,6 +43,7 @@ import org.apache.kafka.common.record.Record import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.util.control.ControlThrowable +import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig} /** * The mirror maker has the following architecture: @@ -174,6 +175,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt) + val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt)) + val useNewConsumer = options.has(useNewConsumerOpt) if (useNewConsumer) { if (options.has(blacklistOpt)) { @@ -184,6 +187,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { error("whitelist must be specified when using new consumer in mirror maker.") System.exit(1) } + + if (!consumerProps.keySet().contains(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)) + System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based mirror maker will " + + "change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " + + "you prefer to make this switch in advance of that release add the following to the corresponding new-consumer " + + "config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'") } else { if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) { error("Exactly one of whitelist or blacklist is required.") @@ -233,7 +242,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener") createOldConsumers( numStreams, - options.valueOf(consumerConfigOpt), + consumerProps, customRebalanceListener, Option(options.valueOf(whitelistOpt)), Option(options.valueOf(blacklistOpt))) @@ -256,7 +265,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { "org.apache.kafka.clients.consumer.ConsumerRebalanceListner") createNewConsumers( numStreams, - options.valueOf(consumerConfigOpt), + consumerProps, customRebalanceListener, Option(options.valueOf(whitelistOpt))) } @@ -289,12 +298,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } private def createOldConsumers(numStreams: Int, - consumerConfigPath: String, - customRebalanceListener: Option[ConsumerRebalanceListener], - whitelist: Option[String], - blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = { - // Create consumer connector - val consumerConfigProps = Utils.loadProps(consumerConfigPath) + consumerConfigProps: Properties, + customRebalanceListener: Option[ConsumerRebalanceListener], + whitelist: Option[String], + blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = { // Disable consumer auto offsets commit to prevent data loss. maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false") // Set the consumer timeout so we will not block for low volume pipeline. The timeout is necessary to make sure @@ -304,7 +311,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val groupIdString = consumerConfigProps.getProperty("group.id") val connectors = (0 until numStreams) map { i => consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString) - val consumerConfig = new ConsumerConfig(consumerConfigProps) + val consumerConfig = new OldConsumerConfig(consumerConfigProps) new ZookeeperConsumerConnector(consumerConfig) } @@ -324,11 +331,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def createNewConsumers(numStreams: Int, - consumerConfigPath: String, + consumerConfigProps: Properties, customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener], whitelist: Option[String]) : Seq[MirrorMakerBaseConsumer] = { - // Create consumer connector - val consumerConfigProps = Utils.loadProps(consumerConfigPath) // Disable consumer auto offsets commit to prevent data loss. maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false") // Hardcode the deserializer to ByteArrayDeserializer