From 796d4ca6ed2e420c73e1beab9518a324df74457c Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 24 Jan 2014 18:51:21 +0800
Subject: [PATCH] Add real Kafka streaming test

Conflicts:
	external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
	external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
	project/SparkBuild.scala
---
 external/kafka/pom.xml                             |   6 +
 .../streaming/kafka/JavaKafkaStreamSuite.java      | 117 +++++++++---
 .../streaming/kafka/KafkaStreamSuite.scala         | 170 ++++++++++++++++--
 3 files changed, 257 insertions(+), 36 deletions(-)

diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index daf03360bc5f5..7fe43834a5d8e 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -70,6 +70,12 @@
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>4.5</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 9f8046bf00f8f..641c17a9f4c08 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -17,31 +17,108 @@
 package org.apache.spark.streaming.kafka;
 
+import java.io.Serializable;
 import java.util.HashMap;
+import java.util.List;
 
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.junit.Test;
-import com.google.common.collect.Maps;
-import kafka.serializer.StringDecoder;
-import org.apache.spark.storage.StorageLevel;
+import scala.Predef;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+import junit.framework.Assert;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaStreamSuite;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
+  private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+
+  @Before
+  @Override
+  public void setUp() {
+    testSuite.beforeFunction();
+    System.clearProperty("spark.driver.port");
+    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+  }
+
+  @After
+  @Override
+  public void tearDown() {
+    ssc.stop();
+    ssc = null;
+    System.clearProperty("spark.driver.port");
+    testSuite.afterFunction();
+  }
 
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKafkaStream() {
-    HashMap<String, Integer> topics = Maps.newHashMap();
-
-    // tests the API, does not actually test data receiving
-    JavaPairReceiverInputDStream<String, String> test1 =
-      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
-      StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    HashMap<String, String> kafkaParams = Maps.newHashMap();
-    kafkaParams.put("zookeeper.connect", "localhost:12345");
-    kafkaParams.put("group.id","consumer-group");
-    JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
-      String.class, String.class, StringDecoder.class, StringDecoder.class,
-      kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+    String topic = "topic1";
+    HashMap<String, Integer> topics = new HashMap<String, Integer>();
+    topics.put(topic, 1);
+
+    HashMap<String, Integer> sent = new HashMap<String, Integer>();
+    sent.put("a", 5);
+    sent.put("b", 3);
+    sent.put("c", 10);
+
+    JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
+      testSuite.zkConnect(),
+      "group",
+      topics);
+
+    final HashMap<String, Long> result = new HashMap<String, Long>();
+
+    JavaDStream<String> words = stream.map(
+      new Function<Tuple2<String, String>, String>() {
+        @Override
+        public String call(Tuple2<String, String> tuple2) throws Exception {
+          return tuple2._2();
+        }
+      }
+    );
+
+    words.countByValue().foreachRDD(
+      new Function<JavaPairRDD<String, Long>, Void>() {
+        @Override
+        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+          List<Tuple2<String, Long>> ret = rdd.collect();
+          for (Tuple2<String, Long> r : ret) {
+            if (result.containsKey(r._1())) {
+              result.put(r._1(), result.get(r._1()) + r._2());
+            } else {
+              result.put(r._1(), r._2());
+            }
+          }
+
+          return null;
+        }
+      }
+    );
+
+    ssc.start();
+
+    HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
+    testSuite.produceAndSendTestMessage(topic,
+      JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
+        Predef.<Tuple2<String, Object>>conforms()
+      ));
+
+    ssc.awaitTermination(10000);
+
+    Assert.assertEquals(sent.size(), result.size());
+    for (String k : sent.keySet()) {
+      Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+    }
   }
 }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index e6f2c4a5cf5d1..5bf6cefed9109 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -17,28 +17,166 @@
 package org.apache.spark.streaming.kafka
 
-import kafka.serializer.StringDecoder
+import java.io.File
+import java.net.InetSocketAddress
+import java.util.{Properties, Random}
+
+import scala.collection.mutable
+
+import kafka.admin.CreateTopicCommand
+import kafka.common.TopicAndPartition
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import kafka.utils.ZKStringSerializer
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxnFactory
+
+import org.I0Itec.zkclient.ZkClient
 
 class KafkaStreamSuite extends TestSuiteBase {
+  val zkConnect = "localhost:2181"
+  var zookeeper: EmbeddedZookeeper = _
+  var zkClient: ZkClient = _
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+
+  val brokerPort = 9092
+  val brokerProps = getBrokerConfig(brokerPort)
+  val brokerConf = new KafkaConfig(brokerProps)
+  var server: KafkaServer = _
+
+  override def beforeFunction() {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+    // Kafka broker startup
+    server = new KafkaServer(brokerConf)
+    server.startup()
+
+    super.beforeFunction()
+  }
+
+  override def afterFunction() {
+    server.shutdown()
+    brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) }
+
+    zkClient.close()
+    zookeeper.shutdown()
+
+    super.afterFunction()
+  }
 
   test("kafka input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
-    val topics = Map("my-topic" -> 1)
-
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-
-    // TODO: Actually test receiving data
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+
+    val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1))
+    val result = new mutable.HashMap[String, Long]()
+    stream.map { case (k, v) => v }
+      .countByValue()
+      .foreachRDD { r =>
+        val ret = r.collect()
+        ret.toMap.foreach { kv =>
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
+      }
+    ssc.start()
+    produceAndSendTestMessage(topic, sent)
+    ssc.awaitTermination(10000)
+
+    assert(sent.size === result.size)
+    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+    ssc.stop()
   }
+
+  private def getBrokerConfig(port: Int): Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", port.toString)
+    props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkConnect)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  private def getProducerConfig(brokerList: String): Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    props
+  }
+
+  private def createTestMessage(topic: String, sent: Map[String, Int])
+    : Seq[KeyedMessage[String, String]] = {
+    val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
+      new KeyedMessage[String, String](topic, s)
+    }
+    messages.toSeq
+  }
+
+  def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+    val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+
+    // wait until metadata is propagated
+    Thread.sleep(1000)
+    assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)))
+
+    producer.send(createTestMessage(topic, sent): _*)
+    producer.close()
+  }
+}
+
+object KafkaStreamSuite {
+  val random = new Random()
+
+  def tmpDir(): File = {
+    val tmp = System.getProperty("java.io.tmpdir")
+    val f = new File(tmp, "spark-kafka-" + random.nextInt(1000))
+    f.mkdirs()
+    f
+  }
+
+  def deleteDir(file: File) {
+    if (file.isFile) {
+      file.delete()
+    } else {
+      for (f <- file.listFiles()) {
+        deleteDir(f)
+      }
+      file.delete()
+    }
+  }
+}
+
+class EmbeddedZookeeper(val zkConnect: String) {
+  val random = new Random()
+  val snapshotDir = KafkaStreamSuite.tmpDir()
+  val logDir = KafkaStreamSuite.tmpDir()
+
+  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+  val (ip, port) = {
+    val splits = zkConnect.split(":")
+    (splits(0), splits(1).toInt)
+  }
+  val factory = new NIOServerCnxnFactory()
+  factory.configure(new InetSocketAddress(ip, port), 16)
+  factory.startup(zookeeper)
+
+  def shutdown() {
+    factory.shutdown()
+    KafkaStreamSuite.deleteDir(snapshotDir)
+    KafkaStreamSuite.deleteDir(logDir)
+  }
 }
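
Reviewer note (not part of the patch): the suites above exercise the same consumer path an
application would use against a real broker. Below is a minimal standalone sketch of that usage
for reference; the object name KafkaWordCountExample, the ZooKeeper connect string, the group id
and the topic name are placeholder values, not anything defined by this patch.

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaWordCountExample {
      def main(args: Array[String]) {
        // Local two-thread context with a one-second batch, mirroring the test setup.
        val ssc = new StreamingContext("local[2]", "KafkaWordCountExample", Seconds(1))

        // Topic -> number of consumer threads, as in the suite's Map(topic -> 1).
        val stream = KafkaUtils.createStream(ssc, "localhost:2181", "example-group", Map("topic1" -> 1))

        // Drop the (unused) message key and count occurrences of each value per batch,
        // the same aggregation the suites check against the map of sent messages.
        stream.map(_._2).countByValue().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }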