From f61d82f3152f8e6cf758fa349c8198289e0deae8 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 28 Oct 2014 23:06:33 -0700 Subject: [PATCH 1/5] [SPARK-4122][STREAMING] Add a library that can write data back to Kafka from Spark Streaming. This adds a library that can writes dstreams to Kafka. An implicit also has been added so users can call dstream.writeToKafka(..) --- .../streaming/kafka/KafkaOutputWriter.scala | 125 ++++++++++++++++++ .../spark/streaming/kafka/ProducerCache.scala | 34 +++++ .../streaming/kafka/KafkaStreamSuite.scala | 92 +++++++++++++ 3 files changed, 251 insertions(+) create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala new file mode 100644 index 0000000000000..a94a41583b42a --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kafka + +import java.util.Properties + +import kafka.producer.{ProducerConfig, KeyedMessage, Producer} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** + * Import this object in this form: + * {{{ + * import org.apache.spark.streaming.kafka.KafkaWriter._ + * }}} + * + * Once imported, the `writeToKafka` can be called on any [[DStream]] object in this form: + * {{{ + * dstream.writeToKafka(producerConfig, f) + * }}} + */ +object KafkaWriter { + import scala.language.implicitConversions + /** + * This implicit method allows the user to call dstream.writeToKafka(..) + * @param dstream - DStream to write to Kafka + * @tparam T - The type of the DStream + * @tparam K - The type of the key to serialize to + * @tparam V - The type of the value to serialize to + * @return + */ + implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = { + new KafkaWriter[T](dstream) + } +} + +/** + * + * This class can be used to write data to Kafka from Spark Streaming. To write data to Kafka + * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your application and call + * `dstream.writeToKafka(producerConf, func)` + * + * Here is an example: + * {{{ + * // Adding this line allows the user to call dstream.writeDStreamToKafka(..) 
+ * import org.apache.spark.streaming.kafka.KafkaWriter._ + * + * class ExampleWriter { + * val instream = ssc.queueStream(toBe) + * val producerConf = new Properties() + * producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder") + * producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder") + * producerConf.put("metadata.broker.list", "kafka.example.com:5545") + * producerConf.put("request.required.acks", "1") + * instream.writeToKafka(producerConf, + * (x: String) => new KeyedMessage[String, String]("default", null, x)) + * ssc.start() + * } + * + * }}} + * @param dstream - The [[DStream]] to be written to Kafka + * + */ +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) { + + /** + * To write data from a DStream to Kafka, call this function after creating the DStream. Once + * the DStream is passed into this function, all data coming from the DStream is written out to + * Kafka. The properties instance takes the configuration required to connect to the Kafka + * brokers in the standard Kafka format. The serializerFunc is a function that converts each + * element of the RDD to a Kafka [[KeyedMessage]]. This closure should be serializable - so it + * should use only instances of Serializables. + * @param producerConfig The configuration that can be used to connect to Kafka + * @param serializerFunc The function to convert the data from the stream into Kafka + * [[KeyedMessage]]s. + * @tparam K The type of the key + * @tparam V The type of the value + * + */ + def writeToKafka[K, V](producerConfig: Properties, + serializerFunc: T => KeyedMessage[K, V]): Unit = { + + // Broadcast the producer to avoid sending it every time. + val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig) + + def func = (rdd: RDD[T]) => { + rdd.foreachPartition(events => { + // The ForEachDStream runs the function locally on the driver. So the + // ProducerCache from the driver is likely to get serialized and + // sent, which is fine - because at that point the Producer itself is + // not initialized, so a None is sent over the wire. + // Get the producer from that local executor and write! + val producer: Producer[K, V] = { + if (ProducerCache.isCached) { + ProducerCache.getCachedProducer + .asInstanceOf[Producer[K, V]] + } else { + val producer = + new Producer[K, V](new ProducerConfig(broadcastedConfig.value)) + ProducerCache.cacheProducer(producer) + producer + } + } + producer.send(events.map(serializerFunc).toArray: _*) + }) + } + dstream.foreachRDD(func) + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala new file mode 100644 index 0000000000000..3a0bf359bf527 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kafka + +object ProducerCache { + + private var producerOpt: Option[Any] = None + + def getCachedProducer: Any = { + producerOpt.get + } + + def cacheProducer(producer: Any): Unit = { + producerOpt = Some(producer) + } + + def isCached: Boolean = { + producerOpt.isDefined + } +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 6943326eb750e..b4cf70545ef57 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -19,10 +19,17 @@ package org.apache.spark.streaming.kafka import java.io.File import java.net.InetSocketAddress +import java.util import java.util.{Properties, Random} +import akka.actor.FSM.-> +import org.apache.spark.rdd.RDD +import org.slf4j.{LoggerFactory, Logger} + import scala.collection.mutable +import kafka.consumer._ +import kafka.message.MessageAndMetadata import kafka.admin.CreateTopicCommand import kafka.common.{KafkaException, TopicAndPartition} import kafka.producer.{KeyedMessage, ProducerConfig, Producer} @@ -35,10 +42,13 @@ import org.I0Itec.zkclient.ZkClient import org.apache.zookeeper.server.ZooKeeperServer import org.apache.zookeeper.server.NIOServerCnxnFactory +import org.apache.spark.streaming.kafka.KafkaWriter._ import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils +import scala.collection.mutable.ArrayBuffer + class KafkaStreamSuite extends TestSuiteBase { import KafkaTestUtils._ @@ -138,6 +148,42 @@ class KafkaStreamSuite extends TestSuiteBase { ssc.stop() } + test("Test writing back to Kafka") { + val ssc = new StreamingContext(master, framework, batchDuration) + val toBe = new mutable.Queue[RDD[String]]() + var j = 0 + while (j < 9) { + toBe.enqueue(ssc.sc.makeRDD(Seq(j.toString, (j + 1).toString, (j + 2).toString))) + j += 3 + } + val instream = ssc.queueStream(toBe) + val producerConf = new Properties() + producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder") + producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder") + producerConf.put("metadata.broker.list", s"localhost:$brokerPort") + producerConf.put("request.required.acks", "1") + instream.writeToKafka(producerConf, + (x: String) => new KeyedMessage[String, Array[Byte]]("topic1", null,x.getBytes)) + ssc.start() + var i = 0 + val expectedResults = (0 to 8).map(_.toString).toSeq + val actualResults = new ArrayBuffer[String]() + val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort", + "group.id" -> s"test-consumer-${random.nextInt(10000)}", + "auto.offset.reset" -> "smallest", "topic" -> "topic1") + val consumer = new KafkaConsumer(kafkaParams) + consumer.initTopicList(List("topic1")) + while (i < 9) { + val fetchedMsg = new String(consumer.getNextMessage("topic1").message + .asInstanceOf[Array[Byte]]) + actualResults += fetchedMsg + i += 
1 + } + val actualResultSorted = actualResults.sorted + assert(expectedResults.toSeq === actualResultSorted.toSeq) + ssc.stop() + } + private def createTestMessage(topic: String, sent: Map[String, Int]) : Seq[KeyedMessage[String, String]] = { val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { @@ -159,6 +205,7 @@ class KafkaStreamSuite extends TestSuiteBase { producer.send(createTestMessage(topic, sent): _*) logInfo("==================== 6 ====================") } + } object KafkaTestUtils { @@ -226,4 +273,49 @@ object KafkaTestUtils { Utils.deleteRecursively(logDir) } } + + class KafkaConsumer(config: Map[String, String]) { + val props = new Properties() + for ((k,v) <- config) { + props.put(k, v) + } + val consumer: ConsumerConnector = kafka.consumer.Consumer.create(new ConsumerConfig(props)) + private var consumerMap: scala.collection.Map[String, List[KafkaStream[Array[Byte], + Array[Byte]]]] = null + + private final val logger: Logger = LoggerFactory.getLogger(classOf[KafkaConsumer]) + + def initTopicList(topics: List[String]) { + val topicCountMap = new mutable.HashMap[String, Int] + for (topic <- topics) { + topicCountMap(topic) = 1 + } + consumerMap = consumer.createMessageStreams(topicCountMap.asInstanceOf[collection + .Map[String, Int]]) + } + + def getNextMessage(topic: String): MessageAndMetadata[_, _] = { + val streams: scala.List[KafkaStream[Array[Byte], Array[Byte]]] = consumerMap(topic) + val stream: KafkaStream[Array[Byte], Array[Byte]] = streams(0) + val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator() + try { + if (it.hasNext()) { + it.next() + } + else { + null + } + } + catch { + case e: ConsumerTimeoutException => { + logger.error("0 messages available to fetch for the topic " + topic) + null + } + } + } + + def shutdown(): Unit = { + consumer.shutdown() + } + } } From 372c749458e22ba1a9acd2badbfad51e4dda3968 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 28 Oct 2014 23:19:38 -0700 Subject: [PATCH 2/5] Reorganize imports. 
--- .../spark/streaming/kafka/KafkaOutputWriter.scala | 5 +++-- .../spark/streaming/kafka/KafkaStreamSuite.scala | 14 ++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala index a94a41583b42a..bdce94d31ef83 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala @@ -18,12 +18,13 @@ package org.apache.spark.streaming.kafka import java.util.Properties +import scala.reflect.ClassTag + import kafka.producer.{ProducerConfig, KeyedMessage, Producer} + import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.DStream -import scala.reflect.ClassTag - /** * Import this object in this form: * {{{ diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index b4cf70545ef57..d1b33951ea579 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -19,14 +19,10 @@ package org.apache.spark.streaming.kafka import java.io.File import java.net.InetSocketAddress -import java.util import java.util.{Properties, Random} -import akka.actor.FSM.-> -import org.apache.spark.rdd.RDD -import org.slf4j.{LoggerFactory, Logger} - import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import kafka.consumer._ import kafka.message.MessageAndMetadata @@ -36,19 +32,17 @@ import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import kafka.utils.ZKStringSerializer import kafka.serializer.{StringDecoder, StringEncoder} import kafka.server.{KafkaConfig, KafkaServer} - -import org.I0Itec.zkclient.ZkClient - import org.apache.zookeeper.server.ZooKeeperServer import org.apache.zookeeper.server.NIOServerCnxnFactory +import org.I0Itec.zkclient.ZkClient +import org.slf4j.{LoggerFactory, Logger} +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.kafka.KafkaWriter._ import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils -import scala.collection.mutable.ArrayBuffer - class KafkaStreamSuite extends TestSuiteBase { import KafkaTestUtils._ From e6ef32f54d77c706f7b36b199b58655b052111dc Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 29 Oct 2014 14:50:44 -0700 Subject: [PATCH 3/5] Make Producer Cache thread-safe --- .../apache/spark/streaming/kafka/KafkaOutputWriter.scala | 1 + .../org/apache/spark/streaming/kafka/ProducerCache.scala | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala index bdce94d31ef83..5322699ff167a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.spark.streaming.kafka import java.util.Properties diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala index 3a0bf359bf527..bc988e3055808 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala @@ -16,19 +16,19 @@ */ package org.apache.spark.streaming.kafka -object ProducerCache { +private[kafka] object ProducerCache { private var producerOpt: Option[Any] = None - def getCachedProducer: Any = { + def getCachedProducer: Any = synchronized { producerOpt.get } - def cacheProducer(producer: Any): Unit = { + def cacheProducer(producer: Any): Unit = synchronized { producerOpt = Some(producer) } - def isCached: Boolean = { + def isCached: Boolean = synchronized { producerOpt.isDefined } } From aa1f10dc5684c3dcb647048fe5651a1c9ab79154 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 29 Oct 2014 16:38:52 -0700 Subject: [PATCH 4/5] Create one producer per partition, rather than caching the producer. --- .../streaming/kafka/KafkaOutputWriter.scala | 33 +++++++++--------- .../spark/streaming/kafka/ProducerCache.scala | 34 ------------------- 2 files changed, 16 insertions(+), 51 deletions(-) delete mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala index 5322699ff167a..e52d59d0675ba 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala @@ -23,6 +23,7 @@ import scala.reflect.ClassTag import kafka.producer.{ProducerConfig, KeyedMessage, Producer} +import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.DStream @@ -79,7 +80,7 @@ object KafkaWriter { * @param dstream - The [[DStream]] to be written to Kafka * */ -class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) { +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Logging { /** * To write data from a DStream to Kafka, call this function after creating the DStream. Once @@ -103,23 +104,21 @@ class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) { def func = (rdd: RDD[T]) => { rdd.foreachPartition(events => { - // The ForEachDStream runs the function locally on the driver. So the - // ProducerCache from the driver is likely to get serialized and - // sent, which is fine - because at that point the Producer itself is - // not initialized, so a None is sent over the wire. - // Get the producer from that local executor and write! - val producer: Producer[K, V] = { - if (ProducerCache.isCached) { - ProducerCache.getCachedProducer - .asInstanceOf[Producer[K, V]] - } else { - val producer = - new Producer[K, V](new ProducerConfig(broadcastedConfig.value)) - ProducerCache.cacheProducer(producer) - producer - } + // The ForEachDStream runs the function locally on the driver. + // This code can alternatively use sc.runJob, but this approach seemed cleaner. 
+ val producer: Producer[K, V] = + new Producer[K, V](new ProducerConfig(broadcastedConfig.value)) + try { + producer.send(events.map(serializerFunc).toArray: _*) + logDebug("Data sent successfully to Kafka") + } catch { + case e: Exception => + logError("Failed to send data to Kafka", e) + throw e + } finally { + producer.close() + logDebug("Kafka Producer closed successfully.") } - producer.send(events.map(serializerFunc).toArray: _*) }) } dstream.foreachRDD(func) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala deleted file mode 100644 index bc988e3055808..0000000000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ProducerCache.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kafka - -private[kafka] object ProducerCache { - - private var producerOpt: Option[Any] = None - - def getCachedProducer: Any = synchronized { - producerOpt.get - } - - def cacheProducer(producer: Any): Unit = synchronized { - producerOpt = Some(producer) - } - - def isCached: Boolean = synchronized { - producerOpt.isDefined - } -} From 0a45f1ab5ba5f9440a78e47e48b48f0321d440c1 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 29 Oct 2014 20:29:17 -0700 Subject: [PATCH 5/5] Extend Serializable in KafkaWriter --- .../org/apache/spark/streaming/kafka/KafkaOutputWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala index e52d59d0675ba..9f7eafc820a1d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala @@ -80,7 +80,7 @@ object KafkaWriter { * @param dstream - The [[DStream]] to be written to Kafka * */ -class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Logging { +class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging { /** * To write data from a DStream to Kafka, call this function after creating the DStream. Once
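
For reference, a minimal end-to-end usage sketch of the API as it stands after patch 5, adapted from the scaladoc example and the test case in the patches above. The StreamingContext setup, the socket source, the topic name ("events"), and the broker address ("localhost:9092") are illustrative assumptions and are not part of the patch series itself.

import java.util.Properties

import kafka.producer.KeyedMessage

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaWriter._   // brings writeToKafka into scope via the implicit conversion

object KafkaWriterExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaWriterExample"), Seconds(1))

    // Any DStream[String] works; a socket stream is used here purely for illustration.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Standard Kafka 0.8 producer properties, mirroring the test in KafkaStreamSuite.
    val producerConf = new Properties()
    producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder")
    producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder")
    producerConf.put("metadata.broker.list", "localhost:9092")
    producerConf.put("request.required.acks", "1")

    // The serializer function converts each stream element into a KeyedMessage.
    lines.writeToKafka(producerConf,
      (x: String) => new KeyedMessage[String, Array[Byte]]("events", null, x.getBytes("UTF-8")))

    ssc.start()
    ssc.awaitTermination()
  }
}

As of patch 4, each partition creates its own Producer from the broadcast configuration, sends its records, and closes the producer in a finally block, so no producer instance ever needs to be serialized from the driver to the executors.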