Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prabeesh committed Jul 4, 2015
1 parent ee387ae commit 795ec27
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/mqtt_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
`$ bin/run-example \
org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
and then run the example as
`$ bin/spark-submit --driver-class-path external/mqtt-assembly/target/scala-*/\
`$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
tcp://localhost:1883 foo`
"""
Expand Down
4 changes: 2 additions & 2 deletions external/mqtt-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project External Kafka Assembly</name>
<name>Spark Project External MQTT Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
Expand Down
28 changes: 22 additions & 6 deletions python/pyspark/streaming/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,28 @@ def createStream(ssc, brokerUrl, topic,

try:
jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel)

except Py4JError, e:
# TODO: use --jar once it also work on driver
if not e.message or 'call a package' in e.message:
print "No Mqtt package, please put the assembly jar into classpath:"
print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \
"scala-*/spark-streaming-mqtt-assembly-*.jar"
if 'ClassNotFoundException' in str(e.java_exception):
MQTTUtils._printErrorMsg(ssc.sparkContext)
raise e
return DStream(jstream, ssc, UTF8Deserializer())

@staticmethod
def _printErrorMsg(sc):
print("""
________________________________________________________________________________________________
Spark Streaming's MQTT libraries not found in class path. Try one of the following.
1. Include the MQTT library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
________________________________________________________________________________________________
""" % (sc.version, sc.version))

0 comments on commit 795ec27

Please sign in to comment.