# FIT5148 - Big data management and processing assignment

# Assignment Task C: Streaming Application

#### Team Members:

1. 27033074 - Matthew Yeow Yit Hang

2. 26546736 - Borris Trendy Wiria



#### Import required libraries

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pprint import pprint

In [2]:
# Initialise the MongoDB client and drop the `streaming` collection so that
# re-running the notebook does not insert duplicate documents.
client = MongoClient()
db = client.fit5148_assignment_db
# drop_collection returns the server's response, not a database handle, so
# its result is deliberately not assigned back to `db`.
db.drop_collection(db.streaming)

In [3]:
# Kafka topic the producers publish to.
topic = "climate_hotspot"

# Sender ids; each message carries its sender id in its last field.
climate1, hotspotAqua, hotspotTerra = "climate_1", "hotspot_1", "hotspot_2"

# Spark / Spark Streaming / Kafka settings.
batch_interval = 10                   # micro-batch length in seconds
appName = "StreamingApplication"
threads = "local[2]"                  # Spark master: local mode, 2 threads
bootstrap_servers = "127.0.0.1:9092"  # Kafka broker address

# Configure Spark and Spark Streaming.
config = SparkConf().setAppName(appName).setMaster(threads)
# Pass `config` so a newly created context actually uses appName/master.
# getOrCreate never returns None: it either reuses the already-running
# SparkContext (e.g. from an earlier notebook run, in which case `config`
# is ignored) or builds a new one from `config`.
sc = SparkContext.getOrCreate(conf=config)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batch_interval)

In [None]:
# Create a direct Kafka DStream on `ssc` subscribed to `topic`, reading from
# the broker given by `bootstrap_servers`.
kafkaStream = KafkaUtils.createDirectStream(
    ssc,
    topics=[topic],
    kafkaParams={'bootstrap.servers': bootstrap_servers},
)

def sendDataToDB(iter):
    """Join one partition's hotspot readings onto its climate readings and
    insert the joined climate documents into MongoDB.

    Each element of ``iter`` is a Kafka ``(key, value)`` record. ``key`` is
    stored as the geohash. ``value`` is a comma-separated string whose last
    field is the sender id. A record whose sender id equals ``climate1`` is
    parsed as a climate reading; every other record is parsed as a hotspot
    reading.

    Hotspots are matched by geohash against climate readings from the same
    partition only. Hotspots with no matching climate reading are not stored.
    For each climate reading, the matching hotspots are embedded under
    ``"hotspot"``, and their mean confidence and mean surface temperature are
    stored. Both means stay ``0`` when nothing matches.

    Args:
        iter: the records of one RDD partition, as passed by
            ``foreachPartition``. The name shadows the ``iter`` builtin but
            is kept unchanged for compatibility.

    Raises:
        ValueError, IndexError: if a record's fields cannot be parsed. All
            parsing happens before the MongoDB connection is opened, so no
            connection is leaked in that case.
    """
    climateList = []
    # Hotspots grouped by geohash, so the join below is one dict lookup per
    # climate reading instead of a scan over every hotspot.
    hotspotsByGeohash = {}

    for record in iter:
        geoHash = record[0]
        data = record[1].split(',')
        if data[-1] == climate1:  # climate data
            climateList.append({
                "geo_hash": geoHash,
                "latitude": float(data[0]),
                "longitude": float(data[1]),
                "air_temperature_celcius": int(data[2]),
                "relative_humidity": float(data[3]),
                "windspeed_knots": float(data[4]),
                "max_wind_speed": float(data[5]),
                "precipitation": data[6],
                "hotspot": [],
                "average_confidence": 0,
                "average_surface_temperature": 0
            })
        else:  # hotspot data
            hotspotsByGeohash.setdefault(geoHash, []).append({
                "geo_hash": geoHash,
                "latitude": float(data[0]),
                "longitude": float(data[1]),
                "confidence": int(data[2]),
                "surface_temperature_celcius": int(data[3]),
                "arrival_time": data[-2]
            })

    # Join: embed every hotspot sharing the climate reading's geohash, in
    # arrival order, and average their confidence and surface temperature.
    for climate in climateList:
        matches = hotspotsByGeohash.get(climate["geo_hash"], [])
        if matches:
            climate["hotspot"].extend(matches)
            climate["average_confidence"] = (
                sum(h["confidence"] for h in matches) / len(matches))
            climate["average_surface_temperature"] = (
                sum(h["surface_temperature_celcius"] for h in matches)
                / len(matches))

    if not climateList:
        # Nothing to store for this partition; skip opening a connection.
        return

    client = MongoClient()
    try:
        streaming = client.fit5148_assignment_db.streaming
        for climate in climateList:
            try:
                streaming.insert_one(climate)
            except Exception as ex:
                # Best effort: report the failure and keep inserting the
                # remaining documents of this partition.
                print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even if something unexpected is
        # raised while inserting.
        client.close()
    
# For every micro-batch, hand each partition to sendDataToDB, which parses,
# joins and stores that partition's records. foreachRDD returns nothing, so
# its result is not bound to a name.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    # Run the stream for at most 10 minutes, so the cell finishes even if no
    # producer is detected. Unlike time.sleep, this returns early if the
    # streaming context stops, and re-raises any error reported by the job.
    ssc.awaitTerminationOrTimeout(600)
finally:
    # Stop gracefully (wait for received data to be processed) and also stop
    # the underlying SparkContext, even if the wait was interrupted.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)