# Spark Streaming Application for Data from Kafka Producers

The answers have been modified for portfolio purposes.

Author: Grace Nathania

In [1]:
import os
# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this must be set
# before the SparkContext below is created. It pulls in the Kafka 0.8 streaming
# connector (Scala 2.11, Spark 2.3.0) that pyspark.streaming.kafka needs.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
# Third-party: MongoDB driver and geohash encoder.
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash
import datetime as dt

def find_latest_date(col):
    """Return the day after the 'date' of the document with the largest ``_id``.

    Args:
        col: a collection-like object exposing ``aggregate(pipeline)``.

    Returns:
        The stored 'date' (format ``dd/mm/yy``) advanced by one day, formatted
        the same way, or ``None`` when the aggregation yields no documents.
    """
    pipeline = [{"$sort": {"_id": -1}}, {"$limit": 1}]
    newest = next(iter(col.aggregate(pipeline)), None)
    if newest is None:
        return None

    fmt = '%d/%m/%y'
    following_day = dt.datetime.strptime(newest['date'], fmt).date() + dt.timedelta(days=1)
    return following_day.strftime(fmt)
    
def _geohash_of(record, precision):
    """Return the geohash of ``record['latitude']``/``record['longitude']`` at ``precision``."""
    return geohash.encode(record['latitude'], record['longitude'], precision=precision)


def _hotspots_near(cell, hotspots):
    """Return, in input order, the hotspots whose precision-3 geohash equals ``cell``."""
    return [h for h in hotspots if _geohash_of(h, 3) == cell]


def _merge_nearby_hotspots(hotspots, climate_date):
    """Collapse hotspots that share a precision-5 geohash into one record per cell.

    The first hotspot seen in each cell becomes the representative:
    - 'date' is set to ``climate_date``.
    - 'date_time' is ``climate_date`` + "T" + the time part of its 'created_date'.
    - 'created_date' and 'producer' are removed.
    When a cell holds more than one hotspot, the representative's surface
    temperature, confidence, latitude and longitude are replaced by the mean
    over every hotspot in that cell.

    Args:
        hotspots: list of hotspot dicts; representatives are mutated in place.
        climate_date: the 'date' string of the climate record.

    Returns:
        List of merged hotspot dicts in first-seen order.
    """
    averaged_fields = ('surface_temperature_celcius', 'confidence', 'latitude', 'longitude')

    # Group by each hotspot's own precision-5 cell; dicts keep first-seen order.
    cells = {}
    for data in hotspots:
        cells.setdefault(_geohash_of(data, 5), []).append(data)

    merged = []
    for members in cells.values():
        rep = members[0]
        if len(members) > 1:
            # True mean over the cell. A pairwise running average ((old + new) / 2)
            # would weight the most recent hotspot more heavily than earlier ones.
            for field in averaged_fields:
                rep[field] = sum(m[field] for m in members) / len(members)
        # Keep the hotspot's time of day, but stamp it with the climate record's date.
        time_of_day = rep['created_date'].split("T")[1]
        rep['date_time'] = climate_date + "T" + time_of_day
        rep['date'] = climate_date
        rep.pop('created_date')
        rep.pop('producer')
        merged.append(rep)
    return merged


def sendDataToDB(iter):
    """Combine one partition of Kafka records into a climate document and store it in MongoDB.

    Records are routed by their integer key: 1 = climate, 2 = AQUA hotspot,
    3 = TERRA hotspot. Records with any other key are skipped with a message.
    Only the first climate record in the partition is used. If there is none,
    nothing is stored.

    The climate record loses 'created_date', 'producer', 'latitude' and
    'longitude', and gains 'station' (its precision-3 geohash). If any
    hotspot shares that precision-3 cell:
    - nearby hotspots are merged per precision-5 cell and stored under 'hotspot';
    - 'fire_cause' is 'natural' when air temperature > 20 and GHI > 180,
      otherwise 'other'.
    The document is inserted into ``stopfire_db.data`` on the default MongoClient.

    Args:
        iter: iterable of (key, value) pairs from the Kafka stream; ``value``
            is a string that evaluates to a dict. (The name shadows the builtin
            but is kept for interface compatibility.)
    """
    climate_records, aqua_hotspots, terra_hotspots = [], [], []
    buckets = {1: climate_records, 2: aqua_hotspots, 3: terra_hotspots}

    for record in iter:
        try:
            key = int(record[0])
        except (TypeError, ValueError):
            key = None
        bucket = buckets.get(key)
        if bucket is None:
            # Reject keys outside 1-3 instead of indexing with key - 1,
            # where key 0 would wrap around into the TERRA bucket.
            print("Skipping record with unexpected producer key: {0!r}".format(record[0]))
            continue
        # SECURITY: eval() executes arbitrary code contained in the Kafka message.
        # Only acceptable while every producer is trusted; prefer json.loads or
        # ast.literal_eval once the producers' serialization format is confirmed.
        bucket.append(eval(record[1]))

    if not climate_records:
        # Hotspots are only stored attached to a climate record.
        return

    climate_data = climate_records[0]
    climate_date = climate_data['date']
    climate_data.pop('created_date')
    climate_data.pop('producer')

    # Hotspots "match" the climate record when they share its precision-3 geohash.
    climate_cell = _geohash_of(climate_data, 3)
    nearby = (_hotspots_near(climate_cell, aqua_hotspots)
              + _hotspots_near(climate_cell, terra_hotspots))

    # The geohash replaces the raw coordinates as the station identifier.
    climate_data['station'] = climate_cell
    climate_data.pop('latitude')
    climate_data.pop('longitude')

    if nearby:
        climate_data['hotspot'] = _merge_nearby_hotspots(nearby, climate_date)
        if climate_data['air_temperature_celcius'] > 20 and climate_data['GHI_w/m2'] > 180:
            climate_data['fire_cause'] = 'natural'
        else:
            climate_data['fire_cause'] = 'other'

    client = MongoClient()
    try:
        client.stopfire_db.data.insert_one(climate_data)
    except Exception as ex:
        # Best-effort write: a failed insert must not kill the streaming job.
        print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even if the insert raised.
        client.close()
    
# Streaming batch interval, in seconds.
n_secs = 10
# Kafka topic that all producers publish to.
topic = 'StopFire'
# How long to run the stream before shutting it down, in seconds
# (a fixed window in case no producer is ever detected).
run_secs = 600

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Pass conf explicitly. Without it, getOrCreate() ignores the app name and master set above.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'assignment-group',  # group ID is completely arbitrary
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest',
})

# foreachRDD is an output operation that returns None, so there is no result to keep.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    # Wait up to run_secs. Return early (re-raising any reported error) if the
    # context stops on its own, instead of sleeping blindly.
    ssc.awaitTerminationOrTimeout(run_secs)
finally:
    ssc.stop(stopSparkContext=True, stopGraceFully=True)