From 3aa7fffdd6ce15cc0b1c2625d7fa92f04543e8fb Mon Sep 17 00:00:00 2001 From: prabs Date: Wed, 4 Feb 2015 01:18:17 +0530 Subject: [PATCH] Added Python streaming mqtt word count example --- .../main/python/streaming/mqtt_wordcount.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 examples/src/main/python/streaming/mqtt_wordcount.py diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py new file mode 100644 index 0000000000000..524d177a74e34 --- /dev/null +++ b/examples/src/main/python/streaming/mqtt_wordcount.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + A sample wordcount with MqttStream stream + Usage: mqtt_wordcount.py + To work with Mqtt, Mqtt Message broker/server required. + Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker + In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` + Run Mqtt publisher as + `$ bin/run-example \ + org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` + and then run the example as + `$ bin/spark-submit --driver-class-path external/mqtt-assembly/target/scala-*/\ + spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \ + tcp://localhost:1883 foo` +""" + +import sys + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +from pyspark.streaming.mqtt import MQTTUtils + +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, "Usage: mqtt_wordcount.py " + exit(-1) + + sc = SparkContext(appName="PythonStreamingMQTTWordCount") + ssc = StreamingContext(sc, 1) + + broker_url = sys.argv[1] + topic = sys.argv[2] + + data = MQTTUtils.createStream(ssc, topic, broker_url) + lines = data.map(lambda x: x[1]) + counts = lines.flatMap(lambda line: line.split(" ")) \ + .map(lambda word: (word, 1)) \ + .reduceByKey(lambda a, b: a+b) + counts.pprint() + + ssc.start() + ssc.awaitTermination()