# FIT5148 - Distributed Databases and Big Data

# Group Assignment - Part C - Spark Straming Application

**Your Details:**
- Name: Roopak Thiyyathuparambil Jayachandran
- StudentID: 29567467
- Email: rthi0002@student.monash.edu

## Introduction 

Processes the data which is pushed by Kafka producers into the stream. Data is joined and pushed to mongodb 
Methodology:
* Creating spark connection for the stream
* Understanding the producer from which data has come and saving it to appropriate table in memory
* Joining the data based on geo hash value and pushing it to mongodb

Methodology for implementation:

    * The data is taken into a list and final element is checked to identify the producer.
    * Hash value of the location is stored irrespective of producer.
    * If data is coming from producer 1, extract the required fields, store to a dictionary and check if the same location data at almost same time is present in hotspot data. If present get that data and return the complete dictionary. Else return the climate data without hotspot data.
    * If data is coming from Aqua or Terra Satellite, check if the similar location is already stored in the list. If present store the average of confidence and surface temperature (as per requirement). Else extract the data to a list, iterate the climate data and check matching hash value, hour and minute value (seconds are avoided) and return the climate dictionary
    * The dictionary is then stored to the mongodb using insert function.

In [13]:
# Creating global list of climate and hotspot. This will be used to store dictionaries of climate and hotspot.
climate = []
hotspot = []

In [None]:
# Imports 
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils
import pandas as pd
import geohash as gh
from bson.objectid import ObjectId

global climate
global hotspot

# Function which does the extraction from stream data and returning a dictionary
def sep(input_list):
    latitude = float(input_list[0])
    longitude = float(input_list[0])
    
    geo = gh.encode(float(input_list[0]),float(input_list[1]), precision = 5)
    if(int(input_list[-1]) == 1): # Producer 1
        # Extracting the date
        date = input_list[7].split(" ")[0]
        # Extracting the hour 
        hour = int(input_list[7].split(" ")[1].split(":")[0])
        climate_dic = {}
        climate_dic["geo"] = geo
        climate_dic["air_temperature"] = int(input_list[2])
        climate_dic["relative_humidity"] = float(input_list[3])
        climate_dic["windspeed_knots"] = float(input_list[4])
        climate_dic["windspeed_max"] = float(input_list[5])
        climate_dic["precipitation"] = input_list[6]
        climate_dic["date"] = date
        climate_dic["hour"] = hour  
        
        fire_present = False # Flag check if data is already stored from the other satelite data
        if len(hotspot) > 1: 
            for each in hotspot:
                if (each["geo"] == geo and each["date"] == date and each["hour"] == hour):
                    climate_dic["historic"] = [each]
                    fire_present = True        
                    break
            if not fire_present:
                each["historic"] = []
        
        climate.append(climate_dic)
        return climate_dic
        
        
    else: # Producer 2
        date = input_list[4].split(" ")[0] # Extract date
        hour = int(input_list[4].split(" ")[1].split(":")[0]) # Extract hour
        minute = int(input_list[4].split(" ")[1].split(":")[1]) # Extract minute
        present = False
        hotspot_dic = {}
        for each in hotspot: # Iterating through the hotspot data and check if simiar data from other satelite is already stored
            if (each["geo"] == geo and each["hour"] == hour and each["minute"] == minute):
                each["confidence"] = (each["confidence"] + int(input_list[2]))/2
                each["surface_temperature_celsius"] = (each["surface_temperature_celsius"] + int(input_list[3]))/2
                for each2 in climate:
                    if (each2["geo"] == geo and each2["date"] == date and each2["hour"] == hour):   
                        each2["historic"] = each
                        return each2
                present = True
                break
        if not present: # Not present, create a hotspot dictionary and 
            hotspot_dic["geo"] = geo
            hotspot_dic["date"] = date
            hotspot_dic["latitude"] = float(input_list[0])
            hotspot_dic["longitude"] = float(input_list[1])
            hotspot_dic["confidence"] = int(input_list[2])
            hotspot_dic["surface_temperature_celsius"] = int(input_list[3])
            hotspot_dic["datetime"] = input_list[4]
            hotspot_dic["hour"] = hour
            hotspot_dic["minute"] = minute
            for each in climate:
                if (each["geo"] == geo and each["date"] == date and each["hour"] == hour):
                    each["historic"] = hotspot_dic
                    return each      
                    break
            hotspot.append(hotspot_dic)
            return {}
            
       
def sendDataToDB(iter): # Initializes the MongoDB connection
    client = MongoClient()
    db = client.fit514
    col = db.climate_data_model_9
    for record in iter:
        # Splitting the incoming stream of data to a list
        data = record[1].split(",")
        dic = {}
        dic = sep(data)
        try:
            if len(dic) == 0: 
                continue
            print(dic)
            dic['_id'] = ObjectId()
            col.replace_one({"_id":dic["_id"]}, dic, True)
            print("Inserted: ", col.count() )
            print()
            print()
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    client.close()

n_secs = 2
topic = "Climate"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate()
if sc is None:
    sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
