In [0]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
import pprint
import geohash2
import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [0]:
def get_geohash(latitude, longitude, precision=5):
    """Encode a latitude/longitude pair as a geohash string.

    Readings that produce the same geohash are treated as the same location
    when hotspot and climate records are joined in this notebook.

    Args:
        latitude: latitude of the reading.
        longitude: longitude of the reading.
        precision: geohash precision passed to ``geohash2.encode``; defaults
            to 5, the value this pipeline has always used for joining.

    Returns:
        The geohash string returned by ``geohash2.encode``.
    """
    return geohash2.encode(latitude, longitude, precision=precision)

In [0]:
def join_hotspots(hotspot_list):
    """Merge a pair of hotspot readings that fall in the same geohash cell.

    Only the exact two-record case is merged. Any other list length, or a
    pair whose ``geohash`` values differ, is returned unchanged.

    When merged, the result is a copy of the first record with
    ``confidence`` and ``surface_temperature_celcius`` replaced by the mean of
    both records and ``sender_id`` set to ``"Producer2&3"``. The input
    dictionaries are not modified.

    Args:
        hotspot_list: list of hotspot dicts, each with ``geohash``,
            ``confidence`` and ``surface_temperature_celcius`` keys.

    Returns:
        A one-element list holding the merged record, or ``hotspot_list``
        itself when no merge applies.
    """
    if len(hotspot_list) != 2:
        return hotspot_list

    first, second = hotspot_list
    if first["geohash"] != second["geohash"]:
        return hotspot_list

    # Copy rather than alias: writing into `first` directly would silently
    # mutate the caller's record.
    merged = dict(first)
    merged["confidence"] = (first["confidence"] + second["confidence"]) / 2
    merged["surface_temperature_celcius"] = (
        first["surface_temperature_celcius"] + second["surface_temperature_celcius"]
    ) / 2
    merged["sender_id"] = "Producer2&3"
    return [merged]

In [0]:
def join_climate_hotspot(hotspot_list, climate_list):
    """Attach hotspot readings to the climate records sharing their geohash.

    Each climate dict gains (or extends) a ``hot_spots`` list containing every
    hotspot whose ``geohash`` equals its own, in ``hotspot_list`` order.
    Climate records with no matching hotspot are left untouched (no
    ``hot_spots`` key is added). The climate dicts are modified in place.

    Args:
        hotspot_list: list of hotspot dicts with a ``geohash`` key.
        climate_list: list of climate dicts with a ``geohash`` key.

    Returns:
        ``climate_list`` itself, after in-place enrichment.
    """
    for climate in climate_list:
        matches = [spot for spot in hotspot_list if climate["geohash"] == spot["geohash"]]
        if matches:
            climate.setdefault("hot_spots", []).extend(matches)
    return climate_list

In [0]:
def join_all_stream(rdd):
    """Parse one micro-batch and return its climate records with hotspots attached.

    Every message value is parsed into a dict and tagged with a ``geohash``
    computed from its ``latitude``/``longitude``. Records whose ``sender_id``
    is ``"Producer1"`` are treated as climate readings; ``"Producer2"`` and
    ``"Producer3"`` records are treated as hotspot readings; any other sender
    is dropped. Hotspots are merged by ``join_hotspots`` and then attached to
    matching climate records by ``join_climate_hotspot``.

    Args:
        rdd: one batch RDD from the Kafka DStream. The message value is read
            as ``record[1]`` (presumably a (key, value) pair — TODO confirm).

    Returns:
        list of climate dicts. Hotspots that match no climate record in the
        same batch do not appear in the result.
    """
    climates = []
    hotspots = []
    for record in rdd.collect():  # collect() brings the whole batch to the driver
        # NOTE(review): eval() executes arbitrary Python taken from the Kafka
        # message. If producers only ever send dict literals, ast.literal_eval
        # would be a safe replacement — confirm the payload format first.
        payload = eval(record[1])
        origin = payload["sender_id"]

        payload["geohash"] = get_geohash(payload["latitude"], payload["longitude"])

        if origin == "Producer1":
            climates.append(payload)
        elif origin in ("Producer2", "Producer3"):
            hotspots.append(payload)

    merged_hotspots = join_hotspots(hotspots)
    return join_climate_hotspot(merged_hotspots, climates)



