Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prabeesh committed Jul 4, 2015
1 parent 3aa7fff commit b34c3c1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 8 deletions.
5 changes: 2 additions & 3 deletions examples/src/main/python/streaming/mqtt_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@
sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)

broker_url = sys.argv[1]
brokerUrl = sys.argv[1]
topic = sys.argv[2]

data = MQTTUtils.createStream(ssc, topic, broker_url)
lines = data.map(lambda x: x[1])
lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
Expand Down
8 changes: 3 additions & 5 deletions python/pyspark/streaming/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from py4j.java_gateway import java_import, Py4JError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.serializers import UTF8Deserializer
from pyspark.streaming import DStream

__all__ = ['MQTTUtils']
Expand All @@ -28,7 +28,7 @@
class MQTTUtils(object):

@staticmethod
def createStream(ssc, brokerUrl, topic
def createStream(ssc, brokerUrl, topic,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
"""
Create an input stream that pulls messages from a Mqtt Broker.
Expand All @@ -52,6 +52,4 @@ def createStream(ssc, brokerUrl, topic
print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \
"scala-*/spark-streaming-mqtt-assembly-*.jar"
raise e
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
return stream
return DStream(jstream, ssc, UTF8Deserializer())

0 comments on commit b34c3c1

Please sign in to comment.