# Stream Processing using Apache Spark Streaming

I have written a streaming application in Apache Spark Streaming that uses a local streaming context with two execution threads
and a batch interval of 10 seconds.
This streaming application receives streaming data from three producers.
If the streaming application has data from at least two of the producers (or all three), it processes the data as follows:
>- Join the streams based on the location (i.e., latitude and longitude) and create the data model developed in Task A.
>- Determine whether two locations are close to each other using the geohashing algorithm with precision 5. The precision determines the number of characters in the geohash.
>- If we receive the data from two different satellites AQUA and TERRA for the same location, then average the ‘surface temperature’ and ‘confidence’.
>- If the streaming application has data from only one producer (Producer 1), this implies that there was no fire at that time, so the climate data can be stored in MongoDB straight away.

In [1]:
from pymongo import MongoClient
import pymongo
from pymongo import MongoClient
import datetime as dt


def deleteContents(obj):
    """Rename the ``'timestamp'`` key of *obj* to ``'date'``, in place.

    The same dict object is returned, so the call can be nested directly
    inside an ``insert_one(...)`` call.

    Raises:
        KeyError: if *obj* has no ``'timestamp'`` key (obj is left unchanged).
    """
    obj['date'] = obj.pop('timestamp')
    return obj

#Task c2 a 3
def only_store(data_received):
    """Insert every record of *data_received* into ``climate_historic``.

    This is the path for batches that contain only producer-1 records, so
    there are no hotspots to link. Each record has its ``'timestamp'`` key
    renamed to ``'date'`` by ``deleteContents`` before insertion, which
    mutates the input dicts.

    Args:
        data_received: iterable of decoded record dicts.
    """
    client = MongoClient()  # default host and port
    try:
        collection = client['fit5148_assignment_db']['climate_historic']
        for record in data_received:
            collection.insert_one(deleteContents(record))
    finally:
        # Release the connection even if an insert raises.
        client.close()
   
    
def full_received(data_received):  # task C2 a 2
    """Store a batch that contains records from all three producers.

    Records are split by ``sender_id``: 1 is climate data, 2 goes to one
    hotspot list and every other id goes to a second hotspot list
    (presumably the two satellite feeds -- confirm against the producers).
    Only the first record of each hotspot list is considered.

    If those two hotspots share a geohash, they are merged into the
    sender-2 record by averaging ``surface_temperature_celcius`` and
    ``confidence``, and the other one is discarded.

    Every climate record is inserted exactly once into ``climate_historic``.
    Each hotspot with the same geohash is first embedded in the climate
    record as an ``{'id', 'confidence', 'surface_temperature_celcius'}``
    summary under ``'hotspot'``. The hotspot is then inserted into
    ``hotspot_historic`` with ``'owner'`` set to the climate record's
    ``_id`` and ``'datetime'`` set to the processing time. The input dicts
    are mutated in place.

    Args:
        data_received: list of decoded record dicts that already carry a
            ``'geohash'`` key.
    """
    client = MongoClient()  # default host and port
    try:
        db = client['fit5148_assignment_db']
        datetimeStr = str(dt.datetime.now())

        sender1, sender2, sender3 = [], [], []
        for elem in data_received:
            if elem['sender_id'] == 1:
                sender1.append(elem)
            elif elem['sender_id'] == 2:
                sender2.append(elem)
            else:
                sender3.append(elem)

        # Both hotspot producers reported the same location: average them.
        if sender2 and sender3 and sender2[0]['geohash'] == sender3[0]['geohash']:
            first, second = sender2[0], sender3[0]
            first['surface_temperature_celcius'] = (
                float(first['surface_temperature_celcius'])
                + float(second['surface_temperature_celcius'])) / 2
            first['confidence'] = (
                float(first['confidence']) + float(second['confidence'])) / 2
            sender3 = []  # merged into sender2[0]

        # Candidate hotspots (first record of each remaining producer).
        hotspots = [group[0] for group in (sender3, sender2) if group]

        for item in sender1:
            matches = [h for h in hotspots if h['geohash'] == item['geohash']]
            if matches:
                # NOTE(review): hotspot '_id' is read before the hotspot is
                # inserted, so producers are assumed to supply it -- confirm.
                item['hotspot'] = [
                    {'id': h['_id'],
                     'confidence': h['confidence'],
                     'surface_temperature_celcius': h['surface_temperature_celcius']}
                    for h in matches
                ]
            # Insert the climate record once; insert_one also fills in
            # item['_id'] when missing, which the hotspots use as owner.
            db['climate_historic'].insert_one(deleteContents(item))
            for h in matches:
                h['datetime'] = datetimeStr
                h['owner'] = item['_id']
                db['hotspot_historic'].insert_one(h)
    finally:
        client.close()
    
