diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 3a7da3fb243a1..7d67a72d5cc7b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException
 import scala.annotation.tailrec
 import scala.language.postfixOps
 import scala.util.Random
+import scala.util.control.NonFatal
 
 import kafka.admin.AdminUtils
 import kafka.common.KafkaException
@@ -44,6 +45,8 @@ import org.apache.spark.util.Utils
 /**
  * This is a helper class for Kafka test suites. This has the functionality to set up
  * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * This class is kept in src (rather than test) so that it can also be used to test the Python Kafka APIs.
  */
 private class KafkaTestUtils extends Logging {
 
@@ -55,7 +58,7 @@ private class KafkaTestUtils extends Logging {
 
   private var zookeeper: EmbeddedZookeeper = _
 
-  var zkClient: ZkClient = _
+  private var zkClient: ZkClient = _
 
   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -82,18 +85,25 @@ private class KafkaTestUtils extends Logging {
     s"$brokerHost:$brokerPort"
   }
 
-  /** Set up the Embedded Zookeeper server and get the proper Zookeeper port */
-  def setupEmbeddedZookeeper(): Unit = {
+  def zookeeperClient: ZkClient = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+    Option(zkClient).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet initialized"))
+  }
+
+  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+  private def setupEmbeddedZookeeper(): Unit = {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
+    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+      ZKStringSerializer)
     zkReady = true
-    zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
   }
 
-  /** Set up the Embedded Kafka server */
-  def setupEmbeddedKafkaServer(): Unit = {
+  // Set up the Embedded Kafka server
+  private def setupEmbeddedKafkaServer(): Unit = {
     assert(zkReady, "Zookeeper should be set up beforehand")
     // Kafka broker startup
     var bindSuccess: Boolean = false
@@ -116,8 +126,14 @@ private class KafkaTestUtils extends Logging {
     brokerReady = true
   }
 
+  /** Set up the whole embedded servers, including Zookeeper and Kafka brokers */
+  def setupEmbeddedServers(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
   /** Tear down the whole servers, including Kafka broker and Zookeeper */
-  def tearDownEmbeddedServers(): Unit = {
+  def teardownEmbeddedServers(): Unit = {
     brokerReady = false
     zkReady = false
 
@@ -151,7 +167,7 @@ private class KafkaTestUtils extends Logging {
     waitUntilMetadataIsPropagated(topic, 0)
   }
 
-  /** Java function for sending messages to the Kafka broker */
+  /** Java-friendly function for sending messages to the Kafka broker */
   def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
     import scala.collection.JavaConversions._
     sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*))
@@ -191,6 +207,37 @@ private class KafkaTestUtils extends Logging {
   }
 
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    // A simplified version of scalatest eventually, rewrite here is to avoid adding extra test
+    // dependency
+    def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+      def makeAttempt(): Either[Throwable, T] = {
+        try {
+          Right(func)
+        } catch {
+          case e if NonFatal(e) => Left(e)
+        }
+      }
+
+      val startTime = System.currentTimeMillis()
+      @tailrec
+      def tryAgain(attempt: Int): T = {
+        makeAttempt() match {
+          case Right(result) => result
+          case Left(e) =>
+            val duration = System.currentTimeMillis() - startTime
+            if (duration < timeout.milliseconds) {
+              Thread.sleep(interval.milliseconds)
+            } else {
+              throw new TimeoutException(e.getMessage)
+            }
+
+            tryAgain(attempt + 1)
+        }
+      }
+
+      tryAgain(1)
+    }
+
     eventually(Time(10000), Time(100)) {
       assert(
         server.apis.metadataCache.containsTopicAndPartition(topic, partition),
@@ -199,38 +246,7 @@ private class KafkaTestUtils extends Logging {
     }
   }
 
-  // A simplified version of scalatest eventually, rewrite here is to avoid adding extra test
-  // dependency
-  private def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
-    def makeAttempt(): Either[Throwable, T] = {
-      try {
-        Right(func)
-      } catch {
-        case e: Throwable => Left(e)
-      }
-    }
-
-    val startTime = System.currentTimeMillis()
-    @tailrec
-    def tryAgain(attempt: Int): T = {
-      makeAttempt() match {
-        case Right(result) => result
-        case Left(e) =>
-          val duration = System.currentTimeMillis() - startTime
-          if (duration < timeout.milliseconds) {
-            Thread.sleep(interval.milliseconds)
-          } else {
-            throw new TimeoutException(e.getMessage)
-          }
-
-          tryAgain(attempt + 1)
-      }
-    }
-
-    tryAgain(1)
-  }
-
-  class EmbeddedZookeeper(val zkConnect: String) {
+  private class EmbeddedZookeeper(val zkConnect: String) {
     val random = new Random()
     val snapshotDir = Utils.createTempDir()
     val logDir = Utils.createTempDir()
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index d617c4c6261dd..1659dfd15e4d2 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -46,8 +46,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
   @Before
   public void setUp() {
     kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setupEmbeddedZookeeper();
-    kafkaTestUtils.setupEmbeddedKafkaServer();
+    kafkaTestUtils.setupEmbeddedServers();
     System.clearProperty("spark.driver.port");
     SparkConf sparkConf = new SparkConf()
       .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
@@ -64,7 +63,7 @@ public void tearDown() {
     System.clearProperty("spark.driver.port");
 
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers();
+      kafkaTestUtils.teardownEmbeddedServers();
       kafkaTestUtils = null;
     }
   }
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index c6b31f9760f76..32d5593c8aa33 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -42,8 +42,7 @@ public class JavaKafkaRDDSuite implements Serializable {
   @Before
   public void setUp() {
     kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setupEmbeddedZookeeper();
-    kafkaTestUtils.setupEmbeddedKafkaServer();
+    kafkaTestUtils.setupEmbeddedServers();
     System.clearProperty("spark.driver.port");
     SparkConf sparkConf = new SparkConf()
       .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
@@ -59,7 +58,7 @@ public void tearDown() {
     System.clearProperty("spark.driver.port");
 
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers();
+      kafkaTestUtils.teardownEmbeddedServers();
       kafkaTestUtils = null;
     }
   }
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 4b46d9975eec5..65318c95ec7c2 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Random;
 
-import scala.Predef;
 import scala.Tuple2;
 
 import kafka.serializer.StringDecoder;
 
@@ -48,8 +47,7 @@ public class JavaKafkaStreamSuite implements Serializable {
   @Before
   public void setUp() {
     kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setupEmbeddedZookeeper();
-    kafkaTestUtils.setupEmbeddedKafkaServer();
+    kafkaTestUtils.setupEmbeddedServers();
     System.clearProperty("spark.driver.port");
     SparkConf sparkConf = new SparkConf()
       .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
@@ -66,7 +64,7 @@ public void tearDown() {
     System.clearProperty("spark.driver.port");
 
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers();
+      kafkaTestUtils.teardownEmbeddedServers();
       kafkaTestUtils = null;
     }
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index df5e15fa5e3dd..4fe231ab42c62 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -54,13 +54,12 @@ class DirectKafkaStreamSuite
 
   override def beforeAll {
     kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setupEmbeddedZookeeper()
-    kafkaTestUtils.setupEmbeddedKafkaServer()
+    kafkaTestUtils.setupEmbeddedServers()
   }
 
   override def afterAll {
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers()
+      kafkaTestUtils.teardownEmbeddedServers()
       kafkaTestUtils = null
     }
   }
@@ -88,7 +87,7 @@ class DirectKafkaStreamSuite
     }
     val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
-      "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}",
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "smallest"
     )
 
@@ -134,7 +133,7 @@ class DirectKafkaStreamSuite
     val data = Map("a" -> 10)
     kafkaTestUtils.createTopic(topic)
     val kafkaParams = Map(
-      "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}",
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "largest"
     )
     val kc = new KafkaCluster(kafkaParams)
@@ -179,7 +178,7 @@ class DirectKafkaStreamSuite
     val data = Map("a" -> 10)
     kafkaTestUtils.createTopic(topic)
     val kafkaParams = Map(
-      "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}",
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "largest"
     )
     val kc = new KafkaCluster(kafkaParams)
@@ -225,7 +224,7 @@ class DirectKafkaStreamSuite
     testDir = Utils.createTempDir()
 
     val kafkaParams = Map(
-      "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}",
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "smallest"
     )
 
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index be668043faf5b..1e9539a467dbc 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
 import scala.util.Random
 
 import kafka.common.TopicAndPartition
-import org.scalatest.{FunSuite, BeforeAndAfterAll}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {
   private val topic = "kcsuitetopic" + Random.nextInt(10000)
@@ -31,17 +31,16 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setupEmbeddedZookeeper()
-    kafkaTestUtils.setupEmbeddedKafkaServer()
+    kafkaTestUtils.setupEmbeddedServers()
 
     kafkaTestUtils.createTopic(topic)
     kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
-    kc = new KafkaCluster(Map("metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}"))
+    kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress))
   }
 
   override def afterAll() {
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers()
+      kafkaTestUtils.teardownEmbeddedServers()
       kafkaTestUtils = null
     }
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 419e12cb1b854..eb9bc2a1e08e2 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 import kafka.serializer.StringDecoder
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import org.scalatest.{FunSuite, BeforeAndAfterAll}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark._
 
@@ -37,8 +37,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
   override def beforeAll {
     sc = new SparkContext(sparkConf)
     kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setupEmbeddedZookeeper()
-    kafkaTestUtils.setupEmbeddedKafkaServer()
+    kafkaTestUtils.setupEmbeddedServers()
   }
 
   override def afterAll {
@@ -48,7 +47,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
     }
 
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers()
+      kafkaTestUtils.teardownEmbeddedServers()
       kafkaTestUtils = null
     }
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index dee6c4ebd253e..3ec670f376507 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -23,31 +23,30 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import kafka.serializer.StringDecoder
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
-class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
+class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfterAll {
   private var ssc: StreamingContext = _
   private var kafkaTestUtils: KafkaTestUtils = _
 
-  before {
+  override def beforeAll(): Unit = {
     kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setupEmbeddedZookeeper()
-    kafkaTestUtils.setupEmbeddedKafkaServer()
+    kafkaTestUtils.setupEmbeddedServers()
   }
 
-  after {
+  override def afterAll(): Unit = {
     if (ssc != null) {
       ssc.stop()
       ssc = null
     }
 
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers()
+      kafkaTestUtils.teardownEmbeddedServers()
       kafkaTestUtils = null
     }
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index e8c551ab9cb2b..59000b0b26c7f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.streaming.kafka
 
-
 import java.io.File
 
 import scala.collection.mutable
@@ -28,7 +27,7 @@ import scala.util.Random
 import kafka.serializer.StringDecoder
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
 import org.apache.commons.io.FileUtils
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkConf
@@ -36,7 +35,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
-class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventually {
+class ReliableKafkaStreamSuite extends FunSuite
+  with BeforeAndAfterAll with BeforeAndAfter with Eventually {
 
   private val sparkConf = new SparkConf()
     .setMaster("local[4]")
@@ -51,10 +51,9 @@ class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventua
   private var ssc: StreamingContext = _
   private var tempDirectory: File = null
 
-  before {
+  override def beforeAll(): Unit = {
     kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setupEmbeddedZookeeper()
-    kafkaTestUtils.setupEmbeddedKafkaServer()
+    kafkaTestUtils.setupEmbeddedServers()
 
     groupId = s"test-consumer-${Random.nextInt(10000)}"
     kafkaParams = Map(
@@ -62,31 +61,33 @@ class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventua
       "group.id" -> groupId,
       "auto.offset.reset" -> "smallest"
     )
-
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-    tempDirectory = Utils.createTempDir()
-    ssc.checkpoint(tempDirectory.getAbsolutePath)
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-    }
-<<<<<<< HEAD
     Utils.deleteRecursively(tempDirectory)
     tearDownKafka()
-=======
+  }
+
+  override def afterAll(): Unit = {
     if (tempDirectory != null && tempDirectory.exists()) {
       FileUtils.deleteDirectory(tempDirectory)
       tempDirectory = null
     }
 
     if (kafkaTestUtils != null) {
-      kafkaTestUtils.tearDownEmbeddedServers()
+      kafkaTestUtils.teardownEmbeddedServers()
       kafkaTestUtils = null
     }
->>>>>>> Refactor the Kafka unit test and add Python Kafka unittest support
+  }
+
+  before {
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    tempDirectory = Files.createTempDir()
+    ssc.checkpoint(tempDirectory.getAbsolutePath)
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
   }
 
@@ -146,9 +147,8 @@ class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventua
 
   /** Getting partition offset from Zookeeper. */
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
-    assert(kafkaTestUtils.zkClient != null, "Zookeeper client is not initialized")
     val topicDirs = new ZKGroupTopicDirs(groupId, topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-    ZkUtils.readDataMaybeNull(kafkaTestUtils.zkClient, zkPath)._1.map(_.toLong)
+    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
   }
 }
diff --git a/python/run-tests b/python/run-tests
index a593d7b94e8bd..9c099bed6092f 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -37,9 +37,7 @@ rm -rf metastore warehouse
 
 function run_test() {
     echo "Running test: $1" | tee -a $LOG_FILE
-    echo "Additional argument: $2" | tee -a $LOG_FILE
-
-    SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 $2 > $LOG_FILE 2>&1
+    SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1
 
     FAILED=$((PIPESTATUS[0]||$FAILED))
 
@@ -104,7 +102,7 @@ function run_streaming_tests() {
     KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly
     JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}"
 
-    for f in "${JAR_PATH}"/spark-streaming-kafka-assembly*.jar; do
+    for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do
      if [[ ! -e "$f" ]]; then
        echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2
        echo "You need to build Spark before running this program" 1>&2
@@ -113,8 +111,9 @@ function run_streaming_tests() {
      KAFKA_ASSEMBLY_JAR="$f"
    done
 
+    export EXTRA_PYSPARK_SUBMIT_ARGS="-v --jars ${KAFKA_ASSEMBLY_JAR}"
    run_test "pyspark/streaming/util.py"
-    run_test "pyspark/streaming/tests.py" "--jars ${KAFKA_ASSEMBLY_JAR}"
+    run_test "pyspark/streaming/tests.py"
 }
 
 echo "Running PySpark tests. Output is in python/$LOG_FILE."
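
For reference, a new suite written against the refactored helper would follow the same beforeAll/afterAll pattern used by the updated suites above. The sketch below is illustrative only and is not part of the patch: the suite name, topic, and messages are hypothetical, and it assumes only the KafkaTestUtils members visible in this diff (setupEmbeddedServers, teardownEmbeddedServers, createTopic, sendMessages, brokerAddress).

package org.apache.spark.streaming.kafka

import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Hypothetical example suite; not included in the patch above.
class ExampleKafkaUsageSuite extends FunSuite with BeforeAndAfterAll {
  private var kafkaTestUtils: KafkaTestUtils = _

  override def beforeAll(): Unit = {
    // A single call now brings up both the embedded Zookeeper and the Kafka broker.
    kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setupEmbeddedServers()
  }

  override def afterAll(): Unit = {
    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardownEmbeddedServers()
      kafkaTestUtils = null
    }
  }

  test("produce messages to the embedded broker") {
    val topic = "example-topic"
    kafkaTestUtils.createTopic(topic)
    kafkaTestUtils.sendMessages(topic, Map("a" -> 1, "b" -> 2))
    // A consumer configured with kafkaTestUtils.brokerAddress would read these messages back.
  }
}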