# In [None]:
import Geohash
import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import os
# Ask PySpark to fetch the Kafka 0.8 streaming connector (built for Scala 2.11 /
# Spark 2.3.0) via --packages when it launches; KafkaUtils below relies on it.
# NOTE(review): presumably this has to be set before the first SparkContext is
# created in this process for the setting to take effect -- confirm.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell"

# Fields copied verbatim from a "producer 1" record into climate_stream.
_CLIMATE_FIELDS = (
    "latitude",
    "longitude",
    "air_temperature_celcius",
    "relative_humidity",
    "windspeed_knots",
    "max_wind_speed",
    "precipitation",
    "created_time",
    "sender_id",
)


def _climate_document(record):
    """Return the climate_stream document for one "producer 1" record."""
    return {field: record[field] for field in _CLIMATE_FIELDS}


def _merged_hotspot_document(rec2, rec3):
    """Return the week11 document averaging a "producer 2"/"producer 3" pair."""
    return {
        "latitude": (rec2["latitude"] + rec3["latitude"]) / 2,
        "longitude": (rec2["longitude"] + rec3["longitude"]) / 2,
        "confidence": (rec2["confidence"] + rec3["confidence"]) / 2,
        "surface_temperature_celcius": (
            rec2["surface_temperature_celcius"] + rec3["surface_temperature_celcius"]
        ) / 2,
        "created_time": {
            "producer 2": rec2["created_time"],
            "producer 3": rec3["created_time"],
        },
        "sender": {"producer 2": 1, "producer 3": 1},
    }


def _single_hotspot_document(record, sender_id):
    """Return the week11 document for a hotspot reported by only one producer."""
    return {
        "latitude": record["latitude"],
        "longitude": record["longitude"],
        "confidence": record["confidence"],
        "surface_temperature_celcius": record["surface_temperature_celcius"],
        "created_time": record["created_time"],
        "sender": {
            "producer 3": int(sender_id == "producer 3"),
            "producer 2": int(sender_id == "producer 2"),
        },
    }


def _hotspot_documents(p2, p3):
    """Build all week11 documents for one batch of hotspot records.

    Every (producer 2, producer 3) pair whose positions share the same
    precision-5 geohash yields one merged document. Records that took part in
    no pair are emitted as single-producer documents, producer 2 first.
    """
    docs = []
    matched2, matched3 = set(), set()

    # Geohashes are only needed when there is something to pair up.
    if p2 and p3:
        hashes2 = [Geohash.encode(r["latitude"], r["longitude"], precision=5) for r in p2]
        hashes3 = [Geohash.encode(r["latitude"], r["longitude"], precision=5) for r in p3]
        for i, hash2 in enumerate(hashes2):
            for j, hash3 in enumerate(hashes3):
                if hash2 != hash3:
                    continue
                matched2.add(i)
                matched3.add(j)
                docs.append(_merged_hotspot_document(p2[i], p3[j]))

    # Unmatched records must be kept even when the other producer also
    # reported in this batch; otherwise they would be silently lost.
    docs.extend(
        _single_hotspot_document(rec, "producer 2")
        for i, rec in enumerate(p2)
        if i not in matched2
    )
    docs.extend(
        _single_hotspot_document(rec, "producer 3")
        for j, rec in enumerate(p3)
        if j not in matched3
    )
    return docs


def sendDataToDB(iter):
    """Persist one partition of streamed records to MongoDB (fit5148_db).

    Args:
        iter: iterable of records where ``record[1]`` is a JSON string that
            decodes to a dict with a ``"sender_id"`` key. Records whose
            sender is not "producer 1", "producer 2" or "producer 3" are
            ignored. (The parameter name is kept for interface stability
            even though it shadows the builtin.)

    Behavior:
        * "producer 1" records are copied into ``climate_stream``.
        * "producer 2"/"producer 3" records are joined on their precision-5
          geohash and written to ``week11``: averaged when both producers
          report the same cell, as-is when only one of them does.

    Nothing is written, and no MongoDB client is created, when the partition
    yields no documents. The client is always closed, even if an insert fails.

    Raises:
        KeyError: if a record lacks ``"sender_id"`` or a required field.
        ValueError: if ``record[1]`` is not valid JSON.
    """
    by_sender = {"producer 1": [], "producer 2": [], "producer 3": []}
    for record in iter:
        payload = json.loads(record[1])
        bucket = by_sender.get(payload["sender_id"])
        if bucket is not None:
            bucket.append(payload)

    climate_docs = [_climate_document(rec) for rec in by_sender["producer 1"]]
    hotspot_docs = _hotspot_documents(by_sender["producer 2"], by_sender["producer 3"])
    if not climate_docs and not hotspot_docs:
        return

    client = MongoClient()
    try:
        db = client.fit5148_db
        # insert_many rejects an empty list, hence the guards.
        if climate_docs:
            db.climate_stream.insert_many(climate_docs)
        if hotspot_docs:
            db.week11.insert_many(hotspot_docs)
    finally:
        client.close()
    

# Streaming batch interval in seconds, and the Kafka topic to consume.
n_secs = 10
topic = "Scenario061"
# Upper bound on how long the stream runs (10 minutes), so the job still
# terminates when no producer is ever detected.
run_secs = 600

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Pass conf so the app name and master are applied when a new context has to
# be created. getOrCreate never returns None; if a context already exists it
# is reused as-is.
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        "bootstrap.servers":"127.0.0.1:9092", 
                        "group.id":"week11-group", 
                        "fetch.message.max.bytes":"15728640",
                        "auto.offset.reset":"largest"})

# foreachRDD is an output operation and returns nothing, so its result is not
# bound to a name. Each partition of every batch is written by sendDataToDB.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    # Returns early (or re-raises the streaming error) if the job stops
    # before the timeout, instead of sleeping blindly for the full period.
    ssc.awaitTerminationOrTimeout(run_secs)
finally:
    # Always release the streaming and Spark contexts, including on
    # interrupt or failure.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)