print("1")
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'assign-climate', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary
print("2")
lines = kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
time.sleep(600) # Run stream for 10 minutes just in case no detection of producer
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

The following code is helpful for debugging logic implemented in sep function. Only for development Purpose. Can be used to verify how each of the requirement is satisfied and what value is returned. Can be also used to verify edge cases.

In [5]:
import geohash as gh
def sep(input_list):
    latitude = float(input_list[0])
    longitude = float(input_list[0])
    
    geo = gh.encode(float(input_list[0]),float(input_list[1]), precision = 5)
    if(int(input_list[-1]) == 1):
        date = input_list[7].split(" ")[0]
        hour = int(input_list[7].split(" ")[1].split(":")[0])
        climate_dic = {}
        climate_dic["geo"] = geo
        climate_dic["air_temperature"] = int(input_list[2])
        climate_dic["relative_humidity"] = float(input_list[3])
        climate_dic["windspeed_knots"] = float(input_list[4])
        climate_dic["windspeed_max"] = float(input_list[5])
        climate_dic["precipitation"] = input_list[6]
        climate_dic["date"] = date
        climate_dic["hour"] = hour  
        
        fire_present = False
        if len(hotspot) > 1:
            for each in hotspot:
                if (each["geo"] == geo and each["date"] == date and each["hour"] == hour):
                    climate_dic["historic"] = [each]
                    fire_present = True        
                    break
            if not fire_present:
                each["historic"] = []
        
        climate.append(climate_dic)
        return climate_dic
        
        
    else:
        date = input_list[4].split(" ")[0]
        hour = int(input_list[4].split(" ")[1].split(":")[0])
        minute = int(input_list[4].split(" ")[1].split(":")[1])
        present = False
        hotspot_dic = {}
        for each in hotspot:
            if (each["geo"] == geo and each["hour"] == hour and each["minute"] == minute):
                each["confidence"] = (each["confidence"] + int(input_list[2]))/2
                each["surface_temperature_celsius"] = (each["surface_temperature_celsius"] + int(input_list[3]))/2
                for each2 in climate:
                    if (each2["geo"] == geo and each2["date"] == date and each2["hour"] == hour):   
                        each2["historic"] = each
                        return each2
                present = True
                break
        if not present:
            hotspot_dic["geo"] = geo
            hotspot_dic["date"] = date
            hotspot_dic["latitude"] = float(input_list[0])
            hotspot_dic["longitude"] = float(input_list[1])
            hotspot_dic["confidence"] = int(input_list[2])
            hotspot_dic["surface_temperature_celsius"] = int(input_list[3])
            hotspot_dic["datetime"] = input_list[4]
            hotspot_dic["hour"] = hour
            hotspot_dic["minute"] = minute
            for each in climate:
                if (each["geo"] == geo and each["date"] == date and each["hour"] == hour):
                    each["historic"] = hotspot_dic
                    return each      
                    break
            
            hotspot.append(hotspot_dic)

In [11]:
#input_list = "-36.3634,146.3871,72,47,2019-05-25 19:13:03.746296,2"
input_list = "-36.3634,146.3871,15,41.2,13.8,16.9, 0.00G,2019-05-25 19:13:03.746298,1"
sep(input_list=input_list.split(","))
hotspot


[{'confidence': 72,
  'date': '2019-05-25',
  'datetime': '2019-05-25 19:13:03.746296',
  'geo': 'r1w9j',
  'hour': 19,
  'latitude': -36.3634,
  'longitude': 144.3871,
  'minute': 13,
  'surface_temperature_celsius': 47},
 {'confidence': 72,
  'date': '2019-05-25',
  'datetime': '2019-05-25 19:13:03.746296',
  'geo': 'r3815',
  'hour': 19,
  'latitude': -36.3634,
  'longitude': 146.3871,
  'minute': 13,
  'surface_temperature_celsius': 47},
 {'confidence': 72,
  'date': '2019-05-25',
  'datetime': '2019-05-25 19:13:03.746296',
  'geo': 'r60pg',
  'historic': [],
  'hour': 19,
  'latitude': -32.3634,
  'longitude': 146.3871,
  'minute': 13,
  'surface_temperature_celsius': 47}]

In [12]:
climate

[{'air_temperature': 15,
  'date': '2019-05-25',
  'geo': 'r3374',
  'hour': 18,
  'precipitation': ' 0.00G',
  'relative_humidity': 41.2,
  'windspeed_knots': 13.8,
  'windspeed_max': 16.9},
 {'air_temperature': 15,
  'date': '2019-05-25',
  'geo': 'r3815',
  'historic': [{'confidence': 72,
    'date': '2019-05-25',
    'datetime': '2019-05-25 19:13:03.746296',
    'geo': 'r3815',
    'hour': 19,
    'latitude': -36.3634,
    'longitude': 146.3871,
    'minute': 13,
    'surface_temperature_celsius': 47}],
  'hour': 19,
  'precipitation': ' 0.00G',
  'relative_humidity': 41.2,
  'windspeed_knots': 13.8,
  'windspeed_max': 16.9}]