In [1]:
# Import required packages
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash

In [2]:
# Decide whether same location or not
def get_geohash(lat1, long1, lat2, long2, precision=5):
    """Tell whether two coordinates fall into the same geohash cell.

    Despite its name, this function returns a boolean, not a geohash string.

    Args:
        lat1: latitude of the first point.
        long1: longitude of the first point.
        lat2: latitude of the second point.
        long2: longitude of the second point.
        precision: number of geohash characters to compare. Defaults to 5,
            the value this notebook has always used.

    Returns:
        True when both points encode to the same geohash at the given
        precision, False otherwise.
    """
    first_cell = geohash.encode(lat1, long1, precision=precision)
    second_cell = geohash.encode(lat2, long2, precision=precision)
    return first_cell == second_cell

In [3]:
# Save to Mongo DB into assignment db and stream collection
def save_to_db(dictionary):
    """Insert one document into the ``stream`` collection of the ``assignment`` database.

    A ``MongoClient`` with default connection settings is created for every
    call and closed again before returning, including when the insert raises.

    Args:
        dictionary: the document to store; it is passed unchanged to
            ``insert_one``.
    """
    client = MongoClient()
    try:
        client.assignment.stream.insert_one(dictionary)
    finally:
        # Release the connection even if the insert fails
        client.close()
    

In [4]:
def join_all_stream(rdd):
    """Join the climate and hotspot records of one micro-batch and store them.

    Each element of ``rdd`` is a pair whose second item is a JSON string with
    a ``sender_id`` field. Records with ``sender_id == 1`` are treated as
    climate records, everything else as hotspot records.

    Every climate record of the batch is saved exactly once through
    ``save_to_db``. If one or more hotspot records of the same batch lie in
    the same geohash cell (as decided by ``get_geohash``), they are embedded
    under the ``hotspot`` key; several matching hotspots are merged by
    averaging ``confidence`` and ``surface_temperature_celcius``.

    Hotspot records are only stored embedded in a climate record; a batch
    without climate records stores nothing.

    Args:
        rdd: the batch RDD handed over by ``foreachRDD``. It is collected to
            the driver, so batches are expected to be small.
    """
    climate_list = []
    hotspot_list = []

    # Split the batch by sender; record[0] is the message key and is ignored
    for record in rdd.collect():
        data = json.loads(record[1])
        if data['sender_id'] == 1:
            climate_list.append(data)
        else:
            hotspot_list.append(data)

    for document in climate_list:
        # Works for any number of hotspot records in the batch, so climate
        # records are never dropped when more than two hotspots arrive
        matches = [
            hotspot for hotspot in hotspot_list
            if get_geohash(document['latitude'], document['longitude'],
                           hotspot['latitude'], hotspot['longitude'])
        ]
        if matches:
            document['hotspot'] = _merge_hotspots(matches)
        save_to_db(document)


def _merge_hotspots(hotspots):
    """Collapse a non-empty list of hotspot records into one record to embed."""
    if len(hotspots) == 1:
        return hotspots[0]

    # Average into a copy: mutating the first record in place would feed
    # already-averaged values into the next climate record of the batch
    merged = dict(hotspots[0])
    for field in ('confidence', 'surface_temperature_celcius'):
        merged[field] = sum(hotspot[field] for hotspot in hotspots) / len(hotspots)
    return merged
                

In [5]:
# Batch interval of the streaming context, in seconds
n_secs = 10
# Kafka topics to subscribe to: the climate feed and the two hotspot feeds
topics = ["climate_streaming", "hotspot_AQUA_streaming", "hotspot_TERRA_streaming"]
# How long the stream is left running before it is shut down, in seconds
run_secs = 60

# Configure and create the Spark context.
# getOrCreate() never returns None, so the configuration has to be passed
# here for it to be used when a new context is created.
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafkaStream = KafkaUtils.createDirectStream(ssc, topics, {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'assignment',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})

# Register the per-batch join; foreachRDD returns None, the name is kept
# only so that any later cell referring to it keeps working
records = kafkaStream.foreachRDD(join_all_stream)

ssc.start()
try:
    # Let the stream run for run_secs seconds
    time.sleep(run_secs)
finally:
    # Always shut the contexts down, also when the cell is interrupted
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

KeyboardInterrupt: 