Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Make new consumer default for Mirror Maker #1914

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 31 additions & 18 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ package kafka.tools
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.regex.{PatternSyntaxException, Pattern}
import java.util.regex.{Pattern, PatternSyntaxException}
import java.util.{Collections, Properties}

import com.yammer.metrics.core.Gauge
import joptsimple.OptionParser
import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig => OldConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
import kafka.consumer.{BaseConsumer, BaseConsumerRecord, Blacklist, ConsumerIterator, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector, ConsumerConfig => OldConsumerConfig}
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
import kafka.utils.{CommandLineUtils, CoreUtils, Logging, ZKConfig}
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer, CommitFailedException}
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -95,7 +95,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
.ofType(classOf[String])

val useNewConsumerOpt = parser.accepts("new.consumer",
"Use new consumer in mirror maker.")
"Use new consumer in mirror maker (this is the default).")

val producerConfigOpt = parser.accepts("producer.config",
"Embedded producer config.")
Expand Down Expand Up @@ -170,34 +170,47 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {

if (options.has(helpOpt)) {
parser.printHelpOn(System.out)
System.exit(0)
sys.exit(0)
}

CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)

val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
val useOldConsumer = consumerProps.containsKey(ZKConfig.ZkConnectProp)

val useNewConsumer = options.has(useNewConsumerOpt)
if (useNewConsumer) {
if (useOldConsumer) {
if (options.has(useNewConsumerOpt)) {
error(s"The consumer configuration parameter `${ZKConfig.ZkConnectProp}` is not valid when using --new.consumer")
sys.exit(1)
}

if (consumerProps.containsKey(NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
error(s"The configuration parameters `${ZKConfig.ZkConnectProp}` (old consumer) and " +
s"`${NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}` (new consumer) cannot be used together.")
sys.exit(1)
}

if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
error("Exactly one of whitelist or blacklist is required.")
sys.exit(1)
}
} else {
if (options.has(blacklistOpt)) {
error("blacklist can not be used when using new consumer in mirror maker. Use whitelist instead.")
System.exit(1)
sys.exit(1)
}

if (!options.has(whitelistOpt)) {
error("whitelist must be specified when using new consumer in mirror maker.")
System.exit(1)
sys.exit(1)
}

if (!consumerProps.keySet().contains(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
if (!consumerProps.containsKey(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based mirror maker will " +
"change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " +
"you prefer to make this switch in advance of that release add the following to the corresponding new-consumer " +
"config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
} else {
if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
error("Exactly one of whitelist or blacklist is required.")
System.exit(1)
}

}

abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
Expand All @@ -223,7 +236,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
producer = new MirrorMakerProducer(producerProps)

// Create consumers
val mirrorMakerConsumers = if (!useNewConsumer) {
val mirrorMakerConsumers = if (useOldConsumer) {
val customRebalanceListener = {
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
if (customRebalanceListenerClass != null) {
Expand Down Expand Up @@ -450,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// if it exits accidentally, stop the entire mirror maker
if (!isShuttingdown.get()) {
fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
System.exit(-1)
sys.exit(-1)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -915,19 +915,27 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
}

object ZKConfig {
val ZkConnectProp = "zookeeper.connect"
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
}

class ZKConfig(props: VerifiableProperties) {
import ZKConfig._

/** ZK host string */
val zkConnect = props.getString("zookeeper.connect")
val zkConnect = props.getString(ZkConnectProp)

/** zookeeper session timeout */
val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000)

/** the max time that the client waits to establish a connection to zookeeper */
val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp, zkSessionTimeoutMs)

/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
}

object ZkPath {
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def start_cmd(self, node):
"--topic %(topic)s --consumer.config %(config_file)s" % args

if self.new_consumer:
cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args
cmd += " --bootstrap-server %(broker_list)s" % args
else:
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
Expand Down
2 changes: 0 additions & 2 deletions tests/kafkatest/services/mirror_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ def start_cmd(self, node):
cmd += " --whitelist=\"%s\"" % self.whitelist
if self.blacklist is not None:
cmd += " --blacklist=\"%s\"" % self.blacklist
if self.new_consumer:
cmd += " --new.consumer"

cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
return cmd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ def args(self):
}

if self.new_consumer:
args['new-consumer'] = ""
args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
args['zookeeper'] = self.kafka.zk.connect_setting()
Expand Down