# Streaming_Application

This Spark Streaming application will:
1. receive the streaming data from all three producers (via Kafka)
2. enrich each record with a newly generated date/datetime and a Geohash, then assemble the data model
3. write the resulting documents to MongoDB

        Batch interval (window length) = 10s

In [1]:
import os
# Ask PySpark to pull in the Kafka 0.8 streaming connector package
# (spark-streaming-kafka-0-8, Scala 2.11 build, version 2.3.0) when it starts
# the shell. This is set ahead of the pyspark imports below; presumably it
# has to be in place before the SparkContext is launched -- confirm if the
# cell order is ever changed.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

# libraries 
import sys
import time
import json
import pymongo
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash as gh
import datetime as dt
from datetime import timedelta
from datetime import datetime

In [2]:
def sendDataToDB(iter):
    """Build one climate document from a partition of Kafka records and store it.

    Every record's value (``record[1]``) is parsed as JSON, tagged with a
    precision-5 geohash (``Geohash_5``) and stamped with a ``Date`` that is one
    day after the newest ``Date`` currently stored in the MongoDB collection.
    Records with ``Sender_ID == 1`` are treated as climate data; records with
    ``Sender_ID`` 2 or 3 are treated as hotspot data. Hotspots that fall in the
    same geohash cell as the climate record are merged into a single hotspot
    (temperature and confidence averaged) and appended to the climate record's
    ``Hotspot`` list before the climate record is inserted.

    Nothing is written when the partition contains no climate record.

    Args:
        iter: iterable of records for one RDD partition; element ``[1]`` of
            each record must be a JSON string with at least ``Latitude``,
            ``Longitude`` and ``Sender_ID`` (hotspot records also need
            ``Created_Time``, ``Surface_Temperature_Celcius`` and
            ``Confidence``). The name shadows the builtin but is kept so the
            signature stays unchanged.

    Returns:
        None. Insert failures are printed rather than raised.
    """
    client = MongoClient()  # connect to MongoDB
    try:
        collection = client.skill_demo_db.vic_fire_collection

        # The new batch is dated one day after the newest stored document.
        new_date = None
        newest = collection.find({}).sort('Date', pymongo.DESCENDING).limit(1)
        for doc in newest:
            new_date = doc['Date'] + timedelta(days=1)
        if new_date is None:
            # Empty collection: there is no previous date to advance from, so
            # start at today's midnight instead of failing with a NameError.
            new_date = datetime.now().replace(
                hour=0, minute=0, second=0, microsecond=0)

        climate_data = {}
        store_hash = None
        later_process = []

        for record in iter:
            data = json.loads(record[1])
            # Geohash every incoming record so climate and hotspot data can be
            # matched by location.
            data['Geohash_5'] = gh.encode(
                float(data['Latitude']), float(data['Longitude']), precision=5)
            data['Date'] = new_date

            sender_id = data['Sender_ID']
            if sender_id == 1:  # climate data (the last one in the partition wins)
                climate_data = data
                store_hash = data['Geohash_5']
            elif sender_id in (2, 3):  # hotspot data
                # Combine the batch date with the time of day the hotspot
                # was created.
                created = datetime.strptime(data['Created_Time'], "%X").time()
                data['Datetime'] = dt.datetime.combine(new_date, created)
                # Hotspots can only be matched once the climate record is known.
                later_process.append(data)

        if not climate_data:
            return  # no climate record in this partition: nothing to store

        # Merge all hotspots located in the climate record's geohash cell.
        merged_hotspot = None
        for spot in later_process:
            if spot['Geohash_5'] != store_hash:
                continue
            if merged_hotspot is None:
                merged_hotspot = spot
            else:
                # NOTE: this is a running pairwise average, so with three or
                # more hotspots later readings weigh more than earlier ones.
                merged_hotspot['Surface_Temperature_Celcius'] = (
                    merged_hotspot['Surface_Temperature_Celcius']
                    + spot['Surface_Temperature_Celcius']) / 2
                merged_hotspot['Confidence'] = (
                    merged_hotspot['Confidence'] + spot['Confidence']) / 2

        if merged_hotspot is not None:
            # Embed the hotspot in the climate document (the data model).
            # setdefault tolerates climate records that arrive without a
            # 'Hotspot' list.
            climate_data.setdefault('Hotspot', []).append(merged_hotspot)

        try:
            # insert_one replaces the deprecated Collection.insert.
            collection.insert_one(climate_data)
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even if a record failed to parse.
        client.close()
        

        
# Driver script: consume the Kafka topic with Spark Streaming and hand every
# partition of every batch to sendDataToDB.
n_secs = 10  # batch interval in seconds
topic = "vic_fire_producer"  # topic name
run_secs = 600  # run the stream for 10 minutes, in case no producer is detected

conf = SparkConf().setAppName("Spark_steaming").setMaster("local[2]")  # 2 threads
# Pass conf so the app name and master are applied when a new context has to
# be created; getOrCreate never returns None, so no fallback branch is needed.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092',
                        'group.id':'vic_fire_steaming',
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

# foreachRDD only registers the output operation and returns nothing, so its
# result is not kept.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    time.sleep(run_secs)
finally:
    # Stop the stream (and the SparkContext) even if the wait is interrupted.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)