From 1b6a53792750ef67cba0926a364861676eb18cb5 Mon Sep 17 00:00:00 2001
From: huxi
Date: Fri, 28 Oct 2016 13:58:52 +0800
Subject: [PATCH 1/9] Fix kafka-4351: Topic regex behavioral change with
 MirrorMaker new consumer

Author: huxi_2b@hotmail.com

Support CSV values in the regexp for MirrorMaker new consumer as OldConsumer does.
---
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 1a6ba69b6d9ce..bd1f4b5c6d303 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -561,7 +561,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
       if (whitelistOpt.isDefined) {
         try {
-          consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
+          val revisedWhitelist = whitelistOpt.get.trim
+            .replace(',', '|')
+            .replace(" ", "")
+            .replaceAll("""^["']+""","")
+            .replaceAll("""["']+$""","") // property files may bring quotes
+          consumer.subscribe(Pattern.compile(revisedWhitelist), consumerRebalanceListener)
         } catch {
           case pse: PatternSyntaxException =>
             error("Invalid expression syntax: %s".format(whitelistOpt.get))

From 34b370752fe9f0a838140e3977c51dc5a1245008 Mon Sep 17 00:00:00 2001
From: amethystic
Date: Sat, 29 Oct 2016 11:34:34 +0800
Subject: [PATCH 2/9] kafka 4351: Topic regex behavioral change with
 MirrorMaker new consumer

author: huxi_2b@hotmail.com

Support CSV values in the regexp for MirrorMaker new consumer as OldConsumer does.
---
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index bd1f4b5c6d303..5a099d8613110 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -561,14 +561,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
       if (whitelistOpt.isDefined) {
         try {
-          val revisedWhitelist = whitelistOpt.get.trim
-            .replace(',', '|')
-            .replace(" ", "")
-            .replaceAll("""^["']+""","")
-            .replaceAll("""["']+$""","") // property files may bring quotes
-          consumer.subscribe(Pattern.compile(revisedWhitelist), consumerRebalanceListener)
+          val whitelist = Whitelist(whitelistOpt.get)
+          consumer.subscribe(Pattern.compile(whitelist.regex), consumerRebalanceListener)
         } catch {
-          case pse: PatternSyntaxException =>
+          case pse: PatternSyntaxException | RuntimeException =>
             error("Invalid expression syntax: %s".format(whitelistOpt.get))
             throw pse
         }

From 5e89ea397a818751775e292f2d5ed8219499aafb Mon Sep 17 00:00:00 2001
From: amethystic
Date: Sat, 29 Oct 2016 11:56:35 +0800
Subject: [PATCH 3/9] kafka-4351: Topic regex behavioral change with
 MirrorMaker new consumer

Author: huxi_2b@hotmail.com

Support CSV values in the regexp for MirrorMaker new consumer as OldConsumer does.
---
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5a099d8613110..b0c357c4266a2 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -564,7 +564,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           val whitelist = Whitelist(whitelistOpt.get)
           consumer.subscribe(Pattern.compile(whitelist.regex), consumerRebalanceListener)
         } catch {
-          case pse: PatternSyntaxException | RuntimeException =>
+          case pse: RuntimeException =>
             error("Invalid expression syntax: %s".format(whitelistOpt.get))
             throw pse
         }

From d828949dfb17ae1e07818866877e915252c79f60 Mon Sep 17 00:00:00 2001
From: huxi
Date: Fri, 18 Nov 2016 09:42:31 +0800
Subject: [PATCH 4/9] Update docs/upgrade.html for Kafka-4351.

Add statement on 'regular expression is supported for the --whitelist option of the new consumer in MirrorMaker'
---
 docs/upgrade.html | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6b9747d0f98e..f034079abadfa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -119,6 +119,8 @@
 Notable changes in
   • Overloaded constructors were added to kafka.api.FetchRequest and kafka.javaapi.FetchRequest to allow the caller to specify the order of
     the partitions (since order is significant in v3). The previously existing constructors were deprecated and the partitions are shuffled
     before the request is sent to avoid starvation issues.
+  • Regular expression is allowed when using the new Java consumer in MirrorMaker. Users who employ the new consumer can specify a valid Java-style regular
+    expression for the --whitelist option.
   • New Protocol Versions
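Illustration (not part of the patch series): for the new consumer, MirrorMaker compiles the --whitelist value as a Java regular expression. Assuming kafka.consumer.Whitelist performs the same comma-to-'|' rewrite that PATCH 1 did inline, a comma-separated list keeps working because it is turned into an alternation regex before Pattern.compile is called, so the two invocations below (topic names are placeholders) are expected to subscribe to the same topics:

    --whitelist "topicA,topicB,new.*"
    --whitelist "topicA|topicB|new.*"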
From 8258b280a5e15283874dc29d7c4df6679bbbd0c3 Mon Sep 17 00:00:00 2001
From: huxi
Date: Tue, 13 Dec 2016 16:18:45 +0800
Subject: [PATCH 5/9] add unit-test case for KAFKA-4351: Topic regex behavioral
 change with MirrorMaker new consumer

@author: huxi2b@gmail.com

Add a test case for MirrorMaker to verify it supports regex.
---
 .../main/scala/kafka/tools/MirrorMaker.scala | 19 ++++-
 .../unit/kafka/tools/MirrorMakerTest.scala   | 79 ++++++++++++++++++-
 2 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d6137a111aa04..c609f7d0f852f 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -310,6 +310,22 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     mirrorMakerThreads.foreach(_.awaitShutdown())
   }
 
+  // Only for testing
+  private[kafka] def createMirrorMakerProducer(brokerList: String): Unit = {
+    // create producer
+    val producerProps = new Properties()
+    // Defaults to no data loss settings.
+    maybeSetDefaultProperty(producerProps, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
+    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
+    maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
+    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    // Always set producer key and value serializer to ByteArraySerializer.
+    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producer = new MirrorMakerProducer(producerProps)
+  }
+
   private def createOldConsumers(numStreams: Int,
                                  consumerConfigProps: Properties,
                                  customRebalanceListener: Option[ConsumerRebalanceListener],
@@ -544,7 +560,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+  // Only for testing
+  private[kafka] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
                                        customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
                                        whitelistOpt: Option[String])
     extends MirrorMakerBaseConsumer {

diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index d6a5470ce43f6..535d5588ba1cf 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -17,12 +17,85 @@
 
 package kafka.tools
 
-import kafka.consumer.BaseConsumerRecord
+import java.util
+import java.util.Collections.singletonList
+import java.util.Properties
+
+import kafka.consumer.{BaseConsumerRecord, ConsumerTimeoutException}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.tools.MirrorMaker.MirrorMakerNewConsumer
+import kafka.utils.{CoreUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.directory.mavibot.btree.serializer.ByteSerializer
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.record.{Record, TimestampType}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, BytesDeserializer, StringDeserializer}
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
+
+class MirrorMakerTest extends ZooKeeperTestHarness {
+
+  private var server1: KafkaServer = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val props1 = TestUtils.createBrokerConfig(0, zkConnect, false)
+    val config1 = KafkaConfig.fromProps(props1)
+    server1 = TestUtils.createServer(config1)
 
-class MirrorMakerTest {
+    val topic = "new-topic"
+    TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = List(server1))
+  }
+
+  @After
+  override def tearDown() {
+    server1.shutdown
+    CoreUtils.delete(server1.config.logDirs)
+    super.tearDown()
+  }
+
+  @Test
+  def testWhitelistTopic() {
+    val topic = "new-topic"
+    val msg = "test message"
+    val brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1))
+
+    // Create a test producer to delivery a message
+    val props = new Properties();
+    props.put("bootstrap.servers", brokerList);
+    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    val producer1 = new KafkaProducer[Array[Byte], Array[Byte]](props)
+    producer1.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes()))
+    producer1.flush() // Explicitly invoke flush method to make effect immediately
+    producer1.close() // Close the producer
+
+    // Create a MirrorMaker consumer
+    val config = new util.HashMap[String, AnyRef]
+    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-gropu")
+    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config)
+    MirrorMaker.createMirrorMakerProducer(brokerList)
+
+    val whitelist = Some("another_topic|foo" ) // Test a regular expression
+    val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
+    mirrorMakerConsumer.init()
+    try {
+      val data = mirrorMakerConsumer.receive()
+      assertTrue("MirrorMaker consumer should get the correct test topic: " + topic, data.topic.equals(topic))
+      assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value).equals(msg))
+    } catch {
+      case e: RuntimeException =>
+        fail("Unexpected exception: " + e)
+    } finally {
+      consumer.close()
+    }
+  }
 
   @Test
   def testDefaultMirrorMakerMessageHandler() {

From f6c570dbfe2914b09eaaedfeda76814f4214b0e0 Mon Sep 17 00:00:00 2001
From: huxi
Date: Tue, 13 Dec 2016 16:24:58 +0800
Subject: [PATCH 6/9] remove useless imports and fix the regex string for
 KAFKA-4351: Topic regex behavioral change with MirrorMaker new consumer

@author: huxi2b@gmail.com
---
 .../unit/kafka/tools/MirrorMakerTest.scala | 21 ++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 535d5588ba1cf..907cce675ebd8 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -18,19 +18,16 @@ package kafka.tools
 
 import java.util
-import java.util.Collections.singletonList
 import java.util.Properties
 
-import kafka.consumer.{BaseConsumerRecord, ConsumerTimeoutException}
+import kafka.consumer.{BaseConsumerRecord}
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.tools.MirrorMaker.MirrorMakerNewConsumer
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.directory.mavibot.btree.serializer.ByteSerializer
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.record.{Record, TimestampType}
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, BytesDeserializer, StringDeserializer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -63,10 +60,10 @@ class MirrorMakerTest extends ZooKeeperTestHarness {
     val brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1))
 
     // Create a test producer to delivery a message
-    val props = new Properties();
-    props.put("bootstrap.servers", brokerList);
-    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    val props = new Properties()
+    props.put("bootstrap.servers", brokerList)
+    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer1 = new KafkaProducer[Array[Byte], Array[Byte]](props)
     producer1.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes()))
     producer1.flush() // Explicitly invoke flush method to make effect immediately
@@ -77,12 +74,12 @@ class MirrorMakerTest extends ZooKeeperTestHarness {
     config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-gropu")
     config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-    config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config)
     MirrorMaker.createMirrorMakerProducer(brokerList)
 
-    val whitelist = Some("another_topic|foo" ) // Test a regular expression
+    val whitelist = Some("new.*|another_topic|foo" ) // Test a regular expression
     val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
     mirrorMakerConsumer.init()
     try {

From fa2538fc4d0b19c29bee2a9961257d3699c834db Mon Sep 17 00:00:00 2001
From: huxi
Date: Thu, 22 Dec 2016 13:54:21 +0800
Subject: [PATCH 7/9] kafka-4351: Topic regex behavioral change with
 MirrorMaker new consumer

1. Move unit test case under integration/kafka/tools
2. Remove test code in MirrorMaker
---
 .../main/scala/kafka/tools/MirrorMaker.scala | 22 +-----
 .../kafka/tools/TestMirrorMaker.scala        | 69 +++++++++++++++++
 .../unit/kafka/tools/MirrorMakerTest.scala   | 76 +------------------
 3 files changed, 75 insertions(+), 92 deletions(-)
 create mode 100644 core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 665dd477d6d18..5f235d9046b93 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
@@ -64,7 +64,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var producer: MirrorMakerProducer = null
+  private[tools] var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
@@ -310,22 +310,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     mirrorMakerThreads.foreach(_.awaitShutdown())
   }
 
-  // Only for testing
-  private[kafka] def createMirrorMakerProducer(brokerList: String): Unit = {
-    // create producer
-    val producerProps = new Properties()
-    // Defaults to no data loss settings.
-    maybeSetDefaultProperty(producerProps, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
-    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
-    maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
-    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-    // Always set producer key and value serializer to ByteArraySerializer.
-    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producer = new MirrorMakerProducer(producerProps)
-  }
-
   private def createOldConsumers(numStreams: Int,
                                  consumerConfigProps: Properties,
                                  customRebalanceListener: Option[ConsumerRebalanceListener],
@@ -704,7 +688,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class MirrorMakerProducer(val producerProps: Properties) {
+  private[kafka] class MirrorMakerProducer(val producerProps: Properties) {
 
     val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
diff --git a/core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala b/core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala
new file mode 100644
index 0000000000000..ff2b9f62fdc49
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala
@@ -0,0 +1,69 @@
+package kafka.tools
+
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.junit.Assert.assertTrue
+import org.junit.{After, Before, Test}
+
+class TestMirrorMaker extends KafkaServerTestHarness {
+
+  override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(KafkaConfig.fromProps(_, new Properties()))
+
+  @Before
+  override def setUp() {
+    super.setUp()
+  }
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  @Test
+  def testRegularExpressionTopic() {
+    val topic = "new-topic"
+    val msg = "a test message"
+    val brokerList = TestUtils.getBrokerListStrFromServers(servers)
+
+    // Create a test producer to delivery a message
+    val producerProps = new Properties()
+    producerProps.put("bootstrap.servers", brokerList)
+    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    val producer = new MirrorMakerProducer(producerProps)
+    MirrorMaker.producer = producer
+    MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
+    MirrorMaker.producer.close()
+
+    // Create a MirrorMaker consumer
+    val consumerProps = new Properties()
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val whitelist = Some("new.*,another_topic,foo" )
+    val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
+    mirrorMakerConsumer.init()
+    try {
+      val data = mirrorMakerConsumer.receive()
+      println(new String(data.value))
+      assertTrue(s"MirrorMaker consumer should get the correct topic: $topic", data.topic.equals(topic))
+      assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value).equals(msg))
+    } catch {
+      case e: RuntimeException =>
+        fail("Unexpected exception: " + e)
+    } finally {
+      consumer.close()
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 907cce675ebd8..d6a5470ce43f6 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -17,82 +17,12 @@
 
 package kafka.tools
 
-import java.util
-import java.util.Properties
-
-import kafka.consumer.{BaseConsumerRecord}
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.tools.MirrorMaker.MirrorMakerNewConsumer
-import kafka.utils.{CoreUtils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import kafka.consumer.BaseConsumerRecord
 import org.apache.kafka.common.record.{Record, TimestampType}
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-class MirrorMakerTest extends ZooKeeperTestHarness {
-
-  private var server1: KafkaServer = null
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val props1 = TestUtils.createBrokerConfig(0, zkConnect, false)
-    val config1 = KafkaConfig.fromProps(props1)
-    server1 = TestUtils.createServer(config1)
+import org.junit.Test
 
-    val topic = "new-topic"
-    TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = List(server1))
-  }
-
-  @After
-  override def tearDown() {
-    server1.shutdown
-    CoreUtils.delete(server1.config.logDirs)
-    super.tearDown()
-  }
-
-  @Test
-  def testWhitelistTopic() {
-    val topic = "new-topic"
-    val msg = "test message"
-    val brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1))
-
-    // Create a test producer to delivery a message
-    val props = new Properties()
-    props.put("bootstrap.servers", brokerList)
-    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
-    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
-    val producer1 = new KafkaProducer[Array[Byte], Array[Byte]](props)
-    producer1.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes()))
-    producer1.flush() // Explicitly invoke flush method to make effect immediately
-    producer1.close() // Close the producer
-
-    // Create a MirrorMaker consumer
-    val config = new util.HashMap[String, AnyRef]
-    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-gropu")
-    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config)
-    MirrorMaker.createMirrorMakerProducer(brokerList)
-
-    val whitelist = Some("new.*|another_topic|foo" ) // Test a regular expression
-    val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
-    mirrorMakerConsumer.init()
-    try {
-      val data = mirrorMakerConsumer.receive()
-      assertTrue("MirrorMaker consumer should get the correct test topic: " + topic, data.topic.equals(topic))
-      assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value).equals(msg))
-    } catch {
-      case e: RuntimeException =>
-        fail("Unexpected exception: " + e)
-    } finally {
-      consumer.close()
-    }
-  }
+class MirrorMakerTest {
 
   @Test
   def testDefaultMirrorMakerMessageHandler() {

From 07565f4e4dad18df018ca7b44489d1362c0931cd Mon Sep 17 00:00:00 2001
From: amethystic
Date: Sun, 25 Dec 2016 19:34:22 +0800
Subject: [PATCH 8/9] kafka-4351: Topic regex behavioral change with
 MirrorMaker new consumer

1. narrow down the visibility for some method and class
2. modify test case
---
 .../main/scala/kafka/tools/MirrorMaker.scala  | 11 +++--
 ...scala => MirrorMakerIntegrationTest.scala} | 43 +++++++++----------
 2 files changed, 26 insertions(+), 28 deletions(-)
 rename core/src/test/scala/integration/kafka/tools/{TestMirrorMaker.scala => MirrorMakerIntegrationTest.scala} (67%)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5f235d9046b93..4708729d77e25 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -575,7 +575,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   // Only for testing
-  private[kafka] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+  private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
                                        customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
                                        whitelistOpt: Option[String])
     extends MirrorMakerBaseConsumer {
@@ -590,13 +590,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     override def init() {
       debug("Initiating new consumer")
       val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
-      if (whitelistOpt.isDefined) {
+      whitelistOpt.foreach { whitelist =>
         try {
-          val whitelist = Whitelist(whitelistOpt.get)
-          consumer.subscribe(Pattern.compile(whitelist.regex), consumerRebalanceListener)
+          consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
         } catch {
           case pse: RuntimeException =>
-            error("Invalid expression syntax: %s".format(whitelistOpt.get))
+            error("Invalid expression syntax: %s".format(whitelist))
             throw pse
         }
       }
@@ -688,7 +687,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private[kafka] class MirrorMakerProducer(val producerProps: Properties) {
+  private[tools] class MirrorMakerProducer(val producerProps: Properties) {
 
     val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
diff --git a/core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
similarity index 67%
rename from core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala
rename to core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index ff2b9f62fdc49..d53e49aead77f 100644
--- a/core/src/test/scala/integration/kafka/tools/TestMirrorMaker.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -2,6 +2,7 @@ package kafka.tools
 
 import java.util.Properties
 
+import kafka.consumer.ConsumerTimeoutException
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer}
 import kafka.utils.TestUtils
@@ -10,23 +11,13 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.Assert.assertTrue
-import org.junit.{After, Before, Test}
+import org.junit.Test
 
-class TestMirrorMaker extends KafkaServerTestHarness {
+class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
     .map(KafkaConfig.fromProps(_, new Properties()))
 
-  @Before
-  override def setUp() {
-    super.setUp()
-  }
-
-  @After
-  override def tearDown() {
-    super.tearDown()
-  }
-
   @Test
   def testRegularExpressionTopic() {
     val topic = "new-topic"
@@ -35,7 +26,8 @@ class TestMirrorMaker extends KafkaServerTestHarness {
 
     // Create a test producer to delivery a message
     val producerProps = new Properties()
-    producerProps.put("bootstrap.servers", brokerList)
+    producerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put("producer.type", "sync")
     producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new MirrorMakerProducer(producerProps)
@@ -43,24 +35,31 @@ class TestMirrorMaker extends KafkaServerTestHarness {
     MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
     MirrorMaker.producer.close()
 
+    servers foreach { server =>
+      println(server.zkUtils.getAllTopics().mkString(","))
+    }
     // Create a MirrorMaker consumer
     val consumerProps = new Properties()
-    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1")
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
-    val whitelist = Some("new.*,another_topic,foo" )
+    val whitelist = Some("new.*,another_topic,foo")
     val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
     mirrorMakerConsumer.init()
     try {
-      val data = mirrorMakerConsumer.receive()
-      println(new String(data.value))
-      assertTrue(s"MirrorMaker consumer should get the correct topic: $topic", data.topic.equals(topic))
-      assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value).equals(msg))
-    } catch {
-      case e: RuntimeException =>
-        fail("Unexpected exception: " + e)
+      val maxTryCount = 3 // it might need to call multiple poll calls to retrieve the message
+      for (_ <- 0 until maxTryCount) {
+        try {
+          val data = mirrorMakerConsumer.receive()
+          assertTrue(s"MirrorMaker consumer should get the correct topic: $topic", data.topic == topic)
+          assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value) == msg)
+          return
+        } catch {
+          case _: ConsumerTimeoutException => // swallow it
+        }
+      }
     } finally {
       consumer.close()
     }

From b1b6586a5e9841c769b4d0602ae2d32d69aa9800 Mon Sep 17 00:00:00 2001
From: amethystic
Date: Sat, 31 Dec 2016 11:17:58 +0800
Subject: [PATCH 9/9] kafka-4351: Topic regex behavioral change with
 MirrorMaker new consumer

fix MirrorMakerIntegrationTest.testRegularExpressionTopic code as per Ijuma's comments.
---
 .../kafka/tools/MirrorMakerIntegrationTest.scala | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index d53e49aead77f..d1b1e438415e7 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -10,7 +10,6 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.junit.Assert.assertTrue
 import org.junit.Test
 
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
@@ -35,9 +34,6 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
     MirrorMaker.producer.close()
 
-    servers foreach { server =>
-      println(server.zkUtils.getAllTopics().mkString(","))
-    }
     // Create a MirrorMaker consumer
     val consumerProps = new Properties()
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -49,17 +45,14 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
     mirrorMakerConsumer.init()
     try {
-      val maxTryCount = 3 // it might need to call multiple poll calls to retrieve the message
-      for (_ <- 0 until maxTryCount) {
+      TestUtils.waitUntilTrue(() => {
         try {
           val data = mirrorMakerConsumer.receive()
-          assertTrue(s"MirrorMaker consumer should get the correct topic: $topic", data.topic == topic)
-          assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value) == msg)
-          return
+          data.topic == topic && new String(data.value) == msg
         } catch {
-          case _: ConsumerTimeoutException => // swallow it
+          case _: ConsumerTimeoutException => false
         }
-      }
+      }, "MirrorMaker consumer should be able to subscribe the correct topic and read the correct message.")
     } finally {
       consumer.close()
    }
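For illustration, a minimal Scala sketch of the behavior the series converges on (not the actual MirrorMaker code; WhitelistRegexSketch and its toRegex helper are made-up names, and the conversion shown is the one PATCH 1 performed inline and PATCH 2/8 delegate to kafka.consumer.Whitelist): a CSV whitelist is rewritten into a regex alternation, stripped of spaces and surrounding quotes, and only then compiled and used for the pattern subscription.

    import java.util.regex.Pattern

    // Hypothetical sketch: mirrors the CSV-to-regex rewrite shown in PATCH 1;
    // kafka.consumer.Whitelist is assumed to do the equivalent from PATCH 2 onwards.
    object WhitelistRegexSketch {
      // "topicA, topicB,new.*" -> "topicA|topicB|new.*"
      def toRegex(rawWhitelist: String): String =
        rawWhitelist.trim
          .replace(',', '|')
          .replace(" ", "")
          .replaceAll("""^["']+""", "")
          .replaceAll("""["']+$""", "")   // property files may bring quotes

      def main(args: Array[String]): Unit = {
        val pattern = Pattern.compile(toRegex("topicA, topicB,new.*"))
        println(pattern.pattern())                        // topicA|topicB|new.*
        println(pattern.matcher("new-topic").matches())   // true: matched by new.*
      }
    }

With a pattern like this, the subscribe(Pattern, listener) call in MirrorMakerNewConsumer.init() accepts either a plain Java regex or the old comma-separated whitelist syntax for the new consumer.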