Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-5155] [PySpark] [Streaming] Mqtt streaming support in Python #4229

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b7d42ff
Mqtt streaming support in Python
prabeesh Feb 3, 2015
3aa7fff
Added Python streaming mqtt word count example
prabeesh Feb 3, 2015
b34c3c1
adress comments
prabeesh Feb 5, 2015
3f4df12
updated version
prabeesh Mar 29, 2015
ee387ae
Fix assembly jar location of mqtt-assembly
prabeesh Apr 22, 2015
795ec27
address comments
prabeesh Jun 23, 2015
a11968b
fixed python style
prabeesh Jun 29, 2015
9767d82
implemented Python-friendly class
prabeesh Jul 6, 2015
a5a8f9f
added Python test
prabeesh Jul 10, 2015
e1ee016
scala style fix
prabeesh Jul 11, 2015
1f0cfe9
python style fix
prabeesh Jul 11, 2015
80474d1
fix
prabeesh Jul 11, 2015
97244ec
Make sbt build the assembly test jar for streaming mqtt
zsxwing Jul 22, 2015
87fc677
address the comments:
Jul 23, 2015
a6747cb
wait for starting the receiver before publishing data
prabeesh Jul 24, 2015
d07f454
Register StreamingListerner before starting StreamingContext; Revert …
zsxwing Jul 26, 2015
b90b709
Merge pull request #1 from zsxwing/pr4229
prabeesh Jul 26, 2015
126608a
address the comments
Jul 30, 2015
734db99
Merge branch 'master' into pr4229
zsxwing Jul 31, 2015
5f8a1d4
Make the maven build generate the test jar for Python MQTT tests
zsxwing Jul 31, 2015
478f844
Add unpack
zsxwing Jul 31, 2015
47278c5
Include the project class files
zsxwing Jul 31, 2015
935615c
Fix the flaky MQTT tests
zsxwing Jul 31, 2015
abf5f18
Merge branch 'master' into pr4229
zsxwing Aug 1, 2015
03f3e88
Merge pull request #2 from zsxwing/pr4229
prabeesh Aug 2, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ def build_spark_sbt(hadoop_version):
"assembly/assembly",
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly",
"streaming-mqtt-assembly/assembly",
"streaming-mqtt/test:assembly",
"streaming-kinesis-asl-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals

Expand Down
2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def contains_file(self, filename):
dependencies=[streaming],
source_file_regexes=[
"external/mqtt",
"external/mqtt-assembly",
],
sbt_test_goals=[
"streaming-mqtt/test",
Expand Down Expand Up @@ -306,6 +307,7 @@ def contains_file(self, filename):
streaming,
streaming_kafka,
streaming_flume_assembly,
streaming_mqtt,
streaming_kinesis_asl
],
source_file_regexes=[
Expand Down
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future.

This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
Expand Down
58 changes: 58 additions & 0 deletions examples/src/main/python/streaming/mqtt_wordcount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A sample wordcount with MqttStream stream
Usage: mqtt_wordcount.py <broker url> <topic>

To run this in your local machine, you need to setup a MQTT broker and publisher first,
Mosquitto is one of the open source MQTT Brokers, see
http://mosquitto.org/
Eclipse paho project provides number of clients and utilities for working with MQTT, see
http://www.eclipse.org/paho/#getting-started

and then run the example
`$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
tcp://localhost:1883 foo`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)

sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)

brokerUrl = sys.argv[1]
topic = sys.argv[2]

lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
102 changes: 102 additions & 0 deletions external/mqtt-assembly/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project External MQTT Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
28 changes: 28 additions & 0 deletions external/mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,33 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>

<plugins>
<!-- Assemble a jar with test dependencies for Python tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>test-jar-with-dependencies</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<!-- Make sure the file path is same as the sbt build -->
<finalName>spark-streaming-mqtt-test-${project.version}</finalName>
<outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
<appendAssemblyId>false</appendAssemblyId>
<!-- Don't publish it since it's only for Python tests -->
<attach>false</attach>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
44 changes: 44 additions & 0 deletions external/mqtt/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly>
<id>test-jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>

<fileSets>
<fileSet>
<directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useTransitiveDependencies>true</useTransitiveDependencies>
<scope>test</scope>
<unpack>true</unpack>
<excludes>
<exclude>org.apache.hadoop:*:jar</exclude>
<exclude>org.apache.zookeeper:*:jar</exclude>
<exclude>org.apache.avro:*:jar</exclude>
</excludes>
</dependencySet>
</dependencySets>

</assembly>
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,19 @@ object MQTTUtils {
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
}

/**
* This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's MQTTUtils.
*/
private class MQTTUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
): JavaDStream[String] = {
MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
}
}
Loading