# Task C-2: Stream processing using Apache Spark Streaming.

# In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
import pymongo
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import datetime

###################################################GeoHash algorithm start#################################
# Minimal GeoHash implementation (encode / decode / neighbors) used by the
# geohash() proximity check further down this file.

# Public GeoHash helpers.  Only names that actually exist may be listed here:
# `from <module> import *` raises AttributeError for any missing entry, which
# is why the undefined 'bbox' is not included.
__all__ = ['encode', 'decode', 'neighbors']

# GeoHash base-32 alphabet (digits plus lowercase letters without a, i, l, o).
_base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
# character -> 5-bit value
_decode_map = {char: value for value, char in enumerate(_base32)}
# 5-bit value -> character
_encode_map = dict(enumerate(_base32))

def decode(geohash):
    """Return the (latitude, longitude) centre of the cell named by *geohash*.

    Each character contributes five bits, most significant first.  Bits
    alternate between longitude (first) and latitude, and each bit keeps the
    upper (1) or lower (0) half of the current interval.  An empty string
    decodes to (0.0, 0.0).  A character outside the GeoHash alphabet raises
    KeyError.
    """
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    refine_lon = True
    for char in geohash:
        value = _decode_map[char]
        for shift in (4, 3, 2, 1, 0):
            upper_half = (value >> shift) & 1
            if refine_lon:
                mid = (lon_lo + lon_hi) / 2
                if upper_half:
                    lon_lo = mid
                else:
                    lon_hi = mid
            else:
                mid = (lat_lo + lat_hi) / 2
                if upper_half:
                    lat_lo = mid
                else:
                    lat_hi = mid
            refine_lon = not refine_lon
    return (lat_lo + lat_hi) / 2, (lon_lo + lon_hi) / 2


def neighbors(geohash):
    """Return the 3x3 block of same-precision cells centred on *geohash*.

    The result always has nine entries and includes *geohash* itself.  Entries
    are ordered row by row from north to south and, within a row, from west
    to east.

    Longitudes that step past the antimeridian are wrapped back into
    [-180, 180], so cells on the +/-180 seam get their real neighbours instead
    of repeating their own column.  Latitude is not wrapped: encode() maps a
    latitude beyond a pole onto the edge row, so polar cells yield duplicates.
    """
    precision = len(geohash)
    total_bits = precision * 5
    lat_bits = total_bits // 2           # longitude gets the extra bit when odd
    lon_bits = total_bits - lat_bits
    # Size of one cell, in degrees, along each axis.
    lat_step = 180 / (2 ** lat_bits)
    lon_step = 360 / (2 ** lon_bits)
    centre_lat, centre_lon = decode(geohash)

    cells = []
    for row in (1, 0, -1):               # north, same row, south
        lat = centre_lat + row * lat_step
        for col in (-1, 0, 1):           # west, same column, east
            lon = centre_lon + col * lon_step
            # Wrap across the antimeridian; in-range values are left untouched.
            if lon > 180:
                lon -= 360
            elif lon < -180:
                lon += 360
            cells.append(encode(lat, lon, precision))
    return cells

def encode(lat, lon, precision=12):
    """Return the GeoHash string of the point (*lat*, *lon*).

    Characters are produced until the string has at least *precision* of them
    (so 0 or a negative value yields '').  Bits alternate longitude/latitude,
    starting with longitude.  A coordinate equal to an interval midpoint falls
    into the lower half (bit 0).  Every five bits, most significant first,
    become one base-32 character.
    """
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    value = 0
    bit_count = 0
    refine_lon = True
    while len(chars) < precision:
        if refine_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon <= mid:
                bit = 0
                lon_hi = mid
            else:
                bit = 1
                lon_lo = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat <= mid:
                bit = 0
                lat_hi = mid
            else:
                bit = 1
                lat_lo = mid
        refine_lon = not refine_lon

        value = (value << 1) | bit
        bit_count += 1
        if bit_count == 5:
            chars.append(_encode_map[value])
            value = 0
            bit_count = 0
    return ''.join(chars)

def geohash(lat1, lon1, lat2, lon2, precision):
    """Return True when point 2 lies in the same or an adjacent GeoHash cell as point 1.

    Both points are encoded with *precision* characters.  The test is whether
    point 2's cell is among the nine cells of the 3x3 block around point 1's
    cell, as returned by neighbors().  At precision 5 a cell spans about
    0.044 x 0.044 degrees.
    """
    cell1 = encode(lat1, lon1, precision)
    candidates = neighbors(cell1)
    cell2 = encode(lat2, lon2, precision)
    return cell2 in candidates
