### Task C - Streaming Application

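This streaming application consumes data from all three producers and, depending on the conditions below, inserts it into MongoDB.
The conditions are as follows (a location "matches" when the precision-5 geohashes of the two latitude/longitude pairs are equal):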

   1. If, within one batch, data is received from Aqua but not Terra, and the Aqua location matches the climate location, insert the data into MongoDB using an embedded data model.
   2. If data is received from Terra but not Aqua, and the Terra location matches the climate location, insert the data into MongoDB using an embedded data model.
   3. If data is received from both Terra and Aqua, and both the Aqua and Terra locations match the climate location, average their confidence and air temperature values and insert the data into MongoDB using an embedded data model.
   4. In any other case, insert the climate data into MongoDB.
    
All producers publish to the same Kafka topic, so everything this application receives arrives in a single Kafka stream. The records are then separated by the sender ID (`sender_id`) that each producer attaches to its messages.


In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import geohash as g
import sys
import time
import json
from json import loads
#import pygeohash as g
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def _split_by_sender(records):
    """Decode Kafka ``(key, value)`` pairs and bucket them by producer.

    Each value is parsed as JSON and tagged with a precision-5 geohash of its
    latitude/longitude under the key ``'hash'``. Records whose ``sender_id``
    is not 'Climate', 'Aqua' or 'Terra' are ignored. Records that cannot be
    decoded or lack the expected fields are reported and skipped, so one bad
    message does not fail the whole micro-batch.

    Returns:
        A ``(climate, aqua, terra)`` tuple of lists of dicts.
    """
    buckets = {'Climate': [], 'Aqua': [], 'Terra': []}
    for record in records:
        try:
            data = json.loads(record[1])
            bucket = buckets.get(data['sender_id'])
            if bucket is None:
                continue
            data['hash'] = g.encode(data['latitude'], data['longitude'], precision=5)
        except (ValueError, KeyError, TypeError, IndexError) as ex:
            print("Skipping malformed record. Message: {0}".format(str(ex)))
            continue
        bucket.append(data)
    return buckets['Climate'], buckets['Aqua'], buckets['Terra']


def _first_match(records, location_hash):
    """Return the first record whose ``'hash'`` equals *location_hash*, else None."""
    return next((rec for rec in records if rec['hash'] == location_hash), None)


def _build_fire(climate_hash, aqua, terra):
    """Build the embedded ``'Fire'`` list for a climate record at *climate_hash*.

    - Aqua and Terra both match: one entry, a copy of the matching Aqua record
      with ``surf_air_temp`` and ``confidence`` replaced by the Aqua/Terra
      average.
    - Exactly one satellite matches: one entry, a copy of that record.
    - Neither matches: an empty list (the climate record is stored alone).
    """
    aqua_hit = _first_match(aqua, climate_hash)
    terra_hit = _first_match(terra, climate_hash)
    if aqua_hit is not None and terra_hit is not None:
        fire = dict(aqua_hit)
        # Divide by 2.0 so integer inputs are not floor-divided under Python 2.
        fire['surf_air_temp'] = (aqua_hit['surf_air_temp'] + terra_hit['surf_air_temp']) / 2.0
        fire['confidence'] = (aqua_hit['confidence'] + terra_hit['confidence']) / 2.0
        return [fire]
    hit = aqua_hit if aqua_hit is not None else terra_hit
    # Copy so several climate documents never share (or mutate) one dict.
    return [] if hit is None else [dict(hit)]


def sendDataToDB(iter):
    """Write one partition of a Kafka micro-batch to MongoDB.

    Every Climate record in the partition is inserted into
    ``fit5148_assignment_db.fit5148_Ass1`` with an embedded ``'Fire'`` list.
    That list is built from the Aqua/Terra hotspot records of the same
    partition whose geohash matches (see ``_build_fire``). Hotspot records
    are only embedded; they are never stored on their own.

    Args:
        iter: iterable of Kafka ``(key, value)`` pairs as supplied by
            ``RDD.foreachPartition``; ``value`` is a JSON string. (The name
            shadows the builtin but is kept for interface compatibility.)

    NOTE(review): matching only sees records in the same partition. If topic
    "S1" has more than one partition, a climate record and its hotspot may
    land in different partitions and never be matched. Confirm the topic has
    a single partition.
    """
    # Read the whole partition first, then match, so each climate record is
    # evaluated against every hotspot and inserted exactly once.
    climate, aqua, terra = _split_by_sender(iter)
    if not climate:
        return  # nothing to store; avoid opening a connection at all

    for record in climate:
        record['Fire'] = _build_fire(record['hash'], aqua, terra)

    client = MongoClient()
    try:
        collection = client.fit5148_assignment_db.fit5148_Ass1
        collection.insert_many(climate)
    finally:
        client.close()

n_secs = 10  # micro-batch interval in seconds
topic = "S1"  # single topic shared by the Climate, Aqua and Terra producers

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate never returns None. Pass conf so the app name/master are
# applied when no context exists yet; an already-running context is reused.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

# Group ID is completely arbitrary
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'week11-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest',
})

# foreachRDD is an output operation and returns None, so nothing is bound here.
# Each partition of every micro-batch is written to MongoDB by sendDataToDB.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
# Run the stream for at most 10 minutes, in case no producer is ever detected.
# Returns early if the streaming context terminates (e.g. on an error).
ssc.awaitTerminationOrTimeout(600)
ssc.stop(stopSparkContext=True, stopGraceFully=True)