In [None]:
import os
import sys
# Run the PySpark driver and its Python workers on the interpreter behind this
# kernel, and ask spark-submit to fetch the Apache Bahir MQTT connectors
# (Scala 2.11 builds, version 2.2.1). PySpark reads these variables when it
# launches the JVM, so this cell must run before any SparkContext is created.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1,org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.1 pyspark-shell',
})

In [None]:
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import UTF8Deserializer
from pyspark.streaming import DStream

#   Subscribe to one topic on an MQTT broker and expose its messages as a DStream
def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
    """Create a DStream of text messages received from a single MQTT topic.

    The Bahir helper class ``MQTTUtilsPythonHelper`` is loaded through the JVM
    thread's context class loader (the Bahir jar is pulled in by the
    ``--packages`` entry in PYSPARK_SUBMIT_ARGS) and asked to build the
    Java-side stream, which is then wrapped in a Python DStream.

    :param ssc: StreamingContext the stream is attached to.
    :param brokerUrl: MQTT broker URL, e.g. ``"tcp://localhost:1883"``.
    :param topic: the single topic to subscribe to.
    :param storageLevel: storage level for received data. Defaults to
        ``MEMORY_AND_DISK_2``; the former default ``MEMORY_AND_DISK_SER_2`` is a
        deprecated alias for the same level since Spark 2.0.
    :return: DStream whose elements are message payloads decoded as UTF-8 strings.
    """
    jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

    # Load via the context class loader: classes from --packages jars are
    # visible there rather than through py4j's default lookup.
    helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
    helper = helperClass.newInstance()
    jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)

    return DStream(jstream, ssc, UTF8Deserializer())


In [None]:
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="HumAIn Test")
#   10-second micro-batches; checkpoint data goes to ./checkpoint
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")

#   Broker location is defined once, so the subscribing receivers (brokerUrl)
#   and the paho publishers (which pass port=mqttPort) cannot drift apart.
mqttHost = "localhost"
mqttPort = 1883
brokerUrl = "tcp://{}:{}".format(mqttHost, mqttPort)

topic1 = "humAIn/testTopic1"
topic2 = "humAIn/testTopic2"
topic3 = "humAIn/testTopic3"
topic4 = "humAIn/testTopic4"
topic5 = "humAIn/testTopic5"  # NOTE(review): not referenced in the cells visible here

#   Input streams: one MQTT receiver per subscribed topic
mqttStream1 = createStream(ssc, brokerUrl, topic1)
mqttStream2 = createStream(ssc, brokerUrl, topic2)

In [None]:
import paho.mqtt.publish as p

#   Merge the records of both streams into one DStream (a union, not a keyed join)
mqttStreamUnion = mqttStream1.union(mqttStream2)

#   For every batch of the merged stream, republish each record to topic3.
def sendRecord(rdd):
    """Publish every record of ``rdd`` to ``topic3`` on the MQTT broker.

    Records are sent with one paho connection per partition
    (``publish.multiple``) instead of one connection per record
    (``publish.single``).

    NOTE: keep the single ``rdd`` parameter. PySpark's ``foreachRDD`` picks
    between ``func(rdd)`` and ``func(time, rdd)`` based on the argument count.
    """
    def publishPartition(records):
        # publish.multiple needs a sized list; the partition is materialised,
        # which is fine for small micro-batches.
        messages = [{"topic": topic3, "payload": record} for record in records]
        # paho's publish.multiple is not meant to be called with no messages,
        # so empty partitions are skipped.
        if messages:
            p.multiple(messages, port=mqttPort)

    rdd.foreachPartition(publishPartition)

#   Run the publisher on every RDD of the merged stream
mqttStreamUnion.foreachRDD(sendRecord)

In [None]:
#   Enrich a single JSON text record:
#   parse it, add fixed site tags to its "tagset",
#   and serialise the enriched tagset back to text.
def enhanceRecord(record):
    """Return the record's ``tagset`` with site metadata added, as JSON text.

    The tags country, site, area and level1 get fixed values. ``line`` is set to
    ``fieldset["testwaarde"]`` multiplied by -1.

    NOTE(review): only the tagset is serialised. ``fieldset`` and any other
    top-level keys of the input are not in the returned JSON, so confirm that
    dropping them is intended.
    """
    parsed = json.loads(record)
    tags = parsed["tagset"]
    fixedTags = (("country", "DK"), ("site", "TESTsite"), ("area", "Floor 5"))
    for tagName, tagValue in fixedTags:
        tags[tagName] = tagValue
    tags["line"] = parsed["fieldset"]["testwaarde"] * -1
    tags["level1"] = "HardMap"
    return json.dumps(tags)

#   Enrich every record of the first stream with site metadata
mqttStreamEnhanced = mqttStream1.map(enhanceRecord)

#   Send the enhanced JSON to the MQTT broker on topic4
def sendEnhancedRecord(rdd):
    """Publish every enhanced record of ``rdd`` to ``topic4`` on the MQTT broker.

    Uses one paho connection per partition (``publish.multiple``) rather than
    one connection per record (``publish.single``).

    NOTE: keep the single ``rdd`` parameter. PySpark's ``foreachRDD`` picks
    between ``func(rdd)`` and ``func(time, rdd)`` based on the argument count.
    """
    def publishPartition(records):
        # Materialise the partition: publish.multiple needs a sized list.
        messages = [{"topic": topic4, "payload": record} for record in records]
        # paho's publish.multiple is not meant to be called with no messages.
        if messages:
            p.multiple(messages, port=mqttPort)

    rdd.foreachPartition(publishPartition)

mqttStreamEnhanced.foreachRDD(sendEnhancedRecord)

In [None]:
#   DStream.join works on (key, value) pairs. Raw JSON strings would be unpacked
#   positionally (key = first character, value = second character), so every
#   record is keyed explicitly before windowing and joining.
#   NOTE(review): the join key is the record's "tagset" (the field enhanceRecord
#   reads). Records that are not JSON objects with a "tagset" are dropped.
#   Confirm this is the intended key for topic1/topic2.
def keyByTagset(record):
    """Return ``[(key, record)]`` for a JSON object with a ``tagset``, else ``[]``.

    The key is the tagset serialised with sorted keys, so equal tag sets match
    regardless of key order. A list is returned so this can be used with
    ``flatMap`` to drop records that cannot be keyed.
    """
    try:
        payload = json.loads(record)
    except ValueError:
        return []
    if not isinstance(payload, dict) or "tagset" not in payload:
        return []
    return [(json.dumps(payload["tagset"], sort_keys=True), record)]

#   20 s and 60 s windows (multiples of the 10 s batch interval)
windowedStream1 = mqttStream1.flatMap(keyByTagset).window(20)
windowedStream2 = mqttStream2.flatMap(keyByTagset).window(60)
#   Elements are (key, (recordFromStream1, recordFromStream2))
mqttStreamWindowed = windowedStream1.join(windowedStream2)

#   Print the first elements of each joined batch
mqttStreamWindowed.print()

In [None]:
#   Start all streams and block this cell until the streaming context terminates.
#   If the wait is interrupted (e.g. a kernel interrupt) or a batch fails, stop
#   the streaming and Spark contexts so the receivers and publishers do not keep
#   running in the background, and the setup cells can be re-run.
ssc.start()
try:
    ssc.awaitTermination()
finally:
    ssc.stop(stopSparkContext=True, stopGraceFully=True)