###################################################GeoHash algorithm end#################################

def inserthotspot(array):
    """Insert each hotspot dict in *array* into MongoDB and link it to its climate record.

    Each element must carry the producer fields read below ('Creadted_time'
    (sic, the producers' key), 'latitude', 'longitude', 'confidence',
    'surface_temperature_celcius') plus a 'climate_id' set by the caller.
    The hotspot's _id is the current hotspot document count + 1.  That id is
    pushed onto the 'Hotspot_id' list of the climate document whose _id is
    'climate_id'.

    A single MongoClient serves the whole batch and is always closed, even if
    an insert raises.  An empty *array* touches no database.
    """
    if not array:
        return
    client = MongoClient('mongodb://localhost:27017/')
    try:
        db = client.fit5148_db
        climateCollection = db.climate
        hotspotCollection = db.hotspot
        for hotspotdata in array:
            # NOTE(review): count-based ids collide if documents are ever deleted
            # or two writers run concurrently.  Cursor.count() is also removed in
            # PyMongo 4 (count_documents({}) replaces it).
            hotspot_id = hotspotCollection.find().count() + 1
            # 'YYYY-MM-DDT...' -> 'DD/MM/YYYY'
            ymd = hotspotdata['Creadted_time'].split('T')[0].split('-')
            newHotspot = {"_id": hotspot_id,
                          "DateTime": hotspotdata['Creadted_time'],
                          "Date": ymd[2] + '/' + ymd[1] + '/' + ymd[0],
                          "Latitude": float(hotspotdata['latitude']),
                          "Longitude": float(hotspotdata['longitude']),
                          "Confidence": int(hotspotdata['confidence']),
                          "Surface Temperature (Celcius)": int(hotspotdata['surface_temperature_celcius']),
                          "Climate_id": hotspotdata['climate_id']}
            hotspotCollection.insert_one(newHotspot)
            # push Hot_id
            climateCollection.update_one({"_id": hotspotdata['climate_id']},
                                         {'$push': {"Hotspot_id": hotspot_id}})
            print("when get data from three producers, hotspot insert success")
    finally:
        client.close()

def inserthotspotaftercompare(producerarray, producerarray2):
    """Store each hotspot in *producerarray2* that lies near a climate record in *producerarray*.

    "Near" means geohash() accepts the pair at precision 5.  Each climate dict
    must already carry the 'id' assigned when it was inserted.  A hotspot is
    stored once, linked to the first nearby climate record in
    *producerarray*.  Hotspots with no nearby climate record are not stored.

    The MongoClient is opened only when the first match is found and is
    always closed, even if an insert raises.
    """
    client = None
    try:
        for hotspotdata in producerarray2:
            for climatedata in producerarray:
                if not geohash(float(climatedata['latitude']), float(climatedata['longitude']),
                               float(hotspotdata['latitude']), float(hotspotdata['longitude']), 5):
                    continue
                if client is None:
                    client = MongoClient('mongodb://localhost:27017/')
                db = client.fit5148_db
                climateCollection = db.climate
                hotspotCollection = db.hotspot
                # NOTE(review): count-based ids collide if documents are ever deleted.
                hotspot_id = hotspotCollection.find().count() + 1
                # 'YYYY-MM-DDT...' -> 'DD/MM/YYYY'
                ymd = hotspotdata['Creadted_time'].split('T')[0].split('-')
                newHotspot = {"_id": hotspot_id,
                              "DateTime": hotspotdata['Creadted_time'],
                              "Date": ymd[2] + '/' + ymd[1] + '/' + ymd[0],
                              "Latitude": float(hotspotdata['latitude']),
                              "Longitude": float(hotspotdata['longitude']),
                              "Confidence": int(hotspotdata['confidence']),
                              "Surface Temperature (Celcius)": int(hotspotdata['surface_temperature_celcius']),
                              "Climate_id": climatedata['id']}
                hotspotCollection.insert_one(newHotspot)
                # push Hot_id
                climateCollection.update_one({"_id": climatedata['id']},
                                             {'$push': {"Hotspot_id": hotspot_id}})
                print("when get data from two producers, hotspot insert success")
                # One reading is one hotspot document: linking it to every nearby
                # climate record would store duplicates of the same hotspot.
                break
    finally:
        if client is not None:
            client.close()

