From 0f120f976ce66e37cb5180df2b024def1cfcd6bb Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 22 Jan 2016 14:22:05 -0800 Subject: [PATCH 1/2] KAFKA-3140: Fix PatternSyntaxException and hand caused by it in MirrorMaker on passing invalid java regex string as whitelist. --- .../main/scala/kafka/tools/MirrorMaker.scala | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index f1d56b5f1065b..bf5612107caf6 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -20,7 +20,7 @@ package kafka.tools import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.regex.Pattern +import java.util.regex.{PatternSyntaxException, Pattern} import java.util.{Collections, Properties} import com.yammer.metrics.core.Gauge @@ -385,8 +385,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def run() { info("Starting mirror maker thread " + threadName) - mirrorMakerConsumer.init() try { + mirrorMakerConsumer.init() + // We need the two while loop to make sure when old consumer is used, even there is no message we // still commit offset. When new consumer is used, this is handled by poll(timeout). while (!exitingOnSendFailure && !shuttingDown) { @@ -515,8 +516,17 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def init() { debug("Initiating new consumer") val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener) - if (whitelistOpt.isDefined) - consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) + if (whitelistOpt.isDefined) { + val patternOpt: Option[Pattern] = try { + Some(Pattern.compile(whitelistOpt.get)) + } catch { + case pse: PatternSyntaxException => + error("Invalid expression syntax: %s".format(whitelistOpt.get)) + throw pse + } + + consumer.subscribe(patternOpt.get, consumerRebalanceListener) + } } // New consumer always hasNext From 67542e9a7e8ae5068e414b95c3c37543f4d73170 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 22 Jan 2016 14:37:39 -0800 Subject: [PATCH 2/2] Address review comment. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index bf5612107caf6..f03623a20983f 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -517,15 +517,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { debug("Initiating new consumer") val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener) if (whitelistOpt.isDefined) { - val patternOpt: Option[Pattern] = try { - Some(Pattern.compile(whitelistOpt.get)) + try { + consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) } catch { case pse: PatternSyntaxException => error("Invalid expression syntax: %s".format(whitelistOpt.get)) throw pse } - - consumer.subscribe(patternOpt.get, consumerRebalanceListener) } }