In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash

In [2]:
def send_to_DB(doc):
    """Insert one document into the ``joined`` collection of ``fit5148_assignment_db``.

    A new ``MongoClient`` (default connection settings) is opened for every
    call and is always closed again, even if the insert is interrupted.

    Parameters
    ----------
    doc : dict
        The document to store.

    Notes
    -----
    Failures raised as ``Exception`` are reported on stdout and swallowed, so
    one bad document does not abort the rest of the partition being processed.
    """
    client = MongoClient()
    try:
        client.fit5148_assignment_db.joined.insert_one(doc)
    except Exception as ex:
        print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Release the connection on every exit path, including
        # non-Exception interruptions such as KeyboardInterrupt.
        client.close()

In [None]:
def _location_key(doc, precision=5):
    """Return the geohash cell for a message's ``data.latitude``/``data.longitude``."""
    payload = doc['data']
    return geohash.encode(payload['latitude'], payload['longitude'], precision=precision)


def _merge_hotspots(hotspots):
    """Collapse several hotspot messages for one location into a single document.

    The first hotspot is used as the template; its ``confidence`` and
    ``surface_temperature_celcius`` are replaced by the mean over all given
    hotspots. The template is copied first so the input messages are never
    mutated (they may be matched again by another climate record).
    """
    template = hotspots[0]
    merged = dict(template)
    merged['data'] = dict(template['data'])
    count = len(hotspots)
    for field in ('confidence', 'surface_temperature_celcius'):
        merged['data'][field] = sum(h['data'][field] for h in hotspots) / count
    return merged


def join_stream(iter):
    """Join climate and hotspot messages of one partition and store them in MongoDB.

    Each element of ``iter`` is a Kafka ``(key, value)`` pair whose value is a
    JSON string with a ``sender`` field and a ``data`` object. Messages whose
    sender is ``'climate'`` are climate records; everything else is treated as
    a hotspot (the terra and aqua producers).

    Every climate record is written exactly once via ``send_to_DB``:

    * no hotspot in the same geohash cell -> stored unchanged
      (requirement FOUR: no fire at that location/time);
    * exactly one hotspot in the same cell -> embedded under
      ``doc['data']['hotspot']`` (requirements ONE and TWO);
    * several hotspots in the same cell -> embedded as one hotspot whose
      confidence and surface temperature are averaged (requirement THREE).

    Hotspots are only stored when embedded in a matching climate record; if
    the partition holds no climate record, nothing is written.

    Parameters
    ----------
    iter : iterable of (key, value)
        Partition iterator handed over by ``rdd.foreachPartition``.
    """
    # We merge terra and aqua as hotspots - 2 producers.
    terra_aqua_list = []
    # One producer for climate data.
    climate_list = []
    for each in iter:
        data_json = json.loads(each[1])
        if data_json['sender'] == 'climate':
            climate_list.append(data_json)
        else:
            terra_aqua_list.append(data_json)

    # Index the hotspots by geohash cell once, so each climate record needs a
    # single lookup regardless of how many hotspots arrived in this batch.
    hotspots_by_cell = {}
    for hotspot in terra_aqua_list:
        hotspots_by_cell.setdefault(_location_key(hotspot), []).append(hotspot)

    for doc in climate_list:
        # Without any hotspot there is nothing to join, so the climate
        # record's coordinates are not even inspected.
        if hotspots_by_cell:
            nearby = hotspots_by_cell.get(_location_key(doc), ())
        else:
            nearby = ()

        if len(nearby) == 1:
            doc['data']['hotspot'] = nearby[0]
        elif len(nearby) > 1:
            doc['data']['hotspot'] = _merge_hotspots(nearby)
        send_to_DB(doc)
# --- Streaming job configuration -------------------------------------------
batch_interval = 10            # seconds of data per micro-batch
topic = "StopFire"
run_duration_seconds = 10 * 60  # run the stream for 10 minutes, then shut down

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate never returns None, so the configuration has to be passed here
# to take effect; it is only applied if no SparkContext exists yet.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batch_interval)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers': '127.0.0.1:9092',
                        'group.id': 'assignment',  # Group ID is completely arbitrary
                        'fetch.message.max.bytes': '15728640',
                        'auto.offset.reset': 'largest'})
kafkaStream.pprint()

# foreachRDD registers an output operation and returns None, so there is no
# result worth binding to a name.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(join_stream))

ssc.start()
try:
    # Wait for the configured run time, but return early (and surface the
    # error) if the streaming job itself terminates.
    ssc.awaitTerminationOrTimeout(run_duration_seconds)
finally:
    ssc.stop(stopSparkContext=True, stopGraceFully=True)