# FIT5148 Assignment 2
#### Task C2

* Due: 24/05/2019
* Tutor: Paras Sitoula, Wednesday 12-2pm
<br>

| Student 1 | Student 2|
|-----------|----------|
| Hitesh Get | Samuel Campbell |

<br>

<dl>
    <dt><u>Please Note:</u></dt>
    <dd>Please make sure the producers have been run before executing this notebook.</dd>
</dl>

## 1. Import Necessary Modules

In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
from datetime import datetime
import json
import geohash as gh
from ast import literal_eval 
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pprint import pprint

## 2. Streaming Application

In [None]:
def _geohash_of(reading):
    """Return the precision-5 geohash of a reading's latitude/longitude."""
    return gh.encode(reading["latitude"], reading["longitude"], precision=5)


def _matching_hotspots(climate, list_aqua, list_terra):
    """
    Build the hotspot entries whose geohash matches the climate record.

    AQUA matches are collected first. Each TERRA match is then paired, in
    order, with the AQUA match at the same position and the pair is replaced
    by its mean; TERRA matches with no AQUA partner are appended as-is.

    Returns a list of dicts with 'surface_temperature_celcius' and
    'confidence' keys (empty if nothing matched).
    """
    climate_geohash = _geohash_of(climate['Climate'])
    list_temp = []
    list_conf = []

    for aqua in list_aqua:
        reading = aqua['hotspot_aqua']
        if _geohash_of(reading) == climate_geohash:
            list_temp.append(reading['surface_temperature_celcius'])
            list_conf.append(reading['confidence'])

    # Index of the AQUA entry the next matching TERRA reading is paired with.
    hotspot_iter = 0
    for terra in list_terra:
        reading = terra['hotspot_terra']
        if _geohash_of(reading) != climate_geohash:
            continue

        if len(list_temp) == hotspot_iter:
            # No AQUA partner left: keep the TERRA reading on its own.
            list_temp.append(reading['surface_temperature_celcius'])
            list_conf.append(reading['confidence'])
        else:
            # Both satellites saw this location: store the mean of the pair.
            list_temp[hotspot_iter] = (
                list_temp[hotspot_iter] + reading['surface_temperature_celcius']
            ) / 2
            list_conf[hotspot_iter] = (
                list_conf[hotspot_iter] + reading['confidence']
            ) / 2

        hotspot_iter += 1

    return [
        {'surface_temperature_celcius': temp, 'confidence': confidence}
        for temp, confidence in zip(list_temp, list_conf)
    ]


def sendDataToDB(iter):
    """
    Write one partition of Kafka records to the `the_bin` MongoDB collection.

    Each record is a (key, value) pair whose value is a JSON document. The
    documents are binned by their top-level key ('Climate', 'hotspot_aqua' or
    'hotspot_terra'); anything else is ignored. Every climate document is
    inserted; when hotspot documents are present in the same partition, those
    sharing the climate document's precision-5 geohash are attached to its
    'hotspot' list first.

    Insert failures are printed and skipped so one bad document does not
    abort the rest of the partition.
    """
    client = MongoClient()
    try:
        the_bin = client.fit5148_db.the_bin

        list_climate = []
        list_aqua = []
        list_terra = []

        # Next free '_id'. The collection size is read once and then advanced
        # locally: re-reading count() per record (before anything is inserted)
        # would hand every climate record in this partition the same '_id',
        # and all but the first insert would fail with a duplicate key.
        # NOTE(review): partitions processed concurrently could still pick
        # overlapping ids - confirm the topic is consumed by one partition.
        # NOTE(review): count()/insert() are deprecated in PyMongo 3.x and
        # removed in 4.x - confirm the installed driver version.
        next_id = None

        for record in iter:
            data = json.loads(record[1])

            if 'Climate' in data:
                if next_id is None:
                    next_id = the_bin.count()
                data['_id'] = next_id
                next_id += 1
                list_climate.append(data)
            elif 'hotspot_aqua' in data:
                list_aqua.append(data)
            elif 'hotspot_terra' in data:
                list_terra.append(data)

        # If only the climate stream is encountered there is nothing to join,
        # so the climate documents are written straight to the database.
        has_hotspots = bool(list_aqua or list_terra)

        for climate in list_climate:
            if has_hotspots:
                hotspots = _matching_hotspots(climate, list_aqua, list_terra)
                if hotspots:
                    # setdefault: do not fail if the producer omitted the list.
                    climate.setdefault('hotspot', []).extend(hotspots)

            # Writes to database
            try:
                the_bin.insert(climate)
            except Exception as ex:
                print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even if parsing or joining failed.
        client.close()

# Set window (batch interval in seconds) and topic
n_secs = 10
topic = "climate"

# Instantiates Spark processes.
# The configuration is passed to getOrCreate so the app name and master are
# actually applied when a new context is created (getOrCreate never returns
# None; an already-running context is reused as-is).
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

# Connects to Kafka.
# The 0-8 direct stream API uses the old consumer configuration, whose valid
# 'auto.offset.reset' values are 'smallest' / 'largest' (not 'earliest' /
# 'latest'); 'smallest' starts from the beginning of the topic.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092',
                        'group.id':'climate-group',  # Group ID is completely arbitrary
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'smallest'})

# Processes RDDs: each partition of each batch is written to MongoDB.
# foreachRDD only registers the output operation and returns None.
lines = kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()

try:
    # Run the stream for 1 minute, then stop even if no producer data arrived.
    time.sleep(60)
finally:
    # Stop gracefully so in-flight batches finish, even if the wait is interrupted.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)