Added Java API and added more Scala and Java unit tests. Also updated docs.
tdas committed Feb 6, 2015
1 parent e73589c commit 50f2b56
Showing 7 changed files with 505 additions and 80 deletions.
@@ -55,8 +55,8 @@ private[streaming]
class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag,
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
@transient ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
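The tightened bounds above (U <: Decoder[K] and T <: Decoder[V] instead of Decoder[_]) let the compiler reject a decoder that does not produce the stream's key or value type. A minimal Scala sketch of how the type parameters line up when the stream is built through KafkaUtils.createDirectStream, the entry point exercised by the tests below; the object name, broker address, and topic are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamTypesSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("DirectStreamTypesSketch"),
      Milliseconds(500))
    // Placeholder broker address and topic name.
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "auto.offset.reset" -> "smallest")
    // K = String and V = String, so both decoders must be Decoder[String];
    // StringDecoder satisfies the new U <: Decoder[K] / T <: Decoder[V] bounds.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic1"))
    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}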
@@ -332,6 +332,9 @@ object KafkaCluster {
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
val hpa = hp.split(":")
if (hpa.size == 1) {
throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
}
(hpa(0), hpa(1).toInt)
}
}
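The added check rejects a broker entry that has no port. A standalone Scala sketch mirroring the parsing above, showing the expected metadata.broker.list format and the host/port pairs it yields (the helper object is illustrative only and throws IllegalArgumentException rather than SparkException to stay dependency-free):

object BrokerListFormatSketch {
  // Each comma-separated entry must be "<host>:<port>".
  def parse(brokers: String): Array[(String, Int)] =
    brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
        throw new IllegalArgumentException(
          s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)
    }

  def main(args: Array[String]): Unit = {
    println(parse("host1:9092,host2:9093").toList)  // List((host1,9092), (host2,9093))
    // parse("host1") throws, because the port is missing.
  }
}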

Large diffs are not rendered by default.

@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.List;

import org.apache.spark.SparkConf;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;

import junit.framework.Assert;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.junit.Test;
import org.junit.After;
import org.junit.Before;

public class JavaDirectKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private transient Random random = new Random();
private transient KafkaStreamSuiteBase suiteBase = null;

@Before
public void setUp() {
suiteBase = new KafkaStreamSuiteBase() { };
suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
suiteBase.tearDownKafka();
}

@Test
public void testKafkaStream() throws InterruptedException {
String topic1 = "topic1";
String topic2 = "topic2";

List<String> topic1data = createTopicAndSendData(topic1);
List<String> topic2data = createTopicAndSendData(topic2);

HashSet<String> sent = new HashSet<String>();
sent.addAll(topic1data);
sent.addAll(topic2data);

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
kafkaParams.put("auto.offset.reset", "smallest");

JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
ssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicToSet(topic1)
).map(
new Function<Tuple2<String, String>, String>() {
@Override
public String call(scala.Tuple2<String, String> kv) throws Exception {
return kv._2();
}
}
);

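// Direct stream over topic2, created from explicit starting offsets, with a message handler that extracts the payload.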
JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
ssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
String.class,
kafkaParams,
topicOffsetToMap(topic2, (long) 0),
new Function<MessageAndMetadata<String, String>, String>() {
@Override
public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
return msgAndMd.message();
}
}
);
JavaDStream<String> unifiedStream = stream1.union(stream2);

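// Collect everything the unified stream receives so it can be compared with what was sent.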
final HashSet<String> result = new HashSet<String>();
unifiedStream.foreachRDD(
new Function<JavaRDD<String>, Void> () {
@Override
public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception {
result.addAll(rdd.collect());
return null;
}
}
);
ssc.start();
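// Poll for up to 20 seconds until as many distinct records have arrived as were sent.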
long startTime = System.currentTimeMillis();
boolean matches = false;
while (!matches && System.currentTimeMillis() - startTime < 20000) {
matches = sent.size() == result.size();
Thread.sleep(50);
}
Assert.assertEquals(sent, result);
ssc.stop();
}

private HashSet<String> topicToSet(String topic) {
HashSet<String> topicSet = new HashSet<String>();
topicSet.add(topic);
return topicSet;
}

private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
topicMap.put(new TopicAndPartition(topic, scala.Int.box(0)), offsetToStart);
return topicMap;
}

private List<String> createTopicAndSendData(String topic) {
List<String> data = java.util.Arrays.asList(topic+"-1", topic+"-2", topic+"-3");
HashMap<String, Integer> sent = new HashMap<String, Integer>();
for(String i: data) {
sent.put(i, 1);
}

suiteBase.createTopic(topic);

HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
return data;
}
}
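For comparison, a minimal Scala sketch of the same two constructions the Java suite above exercises; the object name, broker address, topics, and starting offset are placeholders:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ScalaDirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[4]").setAppName("ScalaDirectStreamSketch"),
      Milliseconds(200))
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",  // placeholder broker
      "auto.offset.reset" -> "smallest")

    // Variant 1: subscribe to a set of topics; the stream carries (key, value) pairs.
    val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic1")).map(_._2)

    // Variant 2: explicit starting offsets plus a message handler that keeps only the payload.
    val fromOffsets = Map(TopicAndPartition("topic2", 0) -> 0L)
    val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => mmd.message())

    stream1.union(stream2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}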
@@ -32,19 +32,15 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.util.Utils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata

class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
with BeforeAndAfter with BeforeAndAfterAll with Eventually {
val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)

val brokerHost = "localhost"

val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerHost:$brokerPort",
"auto.offset.reset" -> "smallest"
)

var ssc: StreamingContext = _
var testDir: File = _
@@ -66,13 +62,18 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
}

test("basic receiving with multiple topics") {
val topics = Set("newA", "newB")
test("basic receiving with multiple topics and smallest starting offset") {
val topics = Set("topic1", "topic2", "topic3")
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
createTopic(t)
produceAndSendMessage(t, data)
}
val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerAddress",
"auto.offset.reset" -> "smallest"
)

ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
@@ -106,13 +107,107 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
ssc.stop()
}
test("receiving from largest starting offset") {
val topic = "largest"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
createTopic(topic)
val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerAddress",
"auto.offset.reset" -> "largest"
)
val kc = new KafkaCluster(kafkaParams)
def getLatestOffset(): Long = {
kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
}

// Send some initial messages before starting context
produceAndSendMessage(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
assert(getLatestOffset() > 3)
}
val offsetBeforeStart = getLatestOffset()

// Setup context and kafka stream with largest offset
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Set(topic))
}
assert(
stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
.fromOffsets(topicPartition) >= offsetBeforeStart,
"Start offset not from latest"
)

val collectedData = new mutable.ArrayBuffer[String]()
stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
ssc.start()
val newData = Map("b" -> 10)
produceAndSendMessage(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
}


test("creating stream by offset") {
val topic = "offset"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
createTopic(topic)
val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerAddress",
"auto.offset.reset" -> "largest"
)
val kc = new KafkaCluster(kafkaParams)
def getLatestOffset(): Long = {
kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
}

// Send some initial messages before starting context
produceAndSendMessage(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
assert(getLatestOffset() >= 10)
}
val offsetBeforeStart = getLatestOffset()

// Setup context and kafka stream with largest offset
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
ssc, kafkaParams, Map(topicPartition -> 11L),
(m: MessageAndMetadata[String, String]) => m.message())
}
assert(
stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
.fromOffsets(topicPartition) >= offsetBeforeStart,
"Start offset not from latest"
)

val collectedData = new mutable.ArrayBuffer[String]()
stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
ssc.start()
val newData = Map("b" -> 10)
produceAndSendMessage(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
}

// Test to verify the offset ranges can be recovered from the checkpoints
test("offset recovery") {
val topic = "recovery"
createTopic(topic)
testDir = Utils.createTempDir()

val kafkaParams = Map(
"metadata.broker.list" -> s"$brokerAddress",
"auto.offset.reset" -> "smallest"
)

// Send data to Kafka and wait for it to be received
def sendDataAndWaitForReceive(data: Seq[Int]) {
val strings = data.map { _.toString}
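The two new tests above use KafkaCluster.getLatestLeaderOffsets to decide where the stream should start. A hedged sketch of combining that lookup with the offset-based createDirectStream overload outside of a test; the broker address, topic, and object name are placeholders, and the code sits in the org.apache.spark.streaming.kafka package, as the tests do, so that KafkaCluster is accessible:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object StartFromLatestSketch {
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")  // placeholder broker
    val tp = TopicAndPartition("events", 0)                            // placeholder topic

    // Look up the partition's current latest offset, as the tests do before
    // asserting on the stream's fromOffsets.
    val kc = new KafkaCluster(kafkaParams)
    val latest = kc.getLatestLeaderOffsets(Set(tp)).right.get(tp).offset

    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("StartFromLatestSketch"),
      Milliseconds(200))

    // Start exactly at that offset and keep (key, value) pairs via the message handler.
    val stream = KafkaUtils.createDirectStream[
        String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, Map(tp -> latest),
      (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}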
@@ -23,14 +23,10 @@ import org.scalatest.BeforeAndAfter
import kafka.common.TopicAndPartition

class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
val brokerHost = "localhost"

val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")

val kafkaParams = Map("metadata.broker.list" -> s"$brokerAddress")
val kc = new KafkaCluster(kafkaParams)

val topic = "kcsuitetopic" + Random.nextInt(10000)

val topicAndPartition = TopicAndPartition(topic, 0)

before {
@@ -48,15 +48,18 @@ import org.apache.spark.util.Utils
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {

val zkHost = "localhost"
var zkPort: Int = 0
var zkAddress: String = _
var zkClient: ZkClient = _

private val zkHost = "localhost"
val brokerHost = "localhost"
var brokerPort = 9092
var brokerAddress: String = _

private val zkConnectionTimeout = 6000
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
private var zkPort: Int = 0
protected var brokerPort = 9092
private var brokerConf: KafkaConfig = _
private var server: KafkaServer = _
private var producer: Producer[String, String] = _
@@ -67,6 +70,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
zkAddress = s"$zkHost:$zkPort"

logInfo("==================== 0 ====================")

zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
@@ -94,6 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
}

Thread.sleep(2000)
brokerAddress = s"$brokerHost:$brokerPort"
logInfo("==================== 4 ====================")
}

