Skip to content
Permalink
Browse files

add msg filter option and user id extractor

  • Loading branch information...
billryan committed Mar 14, 2019
1 parent c9f930e commit 85b2dabc7fa2050aaaf1afea64a1e64ccd79be8e
Showing with 40 additions and 5 deletions.
  1. +40 −5 core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -95,6 +95,12 @@ object MirrorMaker extends Logging {
.describedAs("Java regex (String)")
.ofType(classOf[String])

val msgFilterOpt = parser.accepts("message.handler.args",
"Message handler args to filter for mirror.")
.withRequiredArg()
.describedAs("substring to filter raw message")
.ofType(classOf[String])

val helpOpt = parser.accepts("help", "Print this message.")

if(args.length == 0)
@@ -149,6 +155,11 @@ object MirrorMaker extends Logging {
else
new Blacklist(options.valueOf(blacklistOpt))

val msgFilterSpec = if (options.has(msgFilterOpt))
options.valueOf(msgFilterOpt)
else
""

var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
try {
streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten
@@ -157,7 +168,7 @@ object MirrorMaker extends Logging {
fatal("Unable to create stream - shutting down mirror maker.")
connectors.foreach(_.shutdown)
}
consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2, msgFilterSpec))
assert(consumerThreads.size == numConsumers)

Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -243,12 +254,17 @@ object MirrorMaker extends Logging {

class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
mirrorDataChannel: DataChannel,
threadId: Int)
threadId: Int,
msgFilter: String)
extends Thread with Logging with KafkaMetricsGroup {

private val shutdownLatch = new CountDownLatch(1)
private val threadName = "mirrormaker-consumer-" + threadId
private var isCleanShutdown: Boolean = true
private val msgFilterBytes = msgFilter.getBytes()
// extract id from "distinct_id":"userid_573"
private val ID_STR_START = "\"distinct_id\":\"".getBytes // "distinct_id":"
private val ID_STR_END = "\"".getBytes // "
this.logIdent = "[%s] ".format(threadName)

this.setName(threadName)
@@ -257,8 +273,12 @@ object MirrorMaker extends Logging {
info("Starting mirror maker consumer thread " + threadName)
try {
for (msgAndMetadata <- stream) {
val data = new ProducerRecord[Array[Byte],Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
mirrorDataChannel.put(data)
val msg: Array[Byte] = msgAndMetadata.message()
if (msgFilter.isEmpty || msg.containsSlice(msgFilterBytes)) {
val keyData = subByteArray(msg, ID_STR_START, ID_STR_END).getOrElse(msgAndMetadata.key)
val data = new ProducerRecord[Array[Byte],Array[Byte]](msgAndMetadata.topic, keyData, msg)
mirrorDataChannel.put(data)
}
}
} catch {
case e: Throwable => {
@@ -284,6 +304,21 @@ object MirrorMaker extends Logging {
case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!")
}
}

// extract id from sensors json string: {"distinct_id":"userid_573","xxx":"xxx"}
def subByteArray(rawByteArray: Array[Byte], startSlice: Array[Byte], endSlice: Array[Byte]): Option[Array[Byte]] = {
var result: Option[Array[Byte]] = None
val oldStartIndex = rawByteArray.indexOfSlice(startSlice)
val newStartIndex = oldStartIndex + startSlice.length
if ((oldStartIndex >= 0) && (newStartIndex < rawByteArray.length)) {
val newEndIndex = rawByteArray.indexOfSlice(endSlice, newStartIndex)
if (newEndIndex >= 0) {
result = Some(rawByteArray.slice(newStartIndex, newEndIndex))
}
}

result
}
}

class ProducerThread (val dataChannel: DataChannel,
@@ -348,4 +383,4 @@ object MirrorMaker extends Logging {
}
}
}
}
}

0 comments on commit 85b2dab

Please sign in to comment.
You can’t perform that action at this time.