def _ddmmyyyy(created_time):
    """Turn a 'YYYY-MM-DDT...' timestamp string into 'DD/MM/YYYY'."""
    ymd = created_time.split('T')[0].split('-')
    return ymd[2] + '/' + ymd[1] + '/' + ymd[0]


def _split_by_producer(records):
    """Parse Kafka (key, value) records and return the producer1/2/3 payload dicts as three lists."""
    import ast  # literal_eval parses payloads without executing them

    producer1array, producer2array, producer3array = [], [], []
    for data in records:
        try:
            # eval() here would run any code a message contained; literal_eval
            # only accepts Python literals (dicts, strings, numbers, ...).
            newdata = ast.literal_eval(data[1])
        except (ValueError, SyntaxError):
            print("skip message that is not a valid literal")
            continue
        if not isinstance(newdata, dict):
            print("skip message that is not a dict")
            continue
        # partition data by producer
        sender = newdata.get('sender_id')
        if sender == 'producer1':
            producer1array.append(newdata)
        elif sender == 'producer2':
            producer2array.append(newdata)
        elif sender == 'producer3':
            producer3array.append(newdata)
    return producer1array, producer2array, producer3array


def _insert_climates(climates):
    """Insert every climate dict into MongoDB and record its new _id on the dict as 'id'."""
    if not climates:
        return
    client = MongoClient('mongodb://localhost:27017/')
    try:
        climateCollection = client.fit5148_db.climate
        for climatedata in climates:
            # NOTE(review): count-based ids collide if documents are ever deleted.
            climate_count = climateCollection.find().count()
            newClimate = {"_id": climate_count + 1,
                          "Date": _ddmmyyyy(climatedata['Creadted_time']),
                          "Station": climatedata['sender_id'],
                          "Air Temperature(Celcius)": int(climatedata['air_temperature_celcius']),
                          "Relative Humidity": float(climatedata['relative_humidity']),
                          "WindSpeed  (knots)": float(climatedata['windspeed_knots']),
                          "Max Wind Speed": float(climatedata['max_wind_speed']),
                          "Precipitation": climatedata['precipitation'].strip(),
                          "Hotspot_id": []}
            climateCollection.insert_one(newClimate)
            print("climate insert success")
            # Hotspot matching below links hotspots through this id.
            climatedata['id'] = climate_count + 1
    finally:
        client.close()


def _match_hotspots_to_climate(climates, hotspots, label):
    """Return copies of the hotspots near a climate record, each tagged with that record's 'climate_id'.

    A hotspot is linked only to the first nearby climate record.  The input
    dicts are not modified.  *label* is only used in the log message.
    """
    matched = []
    taken = set()
    for climatedata in climates:
        for position, hotspotdata in enumerate(hotspots):
            if position in taken:
                continue  # already linked to an earlier nearby climate record
            if geohash(float(climatedata['latitude']), float(climatedata['longitude']),
                       float(hotspotdata['latitude']), float(hotspotdata['longitude']), 5):
                # Copy before tagging so each match owns its own climate_id.
                tagged = dict(hotspotdata)
                tagged['climate_id'] = climatedata['id']
                matched.append(tagged)
                taken.add(position)
            else:
                print("this " + label + " data doesnt match any data in producer1")
    return matched


