From c44aec39103530af0ecd17270adbd74528eed35d Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Fri, 10 Jun 2016 15:29:06 -0700 Subject: [PATCH] KAFKA-3818: Change Mirror Maker default assignment strategy to round robin It might make more sense to use round robin assignment by default for Mirror Maker since it gives a better balance between the instances, in particular when the number of Mirror Maker instances exceeds the typical number of partitions per topic. There doesn't seem to be any need to keep range assignment since copartitioning is not an issue. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d7e09e4efdb0..2696f2fe391b 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -28,14 +28,14 @@ import joptsimple.OptionParser import kafka.consumer.BaseConsumerRecord import kafka.metrics.KafkaMetricsGroup import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist} -import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RoundRobinAssignor} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} -import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} +import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap @@ -254,6 +254,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { whitelist: Option[String]): Seq[ConsumerWrapper] = { // Disable consumer auto offsets commit to prevent data loss. maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false") + // Use round robin assignment by default for Mirror Maker since it gives a better balance between the instances, + // in particular when the number of Mirror Maker instances exceeds the typical number of partitions per topic. + // There doesn't seem to be any need to keep range assignment since co-partitioning is not an issue. + maybeSetDefaultProperty(consumerConfigProps, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName) // Hardcode the deserializer to ByteArrayDeserializer consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName) consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)