Skip to content

Commit

Permalink
MINOR: Make new consumer default for Mirror Maker
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1914 from hachikuji/mm-default-new-consumer

(cherry picked from commit 3db752a)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
  • Loading branch information
hachikuji authored and ijuma committed Sep 27, 2016
1 parent bb3a860 commit aadda5a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 26 deletions.
49 changes: 31 additions & 18 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ package kafka.tools
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.regex.{PatternSyntaxException, Pattern}
import java.util.regex.{Pattern, PatternSyntaxException}
import java.util.{Collections, Properties}

import com.yammer.metrics.core.Gauge
import joptsimple.OptionParser
import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig => OldConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
import kafka.consumer.{BaseConsumer, BaseConsumerRecord, Blacklist, ConsumerIterator, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector, ConsumerConfig => OldConsumerConfig}
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
import kafka.utils.{CommandLineUtils, CoreUtils, Logging, ZKConfig}
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer, CommitFailedException}
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -95,7 +95,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.ofType(classOf[String])

val useNewConsumerOpt = parser.accepts("new.consumer",
"Use new consumer in mirror maker.")
"Use new consumer in mirror maker (this is the default).")

val producerConfigOpt = parser.accepts("producer.config",
"Embedded producer config.")
Expand Down Expand Up @@ -170,34 +170,47 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {

if (options.has(helpOpt)) {
parser.printHelpOn(System.out)
System.exit(0)
sys.exit(0)
}

CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)

val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
val useOldConsumer = consumerProps.containsKey(ZKConfig.ZkConnectProp)

val useNewConsumer = options.has(useNewConsumerOpt)
if (useNewConsumer) {
if (useOldConsumer) {
if (options.has(useNewConsumerOpt)) {
error(s"The consumer configuration parameter `${ZKConfig.ZkConnectProp}` is not valid when using --new.consumer")
sys.exit(1)
}

if (consumerProps.containsKey(NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
error(s"The configuration parameters `${ZKConfig.ZkConnectProp}` (old consumer) and " +
s"`${NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}` (new consumer) cannot be used together.")
sys.exit(1)
}

if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
error("Exactly one of whitelist or blacklist is required.")
sys.exit(1)
}
} else {
if (options.has(blacklistOpt)) {
error("blacklist can not be used when using new consumer in mirror maker. Use whitelist instead.")
System.exit(1)
sys.exit(1)
}

if (!options.has(whitelistOpt)) {
error("whitelist must be specified when using new consumer in mirror maker.")
System.exit(1)
sys.exit(1)
}

if (!consumerProps.keySet().contains(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
if (!consumerProps.containsKey(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based mirror maker will " +
"change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " +
"you prefer to make this switch in advance of that release add the following to the corresponding new-consumer " +
"config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
} else {
if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
error("Exactly one of whitelist or blacklist is required.")
System.exit(1)
}

}

abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
Expand All @@ -223,7 +236,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
producer = new MirrorMakerProducer(producerProps)

// Create consumers
val mirrorMakerConsumers = if (!useNewConsumer) {
val mirrorMakerConsumers = if (useOldConsumer) {
val customRebalanceListener = {
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
if (customRebalanceListenerClass != null) {
Expand Down Expand Up @@ -450,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// if it exits accidentally, stop the entire mirror maker
if (!isShuttingdown.get()) {
fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
System.exit(-1)
sys.exit(-1)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -915,19 +915,27 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
}

object ZKConfig {
val ZkConnectProp = "zookeeper.connect"
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
}

class ZKConfig(props: VerifiableProperties) {
import ZKConfig._

/** ZK host string */
val zkConnect = props.getString("zookeeper.connect")
val zkConnect = props.getString(ZkConnectProp)

/** zookeeper session timeout */
val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000)

/** the max time that the client waits to establish a connection to zookeeper */
val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp, zkSessionTimeoutMs)

/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
}

object ZkPath {
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def start_cmd(self, node):
"--topic %(topic)s --consumer.config %(config_file)s" % args

if self.new_consumer:
cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args
cmd += " --bootstrap-server %(broker_list)s" % args
else:
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
Expand Down
2 changes: 0 additions & 2 deletions tests/kafkatest/services/mirror_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ def start_cmd(self, node):
cmd += " --whitelist=\"%s\"" % self.whitelist
if self.blacklist is not None:
cmd += " --blacklist=\"%s\"" % self.blacklist
if self.new_consumer:
cmd += " --new.consumer"

cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
return cmd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ def args(self):
}

if self.new_consumer:
args['new-consumer'] = ""
args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
args['zookeeper'] = self.kafka.zk.connect_setting()
Expand Down

0 comments on commit aadda5a

Please sign in to comment.