[SPARK-5155] [PySpark] [Streaming] Mqtt streaming support in Python #4229

Status: Closed. This pull request wants to merge 25 commits; the changes shown below are from 17 of them.

Commits (25):
b7d42ff  Mqtt streaming support in Python (prabeesh, Feb 3, 2015)
3aa7fff  Added Python streaming mqtt word count example (prabeesh, Feb 3, 2015)
b34c3c1  adress comments (prabeesh, Feb 5, 2015)
3f4df12  updated version (prabeesh, Mar 29, 2015)
ee387ae  Fix assembly jar location of mqtt-assembly (prabeesh, Apr 22, 2015)
795ec27  address comments (prabeesh, Jun 23, 2015)
a11968b  fixed python style (prabeesh, Jun 29, 2015)
9767d82  implemented Python-friendly class (prabeesh, Jul 6, 2015)
a5a8f9f  added Python test (prabeesh, Jul 10, 2015)
e1ee016  scala style fix (prabeesh, Jul 11, 2015)
1f0cfe9  python style fix (prabeesh, Jul 11, 2015)
80474d1  fix (prabeesh, Jul 11, 2015)
97244ec  Make sbt build the assembly test jar for streaming mqtt (zsxwing, Jul 22, 2015)
87fc677  address the comments: (Jul 23, 2015)
a6747cb  wait for starting the receiver before publishing data (prabeesh, Jul 24, 2015)
d07f454  Register StreamingListerner before starting StreamingContext; Revert … (zsxwing, Jul 26, 2015)
b90b709  Merge pull request #1 from zsxwing/pr4229 (prabeesh, Jul 26, 2015)
126608a  address the comments (Jul 30, 2015)
734db99  Merge branch 'master' into pr4229 (zsxwing, Jul 31, 2015)
5f8a1d4  Make the maven build generate the test jar for Python MQTT tests (zsxwing, Jul 31, 2015)
478f844  Add unpack (zsxwing, Jul 31, 2015)
47278c5  Include the project class files (zsxwing, Jul 31, 2015)
935615c  Fix the flaky MQTT tests (zsxwing, Jul 31, 2015)
abf5f18  Merge branch 'master' into pr4229 (zsxwing, Aug 1, 2015)
03f3e88  Merge pull request #2 from zsxwing/pr4229 (prabeesh, Aug 2, 2015)
4 changes: 3 additions & 1 deletion dev/run-tests.py
@@ -294,7 +294,9 @@ def build_spark_sbt(hadoop_version):
     sbt_goals = ["package",
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
-                 "streaming-flume-assembly/assembly"]
+                 "streaming-flume-assembly/assembly",
+                 "streaming-mqtt-assembly/assembly",
+                 "streaming-mqtt/test:assembly"]
     profiles_and_goals = build_profiles + sbt_goals

     print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",
9 changes: 8 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -170,6 +170,7 @@ def contains_file(self, filename):
     dependencies=[streaming],
     source_file_regexes=[
         "external/mqtt",
+        "external/mqtt-assembly",
     ],
     sbt_test_goals=[
         "streaming-mqtt/test",
@@ -290,7 +291,13 @@ def contains_file(self, filename):

 pyspark_streaming = Module(
     name="pyspark-streaming",
-    dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
+    dependencies=[
+        pyspark_core,
+        streaming,
+        streaming_kafka,
+        streaming_flume_assembly,
+        streaming_mqtt
+    ],
     source_file_regexes=[
         "python/pyspark/streaming"
     ],
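Note: combining the two hunks above, the streaming_mqtt module extended by the first hunk plausibly reads as follows. This is a sketch for context only, not part of the diff; it assumes the Module constructor and the streaming module defined elsewhere in dev/sparktestsupport/modules.py, and any field not visible in the hunks is a guess.

# Sketch only: reconstructed from the visible hunks of dev/sparktestsupport/modules.py;
# anything not shown in those hunks is an assumption.
streaming_mqtt = Module(
    name="streaming-mqtt",
    dependencies=[streaming],
    source_file_regexes=[
        "external/mqtt",
        "external/mqtt-assembly",
    ],
    sbt_test_goals=[
        "streaming-mqtt/test",
    ],
)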
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 {:.no_toc}

 <span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future.

 This category of sources require interfacing with external non-Spark libraries, some of them with
 complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
57 changes: 57 additions & 0 deletions examples/src/main/python/streaming/mqtt_wordcount.py
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A sample wordcount with MqttStream stream
Usage: mqtt_wordcount.py <broker url> <topic>
To work with Mqtt, Mqtt Message broker/server required.
Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
Run Mqtt publisher as
`$ bin/run-example \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ran inside of mosquitto, could you mention that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we modify like this ?
Publisher publishes data to mosquitto broker set in local host. Run Mqtt publisher as

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bin/run-example is also inside spark, I misunderstood that, sorry.

never-mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya it is actually running the Scala publisher inside the Scala streaming word count example

org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing / @tdas, the publisher here is the Scala publisher in the MQTT Scala word count example. Should we keep this here or we should update it to general instruction to run MQTT publisher.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine. We can also add something to highlight this MQTTPublisher is only for demo / example.

and then run the example as
`$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
tcp://localhost:1883 foo`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)

sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)

brokerUrl = sys.argv[1]
topic = sys.argv[2]

lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
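Note: as the review thread above points out, the Scala MQTTPublisher is only a demo convenience. A test publisher can also be written directly in Python; the sketch below uses the third-party paho-mqtt package (pip install paho-mqtt) and is not part of this PR. The broker address and topic are assumptions matching the example invocation above.

# Sketch only, not part of this PR: a Python test publisher using paho-mqtt.
# Assumes a broker at localhost:1883 and the topic "foo", as in the example above.
import time

import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost", 1883)  # plain TCP connect to the local broker
client.loop_start()                # run the client's network loop in a background thread

try:
    while True:
        # publish a line of words for the streaming word count to consume
        client.publish("foo", "hello mqtt demo for spark streaming")
        time.sleep(1)
except KeyboardInterrupt:
    client.loop_stop()
    client.disconnect()

One-off messages can also be sent with the mosquitto_pub command-line tool that ships with Mosquitto, for example `mosquitto_pub -h localhost -t foo -m "hello"`.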
102 changes: 102 additions & 0 deletions external/mqtt-assembly/pom.xml
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project External MQTT Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -74,3 +74,19 @@ object MQTTUtils {
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
 }
+
+/**
+ * This is a helper class that wraps the methods in MQTTUtils into a more Python-friendly class
+ * and function so that it can be easily instantiated and called from Python's MQTTUtils.
+ */
+private class MQTTUtilsPythonHelper {
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel
+    ): JavaDStream[String] = {
+    MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
+  }
+}
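Note: the Python-side entry point (python/pyspark/streaming/mqtt.py) is part of this PR but is not shown in this view. A minimal sketch of what it plausibly looks like, assuming the Py4J class-loading pattern PySpark's other streaming sources use; everything beyond the MQTTUtilsPythonHelper class name is an assumption.

# Sketch only: mirrors the Py4J pattern used by PySpark's Kafka/Flume wrappers;
# everything except the MQTTUtilsPythonHelper class name is an assumption.
from pyspark.serializers import UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream


class MQTTUtils(object):

    @staticmethod
    def createStream(ssc, brokerUrl, topic,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """Create an input stream that pulls messages from an MQTT broker."""
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        # Load the Scala helper added above and call it through the Py4J gateway.
        helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
            .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
        helper = helperClass.newInstance()
        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
        return DStream(jstream, ssc, UTF8Deserializer())

The returned DStream deserializes each MQTT payload as a UTF-8 string, which is what the word count example above consumes.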
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -17,151 +17,69 @@

 package org.apache.spark.streaming.mqtt

-import java.net.{URI, ServerSocket}
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
 import scala.concurrent.duration._
 import scala.language.postfixOps

-import org.apache.activemq.broker.{TransportConnector, BrokerService}
-import org.apache.commons.lang3.RandomUtils
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually

-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.scheduler.StreamingListener
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}

 class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {

   private val batchDuration = Milliseconds(500)
   private val master = "local[2]"
   private val framework = this.getClass.getSimpleName
-  private val freePort = findFreePort()
-  private val brokerUri = "//localhost:" + freePort
   private val topic = "def"
-  private val persistenceDir = Utils.createTempDir()

   private var ssc: StreamingContext = _
-  private var broker: BrokerService = _
-  private var connector: TransportConnector = _
+  private var MQTTTestUtils: MQTTTestUtils = _

   before {
     ssc = new StreamingContext(master, framework, batchDuration)
-    setupMQTT()
+    MQTTTestUtils = new MQTTTestUtils
+    MQTTTestUtils.setup()
   }

   after {
     if (ssc != null) {
       ssc.stop()
       ssc = null
     }
-    Utils.deleteRecursively(persistenceDir)
-    tearDownMQTT()
+    if (MQTTTestUtils != null) {
+      MQTTTestUtils.teardown()
+      MQTTTestUtils = null
+    }
   }

   test("mqtt input stream") {
     val sendMessage = "MQTT demo for spark streaming"
-    val receiveStream =
-      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic,
+      StorageLevel.MEMORY_ONLY)

     @volatile var receiveMessage: List[String] = List()
     receiveStream.foreachRDD { rdd =>
       if (rdd.collect.length > 0) {
         receiveMessage = receiveMessage ::: List(rdd.first)
         receiveMessage
       }
     }

+    MQTTTestUtils.registerStreamingListener(ssc)
+
     ssc.start()

     // wait for the receiver to start before publishing data, or we risk failing
     // the test nondeterministically. See SPARK-4631
-    waitForReceiverToStart()
+    MQTTTestUtils.waitForReceiverToStart(ssc)
+
+    MQTTTestUtils.publishData(topic, sendMessage)

-    publishData(sendMessage)
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
       assert(sendMessage.equals(receiveMessage(0)))
     }
     ssc.stop()
   }
-
-  private def setupMQTT() {
-    broker = new BrokerService()
-    broker.setDataDirectoryFile(Utils.createTempDir())
-    connector = new TransportConnector()
-    connector.setName("mqtt")
-    connector.setUri(new URI("mqtt:" + brokerUri))
-    broker.addConnector(connector)
-    broker.start()
-  }
-
-  private def tearDownMQTT() {
-    if (broker != null) {
-      broker.stop()
-      broker = null
-    }
-    if (connector != null) {
-      connector.stop()
-      connector = null
-    }
-  }
-
-  private def findFreePort(): Int = {
-    val candidatePort = RandomUtils.nextInt(1024, 65536)
-    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
-      val socket = new ServerSocket(trialPort)
-      socket.close()
-      (null, trialPort)
-    }, new SparkConf())._2
-  }
-
-  def publishData(data: String): Unit = {
-    var client: MqttClient = null
-    try {
-      val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
-      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
-      client.connect()
-      if (client.isConnected) {
-        val msgTopic = client.getTopic(topic)
-        val message = new MqttMessage(data.getBytes("utf-8"))
-        message.setQos(1)
-        message.setRetained(true)
-
-        for (i <- 0 to 10) {
-          try {
-            msgTopic.publish(message)
-          } catch {
-            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
-              // wait for Spark streaming to consume something from the message queue
-              Thread.sleep(50)
-          }
-        }
-      }
-    } finally {
-      client.disconnect()
-      client.close()
-      client = null
-    }
-  }
-
-  /**
-   * Block until at least one receiver has started or timeout occurs.
-   */
-  private def waitForReceiverToStart() = {
-    val latch = new CountDownLatch(1)
-    ssc.addStreamingListener(new StreamingListener {
-      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
-        latch.countDown()
-      }
-    })
-
-    assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
-  }
 }