# Stream Processing using Apache Spark Streaming.

A streaming application in Apache Spark Streaming that uses a local streaming context with two execution threads and a batch interval of 10 seconds. The application receives streaming data from all three producers. If it has data from all three producers, or from at least two of them, it processes the data as follows:

* Join the streams based on location (i.e., latitude and longitude) and create the data model developed in Task A.

* Determine whether two locations are close to each other.

In [0]:
from  Geohash import encode
def is_close(x, y):
    """Return True when records *x* and *y* fall in the same geohash cell.

    Both arguments are mappings with ``'latitude'`` and ``'longitude'`` keys.
    Each position is encoded with ``precision=5`` and the two cells are
    compared for equality.

    NOTE: because this is a cell-equality test, two points on opposite sides
    of a cell boundary compare as not close, however near they are.
    """
    cell_x = encode(x['latitude'], x['longitude'], precision=5)
    cell_y = encode(y['latitude'], y['longitude'], precision=5)
    return cell_x == cell_y

def average(x, y, field):
    """Return the arithmetic mean of ``x[field]`` and ``y[field]``."""
    total = x[field] + y[field]
    return total / 2

In [0]:
import os
# spark-submit arguments for the notebook's PySpark JVM. They are read when the
# SparkContext (and its JVM gateway) is launched, so this must run before the
# context is created below. It pulls in the Kafka 0.8 streaming connector
# (artifact spark-streaming-kafka-0-8, Scala 2.11, Spark 2.3.0) needed by
# pyspark.streaming.kafka.KafkaUtils.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
from json import loads
import pandas as pd
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def _split_by_producer(records):
    """Decode a partition's ``(key, json_value)`` tuples and bucket them by sender.

    Returns ``(climate_data, aqua_data, terra_data)``. Records whose
    ``sender_id`` is ``"producer_2"`` go to *aqua_data*, ``"producer_3"`` to
    *terra_data*, and every other record is treated as climate data.
    """
    climate_data, aqua_data, terra_data = [], [], []
    for record in records:
        # Kafka direct-stream elements are (key, value); the value is JSON text.
        record = loads(record[1])
        sender = record["sender_id"]
        if sender == "producer_2":
            aqua_data.append(record)
        elif sender == "producer_3":
            terra_data.append(record)
        else:
            climate_data.append(record)
    return climate_data, aqua_data, terra_data


def _merge_satellite_hotspots(aqua_data, terra_data):
    """Average co-located Aqua/Terra hotspot pairs; keep unpaired hotspots as-is.

    Each Aqua record is paired with the first still-unpaired Terra record that
    ``is_close`` to it. The pair is replaced by a single record (a copy of the
    Aqua record) whose surface temperature and confidence are the pair's means.
    Hotspots without a partner are passed through unchanged.
    """
    merged = []
    unpaired_terra = list(terra_data)
    for aqua in aqua_data:
        for i, terra in enumerate(unpaired_terra):
            if is_close(aqua, terra):
                # Copy so the decoded input record is not mutated in place.
                combined = dict(aqua)
                combined["surface_temperature_celcius"] = average(
                    aqua, terra, "surface_temperature_celcius")
                combined["confidence"] = average(aqua, terra, "confidence")
                merged.append(combined)
                del unpaired_terra[i]  # each Terra record pairs at most once
                break
        else:
            merged.append(aqua)
    merged.extend(unpaired_terra)
    return merged


def _join_with_climate(hotspot_data, climate_data):
    """Build Task A "historic" fire documents from co-located hotspot/climate data.

    Each hotspot is joined with the first climate record that ``is_close`` to
    it. The result is a new document whose ``created_at`` key is renamed to
    ``datetime`` and which embeds the climate record under ``"climate"``.
    Hotspots with no nearby climate record are dropped.
    """
    fire_data = []
    for hotspot in hotspot_data:
        climate_match = next(
            (c for c in climate_data if is_close(hotspot, c)), None)
        if climate_match is None:
            continue
        # New dict per document: the input hotspot is never mutated, so the
        # key rename cannot fail on a second pass.
        fire_doc = dict(hotspot)
        fire_doc["datetime"] = fire_doc.pop("created_at")
        fire_doc["climate"] = climate_match
        fire_data.append(fire_doc)
    return fire_data


def sendDataToDB(iter):
    """Process one Spark partition of Kafka records and store it in MongoDB.

    Steps:
    1. Decode the JSON records and split them by producer.
    2. Merge nearby Aqua (producer_2) / Terra (producer_3) hotspots.
    3. Join the hotspots with nearby climate records.
    4. Insert the climate records into ``climate`` and the joined fire
       documents into ``historic`` in the ``fit5148_assignment_db`` database.

    Database errors are printed rather than raised, so one bad batch does not
    kill the stream. The client is always closed.

    Args:
        iter: iterable of ``(key, value)`` tuples for one partition, where
            ``value`` is a JSON string (as passed by ``rdd.foreachPartition``).
    """
    climate_data, aqua_data, terra_data = _split_by_producer(iter)
    hotspot_data = _merge_satellite_hotspots(aqua_data, terra_data)
    fire_data = _join_with_climate(hotspot_data, climate_data)

    # Open the connection only after parsing succeeded, and always close it.
    client = MongoClient()
    try:
        db = client.fit5148_assignment_db
        try:
            # insert_many rejects an empty list, so guard both inserts.
            if climate_data:
                db.climate.insert_many(climate_data)
            if fire_data:
                db.historic.insert_many(fire_data)
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        client.close()

# Batch interval of the streaming context, in seconds.
n_secs = 10

# "local[2]": run locally with two worker threads, as the task requires.
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate never returns None. Passing conf ensures the master and app name
# above are applied when a new context is created; an already-running context
# is reused as-is.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafka_stream = KafkaUtils.createDirectStream(ssc, ["hotspot"], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'hotspots',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})

# Echo the first records of every batch to the cell output.
kafka_stream.pprint()

# Each partition of every batch is parsed, joined and written to MongoDB.
kafka_stream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    # Run the stream for one minute instead of awaitTermination(), so the cell
    # returns even if a producer never sends data.
    time.sleep(60)
finally:
    # Stop even if the cell is interrupted, so it can be re-run cleanly.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-26 19:48:50
-------------------------------------------
(None, '{"windspeed_knots": 8.1, "relative_humidity": 53.6, "max_wind_speed": 15.0, "longitude": 142.366, "sender_id": "producer_1", "precipitation ": " 0.00G", "air_temperature_celcius": 16, "created_at": "19:48:45", "latitude": -37.95}')

-------------------------------------------
Time: 2019-05-26 19:49:00
-------------------------------------------
(None, '{"windspeed_knots": 4.6, "relative_humidity": 47.1, "max_wind_speed": 8.9, "longitude": 148.10299999999998, "sender_id": "producer_1", "precipitation ": " 0.00G", "air_temperature_celcius": 10, "created_at": "19:48:50", "latitude": -37.469}')
(None, '{"confidence": 76, "created_at": "19:48:51", "surface_temperature_celcius": 50, "longitude": 141.6103, "latitude": -37.7126, "sender_id": "producer_3"}')
(None, '{"windspeed_knots": 8.1, "relative_humidity": 48.4, "max_wind_speed": 15.9, "longitude": 149.237, "sender_id":