def two_received(data_received):  # Task C2 a 1
    """Store a batch with climate records and one hotspot producer.

    Records with ``sender_id`` 1 are climate data. Every other record is
    treated as a hotspot, and only the first of them is considered.

    Each climate record is inserted into ``climate_historic``. When its
    geohash equals the hotspot's, a one-element ``'hotspot'`` summary list
    (``id``, ``confidence``, ``surface_temperature_celcius``) is embedded
    first. The hotspot is then inserted into ``hotspot_historic`` with
    ``'owner'`` set to the climate record's ``_id`` and ``'datetime'`` set
    to the processing time. The input dicts are mutated in place.

    Args:
        data_received: list of decoded record dicts that already carry a
            ``'geohash'`` key.
    """
    client = MongoClient()  # default host and port
    try:
        db = client['fit5148_assignment_db']
        datetimeStr = str(dt.datetime.now())

        sender1, other = [], []
        for elem in data_received:
            if elem['sender_id'] == 1:
                sender1.append(elem)
            else:
                other.append(elem)

        hotspot = other[0] if other else None
        for elem in sender1:
            matched = hotspot is not None and elem['geohash'] == hotspot['geohash']
            if matched:
                # NOTE(review): hotspot '_id' is assumed to come from the
                # producer, since it is read before the hotspot is inserted.
                elem['hotspot'] = [
                    {'id': hotspot['_id'],
                     'confidence': hotspot['confidence'],
                     'surface_temperature_celcius': hotspot['surface_temperature_celcius']}
                ]
            # insert_one fills in elem['_id'] when missing; it is the owner below.
            db['climate_historic'].insert_one(deleteContents(elem))
            if matched:
                hotspot['datetime'] = datetimeStr
                hotspot['owner'] = elem['_id']
                db['hotspot_historic'].insert_one(hotspot)
    finally:
        client.close()
        

In [2]:
import os
# Ask PySpark to fetch the Kafka 0.8 streaming connector; this must be set
# before the SparkContext below launches the JVM.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell',
})


import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash
from pymongo import MongoClient
import pymongo
import datetime as dt


# Sending data to DB
def sendDataToDB(iter):
    """Parse a group of Kafka records and route them to the matching store.

    *iter* yields records whose element ``[1]`` is a JSON document. Each
    decoded document gets a precision-5 ``'geohash'`` computed from its
    ``latitude``/``longitude``. The ``sender_id`` values present in this
    call then select the handler:

    * producer 1 only                     -> ``only_store``
    * producers 1, 2 and 3                -> ``full_received``
    * producer 1 plus exactly one of 2/3  -> ``two_received``

    Routing is decided from the records passed to this call only. Groups
    without any producer-1 record are not stored.
    """
    data_received = [json.loads(record[1]) for record in iter]

    sender_ids = []
    for data in data_received:
        data["geohash"] = geohash.encode(float(data["latitude"]),
                                         float(data["longitude"]), 5)
        sender_ids.append(data["sender_id"])

    if 1 not in sender_ids:
        # NOTE(review): hotspot-only groups are dropped; confirm this is intended.
        return
    has_2 = 2 in sender_ids
    has_3 = 3 in sender_ids
    if has_2 and has_3:
        full_received(data_received)
    elif has_2 or has_3:
        two_received(data_received)
    else:
        only_store(data_received)


n_secs = 10     # batch interval, in seconds
run_secs = 600  # how long to keep the stream running before stopping it
topic = "climate_data"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Pass conf so the app name and the two local threads take effect.
# getOrCreate() reuses an existing context (e.g. in a notebook session) and
# never returns None, so no separate fallback is needed.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

# Group ID is completely arbitrary
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'week12-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest',
})

# sendDataToDB decides what to store from the producers present in the
# records it receives. Hand it the whole batch as a single partition so that
# records from different producers are joined even if they were read from
# different Kafka partitions.
kafkaStream.foreachRDD(lambda rdd: rdd.coalesce(1).foreachPartition(sendDataToDB))

ssc.start()
try:
    # Wait up to run_secs; raises if the streaming job reported an error.
    ssc.awaitTerminationOrTimeout(run_secs)
finally:
    ssc.stop(stopSparkContext=True, stopGraceFully=True)