Skip to content

Commit

Permalink
Code refactored according to review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Aug 5, 2014
1 parent 5222330 commit b6a505f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ public void testKafkaStream() throws InterruptedException {
testSuite.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
testSuite.produceAndSendMessage(topic,
JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
kafkaParams.put("group.id", "test-consumer-" + testSuite.random().nextInt(10000));
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory

import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class KafkaStreamSuite extends TestSuiteBase {
import KafkaStreamSuite._
import KafkaTestUtils._

val zkConnect = "localhost:2181"
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000

val brokerPort = 9092
val brokerProps = getBrokerConfig(brokerPort)
val brokerProps = getBrokerConfig(brokerPort, zkConnect)
val brokerConf = new KafkaConfig(brokerProps)

protected var zookeeper: EmbeddedZookeeper = _
Expand Down Expand Up @@ -76,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase {
override def afterFunction() {
producer.close()
server.shutdown()
brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }
brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }

zkClient.close()
zookeeper.shutdown()
Expand Down Expand Up @@ -119,25 +120,6 @@ class KafkaStreamSuite extends TestSuiteBase {
ssc.stop()
}

/** Build the configuration for the embedded Kafka broker used by this suite.
  *
  * @param port TCP port the broker listens on
  * @return broker [[Properties]] keyed by standard Kafka config names
  */
private def getBrokerConfig(port: Int): Properties = {
  // Flushing on every message and a short replica socket timeout keep the
  // embedded test broker deterministic and fast to fail.
  val settings = Seq(
    "broker.id" -> "0",
    "host.name" -> "localhost",
    "port" -> port.toString,
    "log.dir" -> createTmpDir().getAbsolutePath,
    "zookeeper.connect" -> zkConnect,
    "log.flush.interval.messages" -> "1",
    "replica.socket.timeout.ms" -> "1500")
  val props = new Properties()
  settings.foreach { case (key, value) => props.put(key, value) }
  props
}

/** Build a producer configuration pointing at the given brokers.
  *
  * @param brokerList comma-separated host:port broker addresses
  * @return producer [[Properties]] configured with the Kafka String encoder
  */
private def getProducerConfig(brokerList: String): Properties = {
  val conf = new Properties()
  conf.setProperty("metadata.broker.list", brokerList)
  conf.setProperty("serializer.class", classOf[StringEncoder].getName)
  conf
}

private def createTestMessage(topic: String, sent: Map[String, Int])
: Seq[KeyedMessage[String, String]] = {
val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
Expand All @@ -161,25 +143,26 @@ class KafkaStreamSuite extends TestSuiteBase {
}
}

object KafkaStreamSuite {
object KafkaTestUtils {
val random = new Random()

def createTmpDir(): File = {
val tmp = System.getProperty("java.io.tmpdir")
val f = new File(tmp, "spark-kafka-" + random.nextInt(10000))
f.mkdirs()
f
/** Build the configuration for an embedded Kafka broker used in tests.
  *
  * @param port      TCP port the broker listens on
  * @param zkConnect ZooKeeper connection string (host:port)
  * @return broker [[Properties]] keyed by standard Kafka config names
  */
def getBrokerConfig(port: Int, zkConnect: String): Properties = {
  val props = new Properties()
  // Flush every message and shorten the replica socket timeout so the
  // test broker behaves deterministically and fails fast.
  Seq(
    "broker.id" -> "0",
    "host.name" -> "localhost",
    "port" -> port.toString,
    "log.dir" -> Utils.createTempDir().getAbsolutePath,
    "zookeeper.connect" -> zkConnect,
    "log.flush.interval.messages" -> "1",
    "replica.socket.timeout.ms" -> "1500"
  ).foreach { case (key, value) => props.put(key, value) }
  props
}

def deleteDir(file: File) {
if (file.isFile) {
file.delete()
} else {
for (f <- file.listFiles()) {
deleteDir(f)
}
file.delete()
}
/** Build a producer configuration targeting the given list of brokers.
  *
  * @param brokerList comma-separated host:port broker addresses
  * @return producer [[Properties]] configured with the Kafka String encoder
  */
def getProducerConfig(brokerList: String): Properties = {
  val conf = new Properties()
  conf.setProperty("metadata.broker.list", brokerList)
  conf.setProperty("serializer.class", classOf[StringEncoder].getName)
  conf
}

def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
Expand All @@ -202,25 +185,25 @@ object KafkaStreamSuite {
TopicAndPartition(topic, partition))), timeout),
s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}

class EmbeddedZookeeper(val zkConnect: String) {
val random = new Random()
val snapshotDir = KafkaStreamSuite.createTmpDir()
val logDir = KafkaStreamSuite.createTmpDir()
class EmbeddedZookeeper(val zkConnect: String) {
val random = new Random()
val snapshotDir = Utils.createTempDir()
val logDir = Utils.createTempDir()

val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val(ip, port) = {
val splits = zkConnect.split(":")
(splits(0), splits(1).toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)

def shutdown() {
factory.shutdown()
KafkaStreamSuite.deleteDir(snapshotDir)
KafkaStreamSuite.deleteDir(logDir)
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
(splits(0), splits(1).toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)

/** Stop the ZooKeeper connection factory and remove its temp directories. */
def shutdown() {
  // Shut the factory down first so no files are still in use when deleted.
  factory.shutdown()
  Seq(snapshotDir, logDir).foreach(Utils.deleteRecursively)
}
}
}

0 comments on commit b6a505f

Please sign in to comment.