def _insert_averaged_pairs(matches2, matches3):
    """Insert one averaged hotspot per co-located producer2/producer3 pair sharing a climate_id.

    Returns two sets: the indices into *matches2* and into *matches3* that
    took part in at least one averaged insert.
    """
    averaged2, averaged3 = set(), set()
    client = None
    try:
        for index1, hotspotdata in enumerate(matches2):
            for index2, hotspotdata2 in enumerate(matches3):
                if hotspotdata['climate_id'] != hotspotdata2['climate_id']:
                    print("not belongs to same climate data")
                    continue
                if not geohash(float(hotspotdata['latitude']), float(hotspotdata['longitude']),
                               float(hotspotdata2['latitude']), float(hotspotdata2['longitude']), 5):
                    print("not same location")
                    continue
                averaged2.add(index1)
                averaged3.add(index2)
                newconfidence = int((int(hotspotdata['confidence']) + int(hotspotdata2['confidence'])) / 2)
                newtemp = int((int(hotspotdata['surface_temperature_celcius'])
                               + int(hotspotdata2['surface_temperature_celcius'])) / 2)
                if client is None:
                    client = MongoClient('mongodb://localhost:27017/')
                db = client.fit5148_db
                climateCollection = db.climate
                hotspotCollection = db.hotspot
                hotspot_id = hotspotCollection.find().count() + 1
                # Time and position are taken from the producer2 reading.
                newHotspot = {"_id": hotspot_id,
                              "DateTime": hotspotdata['Creadted_time'],
                              "Date": _ddmmyyyy(hotspotdata['Creadted_time']),
                              "Latitude": float(hotspotdata['latitude']),
                              "Longitude": float(hotspotdata['longitude']),
                              "Confidence": newconfidence,
                              "Surface Temperature (Celcius)": newtemp,
                              "Climate_id": hotspotdata['climate_id']}
                hotspotCollection.insert_one(newHotspot)
                # push Hot_id
                climateCollection.update_one({"_id": hotspotdata['climate_id']},
                                             {'$push': {"Hotspot_id": hotspot_id}})
                print("hotspot average insert success")
    finally:
        if client is not None:
            client.close()
    return averaged2, averaged3


def _not_averaged(matches, averaged, producer_label):
    """Return the matches whose index is not in *averaged*, logging the ones already stored."""
    left = []
    for index, hotspotdata in enumerate(matches):
        if index in averaged:
            print("this index of " + producer_label + " is already insert")
        else:
            left.append(hotspotdata)
    return left


# function to handle each rdd
def printdata(rdd):
    """Process one micro-batch of Kafka records and persist it to MongoDB.

    Every producer1 (climate) record is inserted.  Hotspot records from
    producer2/producer3 are stored only if geohash() places them near
    (precision 5) a climate record from the same batch:

    * both hotspot producers present: co-located producer2/producer3 hotspots
      linked to the same climate record are averaged into one document, and
      the remaining matches are stored through inserthotspot();
    * one hotspot producer present: inserthotspotaftercompare() matches and
      stores them.

    Messages that are not Python-literal dicts are skipped.
    """
    print("function start")
    producer1array, producer2array, producer3array = _split_by_producer(rdd.collect())
    # whenever, insert climate into mongodb
    _insert_climates(producer1array)

    if producer2array and producer3array:
        print("get data from three producers")
        producer2arraymatch = _match_hotspots_to_climate(producer1array, producer2array, 'produce2')
        producer3arraymatch = _match_hotspots_to_climate(producer1array, producer3array, 'produce3')
        print("check matched producer2 and producer3")
        print(len(producer2arraymatch))
        print(len(producer3arraymatch))
        averaged2, averaged3 = _insert_averaged_pairs(producer2arraymatch, producer3arraymatch)

        print("drop same location")
        producer2left = _not_averaged(producer2arraymatch, averaged2, 'producer2')
        producer3left = _not_averaged(producer3arraymatch, averaged3, 'producer3')
        print("save producer2 data which dont have same location in producer3")
        inserthotspot(producer2left)
        print("save producer3 data which dont have same location in producer2")
        inserthotspot(producer3left)
        print("process three producers data end")
    elif producer2array:  # only producer2 has data
        inserthotspotaftercompare(producer1array, producer2array)
    elif producer3array:  # only producer3 has data
        inserthotspotaftercompare(producer1array, producer3array)
    else:
        print("only receive data from producer 1")

n_secs = 10          # micro-batch interval in seconds
topic = "topic1"
startnumber = 366    # NOTE(review): not referenced in this section
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Pass conf so the app name / master above actually apply.  getOrCreate()
# always returns a context (never None), so no separate fallback is needed.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

# Direct (receiver-less) stream via the Kafka 0.8 integration.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers': '127.0.0.1:9092',
                        'group.id': 'fit5148-assignment2',  # Group ID is completely arbitrary
                        'fetch.message.max.bytes': '15728640',
                        'auto.offset.reset': 'largest'})

# foreachRDD registers an output action and returns None; nothing to keep.
kafkaStream.foreachRDD(printdata)

ssc.start()
try:
    # Run the stream for at most 10 minutes.  Unlike time.sleep(), this returns
    # early and re-raises if the streaming job fails.
    ssc.awaitTerminationOrTimeout(600)
finally:
    ssc.stop(stopSparkContext=True, stopGraceFully=True)