From b7d42ff3aef06b54b168bd8868b6012de3116aa2 Mon Sep 17 00:00:00 2001 From: prabs Date: Wed, 4 Feb 2015 01:17:38 +0530 Subject: [PATCH 01/21] Mqtt streaming support in Python --- external/mqtt-assembly/pom.xml | 105 +++++++++++++++++++++++++++++++ pom.xml | 1 + project/SparkBuild.scala | 6 +- python/pyspark/streaming/mqtt.py | 57 +++++++++++++++++ 4 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 external/mqtt-assembly/pom.xml create mode 100644 python/pyspark/streaming/mqtt.py diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml new file mode 100644 index 0000000000000..01d0c2faed93a --- /dev/null +++ b/external/mqtt-assembly/pom.xml @@ -0,0 +1,105 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.3.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-mqtt-assembly_2.10 + jar + Spark Project External Kafka Assembly + http://spark.apache.org/ + + + streaming-mqtt-assembly + scala-${scala.binary.version} + spark-streaming-mqtt-assembly-${project.version}.jar + ${project.build.directory}/${spark.jar.dir}/${spark.jar.basename} + + + + + org.apache.spark + spark-streaming-mqtt_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + + false + ${spark.jar} + + + *:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + reference.conf + + + log4j.properties + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index ffa96128a3d61..df06632997029 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ external/flume-sink external/flume-assembly external/mqtt + external/mqtt-assembly external/zeromq examples repl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3408c6d51ed4c..7555aa5e3861e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -45,8 +45,8 @@ object BuildCommons { sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) = - Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly") + val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly) = + Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly") .map(ProjectRef(buildLocation, _)) val tools = ProjectRef(buildLocation, "tools") @@ -347,7 +347,7 @@ object Assembly { .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String]) }, jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) => - if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) { + if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-mqtt-assembly")) { // This must match the same name used in maven (see external/kafka-assembly/pom.xml) s"${mName}-${v}.jar" } else { diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py new file mode 100644 index 0000000000000..423869bffdf74 --- /dev/null +++ 
b/python/pyspark/streaming/mqtt.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from py4j.java_collections import MapConverter +from py4j.java_gateway import java_import, Py4JError + +from pyspark.storagelevel import StorageLevel +from pyspark.serializers import PairDeserializer, NoOpSerializer +from pyspark.streaming import DStream + +__all__ = ['MQTTUtils'] + + +class MQTTUtils(object): + + @staticmethod + def createStream(ssc, brokerUrl, topic + storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2): + """ + Create an input stream that pulls messages from a Mqtt Broker. + :param ssc: StreamingContext object + :param brokerUrl: Url of remote mqtt publisher + :param topic: topic name to subscribe to + :param storageLevel: RDD storage level. + :return: A DStream object + """ + java_import(ssc._jvm, "org.apache.spark.streaming.mqtt.MQTTUtils") + + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + + try: + jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel) + + except Py4JError, e: + # TODO: use --jar once it also work on driver + if not e.message or 'call a package' in e.message: + print "No Mqtt package, please put the assembly jar into classpath:" + print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \ + "scala-*/spark-streaming-mqtt-assembly-*.jar" + raise e + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) + stream = DStream(jstream, ssc, ser) + return stream From 3aa7fffdd6ce15cc0b1c2625d7fa92f04543e8fb Mon Sep 17 00:00:00 2001 From: prabs Date: Wed, 4 Feb 2015 01:18:17 +0530 Subject: [PATCH 02/21] Added Python streaming mqtt word count example --- .../main/python/streaming/mqtt_wordcount.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 examples/src/main/python/streaming/mqtt_wordcount.py diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py new file mode 100644 index 0000000000000..524d177a74e34 --- /dev/null +++ b/examples/src/main/python/streaming/mqtt_wordcount.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ A sample wordcount with MqttStream stream
+ Usage: mqtt_wordcount.py <broker url> <topic>
+ To work with Mqtt, Mqtt Message broker/server required.
+ Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
+ In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
+ Run Mqtt publisher as
+    `$ bin/run-example \
+      org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
+ and then run the example as
+    `$ bin/spark-submit --driver-class-path external/mqtt-assembly/target/scala-*/\
+      spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
+      tcp://localhost:1883 foo`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.mqtt import MQTTUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingMQTTWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    broker_url = sys.argv[1]
+    topic = sys.argv[2]
+
+    data = MQTTUtils.createStream(ssc, topic, broker_url)
+    lines = data.map(lambda x: x[1])
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()
From b34c3c117fbec0d5cbdca32037c72ee4c8fb7110 Mon Sep 17 00:00:00 2001
From: prabs
Date: Fri, 6 Feb 2015 01:42:28 +0530
Subject: [PATCH 03/21] address comments
---
 examples/src/main/python/streaming/mqtt_wordcount.py | 5 ++---
 python/pyspark/streaming/mqtt.py | 8 +++-----
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py
index 524d177a74e34..3c5a73d25c2e3 100644
--- a/examples/src/main/python/streaming/mqtt_wordcount.py
+++ b/examples/src/main/python/streaming/mqtt_wordcount.py
@@ -44,11 +44,10 @@
     sc = SparkContext(appName="PythonStreamingMQTTWordCount")
     ssc = StreamingContext(sc, 1)

-    broker_url = sys.argv[1]
+    brokerUrl = sys.argv[1]
     topic = sys.argv[2]

-    data = MQTTUtils.createStream(ssc, topic, broker_url)
-    lines = data.map(lambda x: x[1])
+    lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
     counts = lines.flatMap(lambda line: line.split(" ")) \
         .map(lambda word: (word, 1)) \
         .reduceByKey(lambda a, b: a+b)
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 423869bffdf74..d87e9fcbfd67c 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -19,7 +19,7 @@
 from py4j.java_gateway import java_import, Py4JError

 from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import PairDeserializer, NoOpSerializer
+from pyspark.serializers import UTF8Deserializer
 from pyspark.streaming import DStream

 __all__ = ['MQTTUtils']

@@ -28,7 +28,7 @@ class MQTTUtils(object):

     @staticmethod
-    def createStream(ssc, brokerUrl, topic
+    def createStream(ssc, brokerUrl, topic,
                      storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
         """
         Create an input stream that pulls messages from a Mqtt Broker.
@@ -52,6 +52,4 @@ def createStream(ssc, brokerUrl, topic print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \ "scala-*/spark-streaming-mqtt-assembly-*.jar" raise e - ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) - stream = DStream(jstream, ssc, ser) - return stream + return DStream(jstream, ssc, UTF8Deserializer()) From 3f4df12d28ca8db7d010e5897591b1a553f2de43 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Sun, 29 Mar 2015 22:24:05 +0530 Subject: [PATCH 04/21] updated version --- external/mqtt-assembly/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 01d0c2faed93a..5ba105e6f36c7 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -20,8 +20,8 @@ 4.0.0 org.apache.spark - spark-parent - 1.3.0-SNAPSHOT + spark-parent_2.10 + 1.4.0-SNAPSHOT ../../pom.xml From ee387ae4ab190d24ba8ad4d36eeeb7cf957b89ab Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Wed, 22 Apr 2015 14:39:59 +0400 Subject: [PATCH 05/21] Fix assembly jar location of mqtt-assembly --- external/mqtt-assembly/pom.xml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 5ba105e6f36c7..33f7f08c9cbdc 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -33,9 +33,6 @@ streaming-mqtt-assembly - scala-${scala.binary.version} - spark-streaming-mqtt-assembly-${project.version}.jar - ${project.build.directory}/${spark.jar.dir}/${spark.jar.basename} @@ -61,7 +58,6 @@ maven-shade-plugin false - ${spark.jar} *:* From 795ec275b50894c25f10c11e5e37f4b96c35062d Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Tue, 23 Jun 2015 23:20:32 +0400 Subject: [PATCH 06/21] address comments --- .../main/python/streaming/mqtt_wordcount.py | 2 +- external/mqtt-assembly/pom.xml | 4 +-- python/pyspark/streaming/mqtt.py | 28 +++++++++++++++---- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py index 3c5a73d25c2e3..0fb585e58fbe4 100644 --- a/examples/src/main/python/streaming/mqtt_wordcount.py +++ b/examples/src/main/python/streaming/mqtt_wordcount.py @@ -25,7 +25,7 @@ `$ bin/run-example \ org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` and then run the example as - `$ bin/spark-submit --driver-class-path external/mqtt-assembly/target/scala-*/\ + `$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\ spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \ tcp://localhost:1883 foo` """ diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 33f7f08c9cbdc..7c5ba7051ac15 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -21,14 +21,14 @@ org.apache.spark spark-parent_2.10 - 1.4.0-SNAPSHOT + 1.5.0-SNAPSHOT ../../pom.xml org.apache.spark spark-streaming-mqtt-assembly_2.10 jar - Spark Project External Kafka Assembly + Spark Project External MQTT Assembly http://spark.apache.org/ diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index d87e9fcbfd67c..558965a70b5a5 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -44,12 +44,28 @@ def createStream(ssc, brokerUrl, topic, try: jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel) - except Py4JError, e: - # TODO: use 
--jar once it also work on driver - if not e.message or 'call a package' in e.message: - print "No Mqtt package, please put the assembly jar into classpath:" - print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \ - "scala-*/spark-streaming-mqtt-assembly-*.jar" + if 'ClassNotFoundException' in str(e.java_exception): + MQTTUtils._printErrorMsg(ssc.sparkContext) raise e return DStream(jstream, ssc, UTF8Deserializer()) + + @staticmethod + def _printErrorMsg(sc): + print(""" +________________________________________________________________________________________________ + + Spark Streaming's MQTT libraries not found in class path. Try one of the following. + + 1. Include the MQTT library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s. + Then, include the jar in the spark-submit command as + + $ bin/spark-submit --jars ... +________________________________________________________________________________________________ +""" % (sc.version, sc.version)) From a11968b2277a18bc384d44ff17631463aa56eb13 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Mon, 29 Jun 2015 20:31:45 +0400 Subject: [PATCH 07/21] fixed python style --- python/pyspark/streaming/mqtt.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index 558965a70b5a5..9dee0f74589a4 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -16,7 +16,7 @@ # from py4j.java_collections import MapConverter -from py4j.java_gateway import java_import, Py4JError +from py4j.java_gateway import java_import, Py4JJavaError from pyspark.storagelevel import StorageLevel from pyspark.serializers import UTF8Deserializer @@ -44,15 +44,16 @@ def createStream(ssc, brokerUrl, topic, try: jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel) - except Py4JError, e: - if 'ClassNotFoundException' in str(e.java_exception): + except Py4JJavaError as e: + if 'ClassNotFoundException' in str(e.java_exception): MQTTUtils._printErrorMsg(ssc.sparkContext) raise e + return DStream(jstream, ssc, UTF8Deserializer()) - - @staticmethod - def _printErrorMsg(sc): - print(""" + + @staticmethod + def _printErrorMsg(sc): + print(""" ________________________________________________________________________________________________ Spark Streaming's MQTT libraries not found in class path. Try one of the following. 
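A minimal usage sketch of the Python API as it stands after the two patches above. This is illustrative only: the broker URL tcp://localhost:1883 and the topic foo are assumptions, and it presumes a reachable MQTT broker plus the spark-streaming-mqtt assembly jar on the classpath, as the docstrings in the series describe.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils

    sc = SparkContext(appName="MQTTUsageSketch")  # app name is illustrative
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # Each element of the returned DStream is one MQTT message payload,
    # decoded as a UTF-8 string (UTF8Deserializer, introduced in PATCH 03).
    lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "foo")
    lines.count().pprint()  # print the number of messages received per batch

    ssc.start()
    ssc.awaitTermination()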
From 9767d82b46037aeb9d8af8e825a1e6a237ca5673 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Mon, 6 Jul 2015 19:40:03 +0400 Subject: [PATCH 08/21] implemented Python-friendly class --- .../spark/streaming/mqtt/MQTTUtils.scala | 22 +++++++++++++++++++ python/pyspark/streaming/mqtt.py | 10 ++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 1142d0f56ba34..de8f5650fbe55 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt import scala.reflect.ClassTag +import org.apache.spark.api.java.function.Function import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} @@ -74,3 +75,24 @@ object MQTTUtils { createStream(jssc.ssc, brokerUrl, topic, storageLevel) } } + +/** + * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and + * function so that it can be easily instantiated and called from Python's MQTTUtils. + */ +private class MQTTUtilsPythonHelper { + + def createStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ): JavaDStream[Array[Byte]] = { + val dstream = MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel) + dstream.map(new Function[String, Array[Byte]] { + override def call(data: String): Array[Byte] = { + data.getBytes("UTF-8") + } + }) + } +} diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index 9dee0f74589a4..f06598971c548 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -15,8 +15,7 @@ # limitations under the License. # -from py4j.java_collections import MapConverter -from py4j.java_gateway import java_import, Py4JJavaError +from py4j.java_gateway import Py4JJavaError from pyspark.storagelevel import StorageLevel from pyspark.serializers import UTF8Deserializer @@ -38,12 +37,13 @@ def createStream(ssc, brokerUrl, topic, :param storageLevel: RDD storage level. 
:return: A DStream object """ - java_import(ssc._jvm, "org.apache.spark.streaming.mqtt.MQTTUtils") - jlevel = ssc._sc._getJavaStorageLevel(storageLevel) try: - jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel) + helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper") + helper = helperClass.newInstance() + jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel) except Py4JJavaError as e: if 'ClassNotFoundException' in str(e.java_exception): MQTTUtils._printErrorMsg(ssc.sparkContext) From a5a8f9f6e900a9efe683be50d25979ed4867e311 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 10 Jul 2015 17:29:24 +0400 Subject: [PATCH 09/21] added Python test --- dev/run-tests.py | 3 +- dev/sparktestsupport/modules.py | 3 +- docs/streaming-programming-guide.md | 2 +- external/mqtt/pom.xml | 1 - .../spark/streaming/mqtt/MQTTTestUtils.scala | 124 +++++++++++++++++ .../streaming/mqtt/MQTTStreamSuite.scala | 130 +++--------------- python/pyspark/streaming/tests.py | 67 ++++++++- 7 files changed, 216 insertions(+), 114 deletions(-) create mode 100644 external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala diff --git a/dev/run-tests.py b/dev/run-tests.py index 1f0d218514f92..90535fd3b7b03 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -294,7 +294,8 @@ def build_spark_sbt(hadoop_version): sbt_goals = ["package", "assembly/assembly", "streaming-kafka-assembly/assembly", - "streaming-flume-assembly/assembly"] + "streaming-flume-assembly/assembly", + "streaming-mqtt-assembly/assembly"] profiles_and_goals = build_profiles + sbt_goals print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ", diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 993583e2f4119..45a03c7ea7447 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -170,6 +170,7 @@ def contains_file(self, filename): dependencies=[streaming], source_file_regexes=[ "external/mqtt", + "external/mqtt-assembly", ], sbt_test_goals=[ "streaming-mqtt/test", @@ -290,7 +291,7 @@ def contains_file(self, filename): pyspark_streaming = Module( name="pyspark-streaming", - dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly], + dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly, streaming_mqtt], source_file_regexes=[ "python/pyspark/streaming" ], diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index e72d5580dae55..33d835ba1c381 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea {:.no_toc} Python API As of Spark {{site.SPARK_VERSION_SHORT}}, -out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future. +out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future. This category of sources require interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). 
Hence, to minimize issues related to version conflicts diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 0e41e5781784b..a28dd3603503a 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -72,7 +72,6 @@ org.apache.activemq activemq-core 5.7.0 - test diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala new file mode 100644 index 0000000000000..555c26aad1811 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import java.net.{ServerSocket, URI} +import java.util.concurrent.{TimeUnit, CountDownLatch} + +import scala.language.postfixOps + +import org.apache.activemq.broker.{BrokerService, TransportConnector} +import org.apache.commons.lang3.RandomUtils +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence + +import org.apache.spark.streaming.{StreamingContext, Milliseconds} +import org.apache.spark.streaming.scheduler.StreamingListener +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted +import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkConf} + +/** + * Share codes for Scala and Python unit tests + */ +private class MQTTTestUtils extends Logging { + + private val persistenceDir = Utils.createTempDir() + private val brokerHost = "localhost" + private var brokerPort = findFreePort() + + private var broker: BrokerService = _ + private var connector: TransportConnector = _ + + def brokerUri: String = { + s"$brokerHost:$brokerPort" + } + + def setup(): Unit = { + broker = new BrokerService() + broker.setDataDirectoryFile(Utils.createTempDir()) + connector = new TransportConnector() + connector.setName("mqtt") + connector.setUri(new URI("mqtt://" + brokerUri)) + broker.addConnector(connector) + broker.start() + } + + def teardown(): Unit = { + if (broker != null) { + broker.stop() + broker = null + } + if (connector != null) { + connector.stop() + connector = null + } + } + + private def findFreePort(): Int = { + val candidatePort = RandomUtils.nextInt(1024, 65536) + Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) + }, new SparkConf())._2 + } + + def publishData(topic: String, data: String): Unit = { + var client: MqttClient = null + try { + val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if 
(client.isConnected) { + val msgTopic = client.getTopic(topic) + val message = new MqttMessage(data.getBytes("utf-8")) + message.setQos(1) + message.setRetained(true) + + for (i <- 0 to 10) { + try { + msgTopic.publish(message) + } catch { + case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => + // wait for Spark streaming to consume something from the message queue + Thread.sleep(50) + } + } + } + } finally { + client.disconnect() + client.close() + client = null + } + } + + /** + * Block until at least one receiver has started or timeout occurs. + */ + def waitForReceiverToStart(ssc: StreamingContext) = { + val latch = new CountDownLatch(1) + ssc.addStreamingListener(new StreamingListener { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { + latch.countDown() + } + }) + + assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") + } +} diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index c4bf5aa7869bb..4d352cba96a3a 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,61 +17,45 @@ package org.apache.spark.streaming.mqtt -import java.net.{URI, ServerSocket} -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - import scala.concurrent.duration._ import scala.language.postfixOps -import org.apache.activemq.broker.{TransportConnector, BrokerService} -import org.apache.commons.lang3.RandomUtils -import org.eclipse.paho.client.mqttv3._ -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence - -import org.scalatest.BeforeAndAfter +import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.scheduler.StreamingListener -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.util.Utils - -class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter { +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Milliseconds, StreamingContext} - private val batchDuration = Milliseconds(500) - private val master = "local[2]" - private val framework = this.getClass.getSimpleName - private val freePort = findFreePort() - private val brokerUri = "//localhost:" + freePort - private val topic = "def" - private val persistenceDir = Utils.createTempDir() +class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll { + private val topic = "topic" private var ssc: StreamingContext = _ - private var broker: BrokerService = _ - private var connector: TransportConnector = _ + private var MQTTTestUtils: MQTTTestUtils = _ - before { - ssc = new StreamingContext(master, framework, batchDuration) - setupMQTT() + override def beforeAll(): Unit = { + MQTTTestUtils = new MQTTTestUtils + MQTTTestUtils.setup() } - after { + override def afterAll(): Unit = { if (ssc != null) { ssc.stop() ssc = null } - Utils.deleteRecursively(persistenceDir) - tearDownMQTT() + + if (MQTTTestUtils != null) { + MQTTTestUtils.teardown() + 
MQTTTestUtils = null + } } test("mqtt input stream") { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) + ssc = new StreamingContext(sparkConf, Milliseconds(500)) val sendMessage = "MQTT demo for spark streaming" val receiveStream = - MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY) + MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY) @volatile var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => if (rdd.collect.length > 0) { @@ -83,85 +67,13 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter // wait for the receiver to start before publishing data, or we risk failing // the test nondeterministically. See SPARK-4631 - waitForReceiverToStart() + MQTTTestUtils.waitForReceiverToStart(ssc) + + MQTTTestUtils.publishData(topic, sendMessage) - publishData(sendMessage) eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { assert(sendMessage.equals(receiveMessage(0))) } ssc.stop() } - - private def setupMQTT() { - broker = new BrokerService() - broker.setDataDirectoryFile(Utils.createTempDir()) - connector = new TransportConnector() - connector.setName("mqtt") - connector.setUri(new URI("mqtt:" + brokerUri)) - broker.addConnector(connector) - broker.start() - } - - private def tearDownMQTT() { - if (broker != null) { - broker.stop() - broker = null - } - if (connector != null) { - connector.stop() - connector = null - } - } - - private def findFreePort(): Int = { - val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { - val socket = new ServerSocket(trialPort) - socket.close() - (null, trialPort) - }, new SparkConf())._2 - } - - def publishData(data: String): Unit = { - var client: MqttClient = null - try { - val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) - client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence) - client.connect() - if (client.isConnected) { - val msgTopic = client.getTopic(topic) - val message = new MqttMessage(data.getBytes("utf-8")) - message.setQos(1) - message.setRetained(true) - - for (i <- 0 to 10) { - try { - msgTopic.publish(message) - } catch { - case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => - // wait for Spark streaming to consume something from the message queue - Thread.sleep(50) - } - } - } - } finally { - client.disconnect() - client.close() - client = null - } - } - - /** - * Block until at least one receiver has started or timeout occurs. 
- */ - private def waitForReceiverToStart() = { - val latch = new CountDownLatch(1) - ssc.addStreamingListener(new StreamingListener { - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - latch.countDown() - } - }) - - assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") - } } diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 188c8ff12067e..3012cd1e1a1b7 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -39,6 +39,7 @@ from pyspark.streaming.context import StreamingContext from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition from pyspark.streaming.flume import FlumeUtils +from pyspark.streaming.mqtt import MQTTUtils class PySparkStreamingTestCase(unittest.TestCase): @@ -826,6 +827,52 @@ def test_flume_polling(self): def test_flume_polling_multiple_hosts(self): self._testMultipleTimes(self._testFlumePollingMultipleHosts) +class MQTTStreamTests(PySparkStreamingTestCase): + timeout = 20 # seconds + duration = 1 + + def setUp(self): + super(MQTTStreamTests, self).setUp() + + utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils") + self._utils = utilsClz.newInstance() + self._MQTTTestUtils.setup() + + def tearDown(self): + if self._MQTTTestUtils is not None: + self._MQTTTestUtils.teardown() + self._MQTTTestUtils = None + + super(MQTTStreamTests, self).tearDown() + + def _randomTopic(self): + return "topic-%d" % random.randint(0, 10000) + + def _validateStreamResult(self, sendData, stream): + result = [] + + def get_output(_, rdd): + for data in rdd.collect(): + result.append(data) + + dstream.foreachRDD(get_output) + receiveData = ' '.join(result[0]) + self.assertEqual(sendData, receiveData) + + def test_mqtt_stream(self): + """Test the Python Kafka stream API.""" + topic = self._randomTopic() + sendData = "MQTT demo for spark streaming" + ssc = self.ssc + + self._MQTTTestUtils.createTopic(topic) + self._MQTTTestUtils.waitForReceiverToStart(ssc) + self._MQTTTestUtils.publishData(topic, sendData) + + stream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic) + self._validateStreamResult(sendData, stream) + def search_kafka_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] @@ -862,10 +909,28 @@ def search_flume_assembly_jar(): else: return jars[0] +def search_mqtt_assembly_jar(): + SPARK_HOME = os.environ["SPARK_HOME"] + mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly") + jars = glob.glob( + os.path.join(mqtt_assembly_dir, "target/scala-*/spark-streaming-mqtt-assembly-*.jar")) + if not jars: + raise Exception( + ("Failed to find Spark Streaming MQTT assembly jar in %s. 
" % mqtt_assembly_dir) + + "You need to build Spark with " + "'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or " + "'build/mvn package' before running this test") + elif len(jars) > 1: + raise Exception(("Found multiple Spark Streaming MQTT assembly JARs in %s; please " + "remove all but one") % mqtt_assembly_dir) + else: + return jars[0] + if __name__ == "__main__": kafka_assembly_jar = search_kafka_assembly_jar() flume_assembly_jar = search_flume_assembly_jar() - jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar) + mqtt_assembly_jar = search_mqtt_assembly_jar() + jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar) os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars unittest.main() From e1ee0163a7a09c5e64baea2c549126ca2b451f56 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Sat, 11 Jul 2015 10:04:26 +0400 Subject: [PATCH 10/21] scala style fix --- .../org/apache/spark/streaming/mqtt/MQTTTestUtils.scala | 2 +- .../org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 555c26aad1811..e5036fbc6d626 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -111,7 +111,7 @@ private class MQTTTestUtils extends Logging { /** * Block until at least one receiver has started or timeout occurs. */ - def waitForReceiverToStart(ssc: StreamingContext) = { + def waitForReceiverToStart(ssc: StreamingContext) : Unit = { val latch = new CountDownLatch(1) ssc.addStreamingListener(new StreamingListener { override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 4d352cba96a3a..d406749c9b4f1 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -54,8 +54,10 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) ssc = new StreamingContext(sparkConf, Milliseconds(500)) val sendMessage = "MQTT demo for spark streaming" - val receiveStream = - MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY) + + val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, + StorageLevel.MEMORY_ONLY) + @volatile var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => if (rdd.collect.length > 0) { From 1f0cfe9617128cf091fdc4afefd7839c3acbaa47 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Sat, 11 Jul 2015 10:26:40 +0400 Subject: [PATCH 11/21] python style fix --- dev/sparktestsupport/modules.py | 8 +++++++- python/pyspark/streaming/tests.py | 9 ++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 45a03c7ea7447..b283753f2dfd7 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -291,7 +291,13 @@ def contains_file(self, 
filename): pyspark_streaming = Module( name="pyspark-streaming", - dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly, streaming_mqtt], + dependencies=[ + pyspark_core, + streaming, + streaming_kafka, + streaming_flume_assembly, + streaming_mqtt + ], source_file_regexes=[ "python/pyspark/streaming" ], diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 3012cd1e1a1b7..a4324a748ae32 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -827,6 +827,7 @@ def test_flume_polling(self): def test_flume_polling_multiple_hosts(self): self._testMultipleTimes(self._testFlumePollingMultipleHosts) + class MQTTStreamTests(PySparkStreamingTestCase): timeout = 20 # seconds duration = 1 @@ -841,8 +842,8 @@ def setUp(self): def tearDown(self): if self._MQTTTestUtils is not None: - self._MQTTTestUtils.teardown() - self._MQTTTestUtils = None + self._MQTTTestUtils.teardown() + self._MQTTTestUtils = None super(MQTTStreamTests, self).tearDown() @@ -905,10 +906,11 @@ def search_flume_assembly_jar(): "'build/mvn package' before running this test") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please " - "remove all but one") % flume_assembly_dir) + "remove all but one") % flume_assembly_dir) else: return jars[0] + def search_mqtt_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly") @@ -926,6 +928,7 @@ def search_mqtt_assembly_jar(): else: return jars[0] + if __name__ == "__main__": kafka_assembly_jar = search_kafka_assembly_jar() flume_assembly_jar = search_flume_assembly_jar() From 80474d1893e9296827be44879d7205b6c738ded4 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Sat, 11 Jul 2015 12:51:50 +0400 Subject: [PATCH 12/21] fix --- python/pyspark/streaming/tests.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a4324a748ae32..970594c7fa10c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -835,9 +835,9 @@ class MQTTStreamTests(PySparkStreamingTestCase): def setUp(self): super(MQTTStreamTests, self).setUp() - utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils") - self._utils = utilsClz.newInstance() + self._MQTTTestUtils = MQTTTestUtilsClz.newInstance() self._MQTTTestUtils.setup() def tearDown(self): @@ -850,7 +850,7 @@ def tearDown(self): def _randomTopic(self): return "topic-%d" % random.randint(0, 10000) - def _validateStreamResult(self, sendData, stream): + def _validateStreamResult(self, sendData, dstream): result = [] def get_output(_, rdd): @@ -862,16 +862,15 @@ def get_output(_, rdd): self.assertEqual(sendData, receiveData) def test_mqtt_stream(self): - """Test the Python Kafka stream API.""" + """Test the Python MQTT stream API.""" topic = self._randomTopic() sendData = "MQTT demo for spark streaming" ssc = self.ssc - self._MQTTTestUtils.createTopic(topic) self._MQTTTestUtils.waitForReceiverToStart(ssc) self._MQTTTestUtils.publishData(topic, sendData) - stream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic) + stream = MQTTUtils.createStream(ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic) self._validateStreamResult(sendData, 
stream) From 97244ec63937f6c23990f73b054af3d450344bb8 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 23 Jul 2015 02:09:25 +0800 Subject: [PATCH 13/21] Make sbt build the assembly test jar for streaming mqtt --- project/SparkBuild.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7555aa5e3861e..454678e5fbc28 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -177,6 +177,9 @@ object SparkBuild extends PomBuild { /* Enable Assembly for all assembly projects */ assemblyProjects.foreach(enable(Assembly.settings)) + /* Enable Assembly for streamingMqtt test */ + enable(inConfig(Test)(Assembly.settings))(streamingMqtt) + /* Package pyspark artifacts in a separate zip file for YARN. */ enable(PySparkAssembly.settings)(assembly) @@ -354,6 +357,9 @@ object Assembly { s"${mName}-${v}-hadoop${hv}.jar" } }, + jarName in (Test, assembly) <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) => + s"${mName}-test-${v}.jar" + }, mergeStrategy in assembly := { case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard From 87fc6771586aaf025c2810b5fa3a160a9773c4d2 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Thu, 23 Jul 2015 14:00:14 +0400 Subject: [PATCH 14/21] address the comments: keep the whole MQTTTestUtils in test and then link to test jar from python fix issue under Maven build return JavaDStream[String] directly. --- dev/run-tests.py | 3 +- external/mqtt-assembly/pom.xml | 1 + external/mqtt/pom.xml | 1 + .../spark/streaming/mqtt/MQTTUtils.scala | 9 +-- .../spark/streaming/mqtt/MQTTTestUtils.scala | 6 +- python/pyspark/streaming/tests.py | 59 +++++++++++++++---- 6 files changed, 55 insertions(+), 24 deletions(-) rename external/mqtt/src/{main => test}/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala (95%) diff --git a/dev/run-tests.py b/dev/run-tests.py index 90535fd3b7b03..237fb76c9b3d9 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -295,7 +295,8 @@ def build_spark_sbt(hadoop_version): "assembly/assembly", "streaming-kafka-assembly/assembly", "streaming-flume-assembly/assembly", - "streaming-mqtt-assembly/assembly"] + "streaming-mqtt-assembly/assembly", + "streaming-mqtt/test:assembly"] profiles_and_goals = build_profiles + sbt_goals print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ", diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 7c5ba7051ac15..e216a9676abcc 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -58,6 +58,7 @@ maven-shade-plugin false + ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar *:* diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index a28dd3603503a..0e41e5781784b 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -72,6 +72,7 @@ org.apache.activemq activemq-core 5.7.0 + test diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index de8f5650fbe55..22dabb36efa11 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -87,12 +87,7 @@ private class MQTTUtilsPythonHelper { brokerUrl: String, topic: String, storageLevel: StorageLevel - ): JavaDStream[Array[Byte]] = { - 
val dstream = MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel) - dstream.map(new Function[String, Array[Byte]] { - override def call(data: String): Array[Byte] = { - data.getBytes("UTF-8") - } - }) + ): JavaDStream[String] = { + MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala similarity index 95% rename from external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala rename to external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index e5036fbc6d626..34e81b3f0f84f 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.mqtt import java.net.{ServerSocket, URI} -import java.util.concurrent.{TimeUnit, CountDownLatch} +import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.language.postfixOps @@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomUtils import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -import org.apache.spark.streaming.{StreamingContext, Milliseconds} +import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.util.Utils @@ -40,7 +40,7 @@ private class MQTTTestUtils extends Logging { private val persistenceDir = Utils.createTempDir() private val brokerHost = "localhost" - private var brokerPort = findFreePort() + private val brokerPort = findFreePort() private var broker: BrokerService = _ private var connector: TransportConnector = _ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 970594c7fa10c..d3592f0a1f7c2 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -850,28 +850,43 @@ def tearDown(self): def _randomTopic(self): return "topic-%d" % random.randint(0, 10000) - def _validateStreamResult(self, sendData, dstream): + def _startContext(self, topic): + # Start the StreamingContext and also collect the result + stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic) result = [] - def get_output(_, rdd): + def getOutput(_, rdd): for data in rdd.collect(): result.append(data) - dstream.foreachRDD(get_output) - receiveData = ' '.join(result[0]) + stream.foreachRDD(getOutput) + self.ssc.start() + return result + + def _publishData(self, topic, data): + start_time = time.time() + while True: + try: + self._MQTTTestUtils.publishData(topic, data) + break + except: + if time.time() - start_time < self.timeout: + time.sleep(0.01) + else: + raise + + def _validateStreamResult(self, sendData, result): + receiveData = ''.join(result[0]) self.assertEqual(sendData, receiveData) def test_mqtt_stream(self): """Test the Python MQTT stream API.""" - topic = self._randomTopic() sendData = "MQTT demo for spark streaming" - ssc = self.ssc - - self._MQTTTestUtils.waitForReceiverToStart(ssc) - self._MQTTTestUtils.publishData(topic, sendData) - - stream = MQTTUtils.createStream(ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic) - self._validateStreamResult(sendData, stream) + topic = self._randomTopic() + result = 
self._startContext(topic) + self._publishData(topic, sendData) + self.wait_for(result, len(sendData)) + self._validateStreamResult(sendData, result) def search_kafka_assembly_jar(): @@ -928,11 +943,29 @@ def search_mqtt_assembly_jar(): return jars[0] +def search_mqtt_test_jar(): + SPARK_HOME = os.environ["SPARK_HOME"] + mqtt_test_dir = os.path.join(SPARK_HOME, "external/mqtt") + jars = glob.glob( + os.path.join(mqtt_test_dir, "target/scala-*/spark-streaming-mqtt-test-*.jar")) + if not jars: + raise Exception( + ("Failed to find Spark Streaming MQTT test jar in %s. " % mqtt_test_dir) + + "You need to build Spark with " + "'build/sbt assembly/assembly streaming-mqtt/test:assembly'") + elif len(jars) > 1: + raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please " + "remove all but one") % mqtt_test_dir) + else: + return jars[0] + if __name__ == "__main__": kafka_assembly_jar = search_kafka_assembly_jar() flume_assembly_jar = search_flume_assembly_jar() mqtt_assembly_jar = search_mqtt_assembly_jar() - jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar) + mqtt_test_jar = search_mqtt_test_jar() + jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, + mqtt_assembly_jar, mqtt_test_jar) os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars unittest.main() From a6747cb58f504bd681162bba67e0679b33b42915 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 24 Jul 2015 16:20:46 +0400 Subject: [PATCH 15/21] wait for starting the receiver before publishing data --- .../org/apache/spark/streaming/mqtt/MQTTUtils.scala | 1 - .../apache/spark/streaming/mqtt/MQTTTestUtils.scala | 12 ++++++++++++ python/pyspark/streaming/tests.py | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 22dabb36efa11..38a1114863d15 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt import scala.reflect.ClassTag -import org.apache.spark.api.java.function.Function import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 34e81b3f0f84f..e48760a0a08f4 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -28,6 +28,7 @@ import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaStreamingContext import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.util.Utils @@ -121,4 +122,15 @@ private class MQTTTestUtils extends Logging { assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") } + + def waitForReceiverToStart(jssc: JavaStreamingContext) : Unit = { + val latch = new 
CountDownLatch(1)
+    jssc.addStreamingListener(new StreamingListener {
+      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+        latch.countDown()
+      }
+    })
+
+    assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
+  }
 }
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index d3592f0a1f7c2..b27577f878618 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -884,6 +884,7 @@ def test_mqtt_stream(self):
         sendData = "MQTT demo for spark streaming"
         topic = self._randomTopic()
         result = self._startContext(topic)
+        self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc)
         self._publishData(topic, sendData)
         self.wait_for(result, len(sendData))
         self._validateStreamResult(sendData, result)
From d07f45490a9554c1e3ea9c76956f383a511a2e1f Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 26 Jul 2015 16:07:04 +0800
Subject: [PATCH 16/21] Register StreamingListener before starting StreamingContext; Revert unnecessary changes; fix the python unit test
---
 .../streaming/mqtt/MQTTStreamSuite.scala | 22 +++++----
 .../spark/streaming/mqtt/MQTTTestUtils.scala | 45 ++++++++++++-------
 python/pyspark/streaming/tests.py | 25 +++--------
 3 files changed, 49 insertions(+), 43 deletions(-)

diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index d406749c9b4f1..98821c71c7aee 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -20,30 +20,34 @@ package org.apache.spark.streaming.mqtt
 import scala.concurrent.duration._
 import scala.language.postfixOps

-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}

-class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
+class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+
+  private val batchDuration = Milliseconds(500)
+  private val master = "local[2]"
+  private val framework = this.getClass.getSimpleName
+  private val topic = "def"

-  private val topic = "topic"
   private var ssc: StreamingContext = _
   private var MQTTTestUtils: MQTTTestUtils = _

-  override def beforeAll(): Unit = {
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
     MQTTTestUtils = new MQTTTestUtils
     MQTTTestUtils.setup()
   }

-  override def afterAll(): Unit = {
+  after {
     if (ssc != null) {
       ssc.stop()
       ssc = null
     }
-
     if (MQTTTestUtils != null) {
       MQTTTestUtils.teardown()
       MQTTTestUtils = null
@@ -51,10 +55,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA
   }

   test("mqtt input stream") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
     val sendMessage = "MQTT demo for spark streaming"
-
     val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic,
       StorageLevel.MEMORY_ONLY)

@@ -65,6 +66,9 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA
       receiveMessage
     }
   }
+
+  MQTTTestUtils.registerStreamingListener(ssc)
+
   ssc.start()

   // wait for the
receiver to start before publishing data, or we risk failing diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index e48760a0a08f4..6c85019ae0723 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.language.postfixOps +import com.google.common.base.Charsets.UTF_8 import org.apache.activemq.broker.{BrokerService, TransportConnector} import org.apache.commons.lang3.RandomUtils import org.eclipse.paho.client.mqttv3._ @@ -46,6 +47,8 @@ private class MQTTTestUtils extends Logging { private var broker: BrokerService = _ private var connector: TransportConnector = _ + private var receiverStartedLatch = new CountDownLatch(1) + def brokerUri: String = { s"$brokerHost:$brokerPort" } @@ -69,6 +72,8 @@ private class MQTTTestUtils extends Logging { connector.stop() connector = null } + Utils.deleteRecursively(persistenceDir) + receiverStartedLatch = null } private def findFreePort(): Int = { @@ -88,7 +93,7 @@ private class MQTTTestUtils extends Logging { client.connect() if (client.isConnected) { val msgTopic = client.getTopic(topic) - val message = new MqttMessage(data.getBytes("utf-8")) + val message = new MqttMessage(data.getBytes(UTF_8)) message.setQos(1) message.setRetained(true) @@ -110,27 +115,37 @@ private class MQTTTestUtils extends Logging { } /** - * Block until at least one receiver has started or timeout occurs. + * Call this one before starting StreamingContext so that we won't miss the + * StreamingListenerReceiverStarted event. */ - def waitForReceiverToStart(ssc: StreamingContext) : Unit = { - val latch = new CountDownLatch(1) + def registerStreamingListener(jssc: JavaStreamingContext): Unit = { + registerStreamingListener(jssc.ssc) + } + + /** + * Call this one before starting StreamingContext so that we won't miss the + * StreamingListenerReceiverStarted event. + */ + def registerStreamingListener(ssc: StreamingContext): Unit = { ssc.addStreamingListener(new StreamingListener { override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - latch.countDown() + receiverStartedLatch.countDown() } }) - - assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") } - def waitForReceiverToStart(jssc: JavaStreamingContext) : Unit = { - val latch = new CountDownLatch(1) - jssc.addStreamingListener(new StreamingListener { - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - latch.countDown() - } - }) + /** + * Block until at least one receiver has started or timeout occurs. + */ + def waitForReceiverToStart(jssc: JavaStreamingContext): Unit = { + waitForReceiverToStart(jssc.ssc) + } - assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") + /** + * Block until at least one receiver has started or timeout occurs. 
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index b27577f878618..77f9ccf0b114a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -863,31 +863,18 @@ def getOutput(_, rdd):
         self.ssc.start()
         return result
 
-    def _publishData(self, topic, data):
-        start_time = time.time()
-        while True:
-            try:
-                self._MQTTTestUtils.publishData(topic, data)
-                break
-            except:
-                if time.time() - start_time < self.timeout:
-                    time.sleep(0.01)
-                else:
-                    raise
-
-    def _validateStreamResult(self, sendData, result):
-        receiveData = ''.join(result[0])
-        self.assertEqual(sendData, receiveData)
-
     def test_mqtt_stream(self):
         """Test the Python MQTT stream API."""
         sendData = "MQTT demo for spark streaming"
         topic = self._randomTopic()
+        self._MQTTTestUtils.registerStreamingListener(self.ssc._jssc)
         result = self._startContext(topic)
         self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc)
-        self._publishData(topic, sendData)
-        self.wait_for(result, len(sendData))
-        self._validateStreamResult(sendData, result)
+        self._MQTTTestUtils.publishData(topic, sendData)
+        self.wait_for(result, 1)
+        # Because "publishData" sends duplicate messages, here we should use > 0
+        self.assertTrue(len(result) > 0)
+        self.assertEqual(sendData, result[0])
 
 
 def search_kafka_assembly_jar():
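
wait_for above is a helper defined elsewhere in tests.py; the gist is to poll the list that the
stream's output action appends to until it reaches the expected size or the suite's timeout
expires. Roughly (a simplified sketch, not the exact helper):

    import time

    def wait_for(result, n, timeout=10.0):
        # `result` is the shared list that the streaming output appends to.
        deadline = time.time() + timeout
        while len(result) < n and time.time() < deadline:
            time.sleep(0.01)
        return result
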

From 126608a02b55287684762811b0ade99dbce7d109 Mon Sep 17 00:00:00 2001
From: Prabeesh K
Date: Thu, 30 Jul 2015 14:02:24 +0400
Subject: [PATCH 17/21] Address the comments

---
 examples/src/main/python/streaming/mqtt_wordcount.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py
index 0fb585e58fbe4..617ce5ea6775e 100644
--- a/examples/src/main/python/streaming/mqtt_wordcount.py
+++ b/examples/src/main/python/streaming/mqtt_wordcount.py
@@ -18,13 +18,14 @@
 """
  A sample wordcount with MqttStream stream
  Usage: mqtt_wordcount.py <broker url> <topic>
-   To work with Mqtt, Mqtt Message broker/server required.
-   Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
-   In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
-   Run Mqtt publisher as
-   `$ bin/run-example \
-     org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
-   and then run the example as
+
+ To run this on your local machine, you need to set up an MQTT broker and a publisher first.
+ Mosquitto is one of the open source MQTT brokers; see
+ http://mosquitto.org/
+ The Eclipse Paho project provides a number of clients and utilities for working with MQTT; see
+ http://www.eclipse.org/paho/#getting-started
+
+ and then run the example
    `$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
      spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
      tcp://localhost:1883 foo`
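
Stripped of argument handling, the example this docstring describes amounts to a few lines of
PySpark. A condensed sketch, hard-coding the broker URL and topic from the usage string above:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils

    sc = SparkContext(appName="PythonStreamingMQTTWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # Each element of `lines` is the payload of one MQTT message.
    lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "foo")
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
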
From 5f8a1d435526a601d0c435359df2b42ccc4dbbb3 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Fri, 31 Jul 2015 16:19:14 +0800
Subject: [PATCH 18/21] Make the Maven build generate the test jar for Python
 MQTT tests

---
 external/mqtt/pom.xml                        | 28 +++++++++++++++
 external/mqtt/src/main/assembly/assembly.xml | 38 ++++++++++++++++++++
 2 files changed, 66 insertions(+)
 create mode 100644 external/mqtt/src/main/assembly/assembly.xml

diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 0e41e5781784b..69b309876a0db 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -78,5 +78,33 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-jar-with-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>spark-streaming-mqtt-test-${project.version}</finalName>
+              <outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
+              <appendAssemblyId>false</appendAssemblyId>
+
+              <attach>false</attach>
+
+              <descriptors>
+                <descriptor>src/main/assembly/assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
 </project>
diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
new file mode 100644
index 0000000000000..92318ec16dba8
--- /dev/null
+++ b/external/mqtt/src/main/assembly/assembly.xml
@@ -0,0 +1,38 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<assembly>
+  <id>test-jar-with-dependencies</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+      <scope>test</scope>
+      <useProjectArtifact>false</useProjectArtifact>
+      <excludes>
+        <exclude>org.apache.hadoop:*:jar</exclude>
+        <exclude>org.apache.spark:*:jar</exclude>
+        <exclude>org.apache.zookeeper:*:jar</exclude>
+        <exclude>org.apache.avro:*:jar</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+</assembly>

From 478f844f25b2388675836015af04b6be94de0d8e Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sat, 1 Aug 2015 00:44:51 +0800
Subject: [PATCH 19/21] Add unpack

---
 external/mqtt/src/main/assembly/assembly.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
index 92318ec16dba8..13747d65cbee5 100644
--- a/external/mqtt/src/main/assembly/assembly.xml
+++ b/external/mqtt/src/main/assembly/assembly.xml
@@ -26,6 +26,7 @@
       <useTransitiveDependencies>true</useTransitiveDependencies>
       <scope>test</scope>
       <useProjectArtifact>false</useProjectArtifact>
+      <unpack>true</unpack>
       <excludes>
         <exclude>org.apache.hadoop:*:jar</exclude>
         <exclude>org.apache.spark:*:jar</exclude>

From 47278c5a1a5364cbd327569050c133eb49098134 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sat, 1 Aug 2015 00:58:38 +0800
Subject: [PATCH 20/21] Include the project class files

---
 external/mqtt/src/main/assembly/assembly.xml | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
index 13747d65cbee5..ecab5b360eb3e 100644
--- a/external/mqtt/src/main/assembly/assembly.xml
+++ b/external/mqtt/src/main/assembly/assembly.xml
@@ -21,15 +21,20 @@
   </formats>
   <includeBaseDirectory>false</includeBaseDirectory>
 
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
+      <outputDirectory>/</outputDirectory>
+    </fileSet>
+  </fileSets>
+
   <dependencySets>
     <dependencySet>
      <useTransitiveDependencies>true</useTransitiveDependencies>
       <scope>test</scope>
-      <useProjectArtifact>false</useProjectArtifact>
       <unpack>true</unpack>
       <excludes>
         <exclude>org.apache.hadoop:*:jar</exclude>
-        <exclude>org.apache.spark:*:jar</exclude>
         <exclude>org.apache.zookeeper:*:jar</exclude>
         <exclude>org.apache.avro:*:jar</exclude>
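
With this assembly in place, both the Maven and sbt builds drop a
spark-streaming-mqtt-test-<version>.jar under external/mqtt/target/scala-<scala version>/. A
sketch of how a test runner might locate it before launching the Python suite (helper name and
error text are hypothetical, modeled on the existing Kafka assembly search in tests.py):

    import glob
    import os

    def search_mqtt_test_jar():
        # Find the test jar produced by the assembly above so it can be put
        # on the driver classpath when the MQTT tests run.
        spark_home = os.environ["SPARK_HOME"]
        pattern = os.path.join(
            spark_home, "external/mqtt/target/scala-*/spark-streaming-mqtt-test-*.jar")
        jars = glob.glob(pattern)
        if not jars:
            raise Exception("Failed to find the MQTT test jar; build the assembly first.")
        return jars[0]
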
From 935615c63479da81dab544325e2bc2641891bffd Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sat, 1 Aug 2015 02:00:48 +0800
Subject: [PATCH 21/21] Fix the flaky MQTT tests

---
 .../streaming/mqtt/MQTTStreamSuite.scala      | 10 +----
 .../spark/streaming/mqtt/MQTTTestUtils.scala  | 42 -------------------
 python/pyspark/streaming/tests.py             | 27 ++++++++----
 3 files changed, 22 insertions(+), 57 deletions(-)

diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 98821c71c7aee..879be775ea9f9 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -67,17 +67,11 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
       }
     }
 
-    MQTTTestUtils.registerStreamingListener(ssc)
-
     ssc.start()
 
-    // wait for the receiver to start before publishing data, or we risk failing
-    // the test nondeterministically. See SPARK-4631
-    MQTTTestUtils.waitForReceiverToStart(ssc)
-
-    MQTTTestUtils.publishData(topic, sendMessage)
-
+    // Retry it because we don't know when the receiver will start.
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      MQTTTestUtils.publishData(topic, sendMessage)
       assert(sendMessage.equals(receiveMessage(0)))
     }
     ssc.stop()
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
index 6c85019ae0723..47cc9af497778 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.mqtt
 
 import java.net.{ServerSocket, URI}
-import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.language.postfixOps
 
@@ -28,10 +27,6 @@ import org.apache.commons.lang3.RandomUtils
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.JavaStreamingContext
-import org.apache.spark.streaming.scheduler.StreamingListener
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SparkConf}
 
@@ -47,8 +42,6 @@ private class MQTTTestUtils extends Logging {
   private var broker: BrokerService = _
   private var connector: TransportConnector = _
 
-  private var receiverStartedLatch = new CountDownLatch(1)
-
   def brokerUri: String = {
     s"$brokerHost:$brokerPort"
   }
@@ -73,7 +66,6 @@ private class MQTTTestUtils extends Logging {
       connector = null
     }
     Utils.deleteRecursively(persistenceDir)
-    receiverStartedLatch = null
   }
 
   private def findFreePort(): Int = {
@@ -114,38 +106,4 @@ private class MQTTTestUtils extends Logging {
     }
   }
 
-  /**
-   * Call this before starting the StreamingContext so that we won't miss the
-   * StreamingListenerReceiverStarted event.
-   */
-  def registerStreamingListener(jssc: JavaStreamingContext): Unit = {
-    registerStreamingListener(jssc.ssc)
-  }
-
-  /**
-   * Call this before starting the StreamingContext so that we won't miss the
-   * StreamingListenerReceiverStarted event.
-   */
-  def registerStreamingListener(ssc: StreamingContext): Unit = {
-    ssc.addStreamingListener(new StreamingListener {
-      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
-        receiverStartedLatch.countDown()
-      }
-    })
-  }
-
-  /**
-   * Block until at least one receiver has started or timeout occurs.
-   */
-  def waitForReceiverToStart(jssc: JavaStreamingContext): Unit = {
-    waitForReceiverToStart(jssc.ssc)
-  }
-
-  /**
-   * Block until at least one receiver has started or timeout occurs.
-   */
-  def waitForReceiverToStart(ssc: StreamingContext): Unit = {
-    assert(
-      receiverStartedLatch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
-  }
 }
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 0da312b89b72f..2e37aa96c614f 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -931,14 +931,27 @@ def test_mqtt_stream(self):
         """Test the Python MQTT stream API."""
         sendData = "MQTT demo for spark streaming"
         topic = self._randomTopic()
-        self._MQTTTestUtils.registerStreamingListener(self.ssc._jssc)
         result = self._startContext(topic)
-        self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc)
-        self._MQTTTestUtils.publishData(topic, sendData)
-        self.wait_for(result, 1)
-        # Because "publishData" sends duplicate messages, here we should use > 0
-        self.assertTrue(len(result) > 0)
-        self.assertEqual(sendData, result[0])
+
+        def retry():
+            self._MQTTTestUtils.publishData(topic, sendData)
+            # Because "publishData" sends duplicate messages, here we should use > 0
+            self.assertTrue(len(result) > 0)
+            self.assertEqual(sendData, result[0])
+
+        # Retry it because we don't know when the receiver will start.
+        self._retry_or_timeout(retry)
+
+    def _retry_or_timeout(self, test_func):
+        start_time = time.time()
+        while True:
+            try:
+                test_func()
+                break
+            except:
+                if time.time() - start_time > self.timeout:
+                    raise
+                time.sleep(0.01)
 
 
 def search_kafka_assembly_jar():
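
_retry_or_timeout plays the same role as ScalaTest's eventually in the Scala suite: the publish
happens inside the retried block because there is no way to know when the receiver subscribes,
and a retained QoS-1 message may be delivered more than once. The pattern generalizes to any
flaky assertion; a standalone sketch (hypothetical helper, not part of the patch):

    import time

    def eventually(assertion, timeout=10.0, interval=0.01):
        """Re-run `assertion` until it stops raising, or fail after `timeout` seconds."""
        deadline = time.time() + timeout
        while True:
            try:
                return assertion()
            except AssertionError:
                if time.time() > deadline:
                    raise
                time.sleep(interval)

    # Usage: define a zero-argument function that asserts on the collected
    # output, e.g. `def check(): assert result and result[0] == sendData`,
    # then call eventually(check).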