In [0]:
def sendDataToDB(rdd):
    """Join one micro-batch and store the resulting climate documents in MongoDB.

    Intended as the ``foreachRDD`` callback of the Kafka stream. Documents go
    to the ``climate`` collection of ``fit5148_assignment_db`` using a
    ``MongoClient()`` built with no arguments (PyMongo's default connection).
    Each document is inserted individually. A failed insert is printed and the
    remaining documents are still attempted.

    Args:
        rdd: one batch RDD from the Kafka DStream.
    """
    climate_list = join_all_stream(rdd)
    if not climate_list:
        # Nothing to store for this batch; don't open a Mongo connection.
        return

    client = MongoClient()
    try:
        collection = client.fit5148_assignment_db.climate
        for record in climate_list:
            try:
                # insert_one replaces Collection.insert, which was deprecated
                # in PyMongo 3.0 and removed in PyMongo 4.0.
                collection.insert_one(record)
            except Exception as ex:
                print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even if something above raises.
        client.close()


In [0]:
                           
# Driver: consume the three Kafka topics in micro-batches and persist each
# joined batch to MongoDB via sendDataToDB.
batch_interval = 10  # seconds per micro-batch
run_seconds = 600    # stop after 10 minutes even if no producer activity is seen
topic = ['climate', 'AQUA', 'TERRA']
kafka_params = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'assignment2',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest',
}

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Build the conf first and hand it to getOrCreate. Creating the context before
# the conf existed meant the app name/master were never applied, and
# getOrCreate never returns None, so a later `sc is None` fallback was dead.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batch_interval)

kafkaStream = KafkaUtils.createDirectStream(ssc, topic, kafka_params)

# foreachRDD registers the callback and returns None, so nothing is assigned.
kafkaStream.foreachRDD(sendDataToDB)

ssc.start()
try:
    # Bounded wait: a bare awaitTermination() blocks forever, which made the
    # stop call below unreachable.
    ssc.awaitTerminationOrTimeout(run_seconds)
finally:
    # Also runs on KeyboardInterrupt, so the context is never left running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)


{'relative_humidity': 37.2, 'windspeed_knots': 6.2, 'precipitation ': ' 0.08G', 'latitude': -36.3328, 'geohash': 'r1xc7', 'sender_id': 'Producer1', '_id': ObjectId('5ce772819343691254b74834'), 'created_time': '14:26:37', 'air_temperature_celcius': 7, 'max_wind_speed': 16.9, 'longitude': 146.0355}
ok
{'relative_humidity': 58.7, 'windspeed_knots': 7.8, 'precipitation ': ' 0.00I', 'latitude': -36.0459, 'geohash': 'r1w6f', 'sender_id': 'Producer1', '_id': ObjectId('5ce7728a9343691254b74836'), 'created_time': '14:26:42', 'air_temperature_celcius': 21, 'max_wind_speed': 13.0, 'longitude': 143.8907}
ok
{'relative_humidity': 43.3, 'windspeed_knots': 3.9, 'precipitation ': ' 0.00I', 'latitude': -37.385999999999996, 'geohash': 'r3372', 'sender_id': 'Producer1', '_id': ObjectId('5ce7728a9343691254b74837'), 'created_time': '14:26:47', 'air_temperature_celcius': 8, 'max_wind_speed': 8.0, 'longitude': 148.043}
ok
{'relative_humidity': 36.3, 'windspeed_knots': 6.3, 'precipitation ': ' 0.01G', 'latitu

KeyboardInterrupt: 

In [0]:
#ssc.stop(stopSparkContext=True,stopGraceFully=True)