diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index daf03360bc5f5..7fe43834a5d8e 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -70,6 +70,12 @@
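+    <!-- jopt-simple is needed at test scope by the embedded Kafka broker that these tests start -->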
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>4.5</version>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 9f8046bf00f8f..641c17a9f4c08 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -17,31 +17,108 @@
package org.apache.spark.streaming.kafka;
+import java.io.Serializable;
import java.util.HashMap;
+import java.util.List;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.junit.Test;
-import com.google.common.collect.Maps;
-import kafka.serializer.StringDecoder;
-import org.apache.spark.storage.StorageLevel;
+import scala.Predef;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+import junit.framework.Assert;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaStreamSuite;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
+ private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+
+ @Before
+ @Override
+ public void setUp() {
+ testSuite.beforeFunction();
+ System.clearProperty("spark.driver.port");
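+ // Use the real system clock so that batches are actually generated while test messages arrive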
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ }
+
+ @After
+ @Override
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ System.clearProperty("spark.driver.port");
+ testSuite.afterFunction();
+ }
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
- HashMap<String, Integer> topics = Maps.newHashMap();
-
- // tests the API, does not actually test data receiving
- JavaPairReceiverInputDStream<String, String> test1 =
- KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
- JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
- StorageLevel.MEMORY_AND_DISK_SER_2());
-
- HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect", "localhost:12345");
- kafkaParams.put("group.id","consumer-group");
- JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
- String.class, String.class, StringDecoder.class, StringDecoder.class,
- kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+ String topic = "topic1";
+ HashMap<String, Integer> topics = new HashMap<String, Integer>();
+ topics.put(topic, 1);
+
+ HashMap<String, Integer> sent = new HashMap<String, Integer>();
+ sent.put("a", 5);
+ sent.put("b", 3);
+ sent.put("c", 10);
+
+ JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
+ testSuite.zkConnect(),
+ "group",
+ topics);
+
+ final HashMap<String, Long> result = new HashMap<String, Long>();
+
+ JavaDStream<String> words = stream.map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) throws Exception {
+ return tuple2._2();
+ }
+ }
+ );
+
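+ // Merge each batch's counts into `result`; the received messages may be spread over several batches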
+ words.countByValue().foreachRDD(
+ new Function<JavaPairRDD<String, Long>, Void>() {
+ @Override
+ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+ List<Tuple2<String, Long>> ret = rdd.collect();
+ for (Tuple2<String, Long> r : ret) {
+ if (result.containsKey(r._1())) {
+ result.put(r._1(), result.get(r._1()) + r._2());
+ } else {
+ result.put(r._1(), r._2());
+ }
+ }
+
+ return null;
+ }
+ }
+ );
+
+ ssc.start();
+
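+ // Convert the Java map to the immutable Scala Map expected by the Scala test helper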
+ HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
+ testSuite.produceAndSendTestMessage(topic,
+ JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
+ Predef.<Tuple2<String, Object>>conforms()
+ ));
+
+ ssc.awaitTermination(10000);
+
+ Assert.assertEquals(sent.size(), result.size());
+ for (String k : sent.keySet()) {
+ Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+ }
}
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index e6f2c4a5cf5d1..5bf6cefed9109 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -17,28 +17,166 @@
package org.apache.spark.streaming.kafka
-import kafka.serializer.StringDecoder
+import java.io.File
+import java.net.InetSocketAddress
+import java.util.{Properties, Random}
+
+import scala.collection.mutable
+
+import kafka.admin.CreateTopicCommand
+import kafka.common.TopicAndPartition
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import kafka.utils.ZKStringSerializer
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxnFactory
+
+import org.I0Itec.zkclient.ZkClient
class KafkaStreamSuite extends TestSuiteBase {
+ val zkConnect = "localhost:2181"
+ var zookeeper: EmbeddedZookeeper = _
+ var zkClient: ZkClient = _
+ val zkConnectionTimeout = 6000
+ val zkSessionTimeout = 6000
+
+ val brokerPort = 9092
+ val brokerProps = getBrokerConfig(brokerPort)
+ val brokerConf = new KafkaConfig(brokerProps)
+ var server: KafkaServer = _
+
+ override def beforeFunction() {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(zkConnect)
+ zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+ // Kafka broker startup
+ server = new KafkaServer(brokerConf)
+ server.startup()
+
+ super.beforeFunction()
+ }
+
+ override def afterFunction() {
+ server.shutdown()
+ brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) }
+
+ zkClient.close()
+ zookeeper.shutdown()
+
+ super.afterFunction()
+ }
test("kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
- val topics = Map("my-topic" -> 1)
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[(String, String)] =
- KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
- val test2: ReceiverInputDStream[(String, String)] =
- KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
- val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3: ReceiverInputDStream[(String, String)] =
- KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-
- // TODO: Actually test receiving data
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+
+ val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1))
+ val result = new mutable.HashMap[String, Long]()
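+ // Accumulate the per-batch counts, since the messages may be received across multiple batches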
+ stream.map { case (k, v) => v }
+ .countByValue()
+ .foreachRDD { r =>
+ val ret = r.collect()
+ ret.toMap.foreach { kv =>
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
+ }
+ ssc.start()
+ produceAndSendTestMessage(topic, sent)
+ ssc.awaitTermination(10000)
+
+ assert(sent.size === result.size)
+ sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+
ssc.stop()
}
+
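+ /** Broker configuration pointing at the embedded Zookeeper and a temporary log directory. */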
+ private def getBrokerConfig(port: Int): Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("port", port.toString)
+ props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkConnect)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props
+ }
+
+ private def getProducerConfig(brokerList: String): Properties = {
+ val props = new Properties()
+ props.put("metadata.broker.list", brokerList)
+ props.put("serializer.class", classOf[StringEncoder].getName)
+ props
+ }
+
+ private def createTestMessage(topic: String, sent: Map[String, Int])
+ : Seq[KeyedMessage[String, String]] = {
+ val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
+ new KeyedMessage[String, String](topic, s)
+ }
+ messages.toSeq
+ }
+
+ def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) {
+ val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+ val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+
+ // wait until metadata is propagated
+ Thread.sleep(1000)
+ assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)))
+
+ producer.send(createTestMessage(topic, sent): _*)
+ producer.close()
+ }
+}
+
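+/** Helpers for creating and recursively deleting the temporary data directories used by the tests. */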
+object KafkaStreamSuite {
+ val random = new Random()
+
+ def tmpDir(): File = {
+ val tmp = System.getProperty("java.io.tmpdir")
+ val f = new File(tmp, "spark-kafka-" + random.nextInt(1000))
+ f.mkdirs()
+ f
+ }
+
+ def deleteDir(file: File) {
+ if (file.isFile) {
+ file.delete()
+ } else {
+ for (f <- file.listFiles()) {
+ deleteDir(f)
+ }
+ file.delete()
+ }
+ }
+}
+
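+/** A minimal in-process Zookeeper server backed by temporary snapshot and log directories. */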
+class EmbeddedZookeeper(val zkConnect: String) {
+ val random = new Random()
+ val snapshotDir = KafkaStreamSuite.tmpDir()
+ val logDir = KafkaStreamSuite.tmpDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ def shutdown() {
+ factory.shutdown()
+ KafkaStreamSuite.deleteDir(snapshotDir)
+ KafkaStreamSuite.deleteDir(logDir)
+ }
}