In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash as gh

def sendDataToDB(climate_data):
    """Insert one climate document into MongoDB.

    Opens a client with ``MongoClient()`` (no arguments), writes
    ``climate_data`` to the ``Mycollection`` collection of the
    ``fit3182_assignment_db`` database, and always closes the client again.

    Any exception raised while inserting is caught and reported on stdout
    instead of being re-raised, so a failed write does not abort the caller.

    Args:
        climate_data: dict describing the document to store.

    Returns:
        None.
    """
    client = MongoClient()
    try:
        collection = client.fit3182_assignment_db.Mycollection
        # insert_one replaces Collection.insert, which is deprecated in
        # PyMongo 3.x and removed in PyMongo 4.
        collection.insert_one(climate_data)
        print("Record is inserted successfully")
        print(climate_data)
    except Exception as ex:
        print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Release the connection even if something other than Exception
        # (e.g. KeyboardInterrupt) unwinds through here.
        client.close()

def avgSameLocation(aqua_data, terra_data, precision=5):
    """Average aqua hotspots with the terra hotspots in the same geohash cell.

    For every aqua record, each not-yet-consumed terra record whose geohash
    (at ``precision`` characters) equals the aqua record's geohash is folded
    into it: ``surface_temperature_celcius`` and ``confidence`` of the aqua
    record are replaced, in place, by the mean over the aqua value and all
    matching terra values. A terra record is consumed by the first aqua
    record it matches and is never reused for a later one.

    The two fields of every aqua record are always rewritten as the result
    of a true division, even when no terra record matched.

    Args:
        aqua_data: list of dicts with "latitude", "longitude",
            "surface_temperature_celcius" and "confidence"; mutated in place.
        terra_data: list of dicts with the same keys; not mutated.
        precision: geohash length used to decide that two records share a
            location. Defaults to 5, the value previously hard-coded.

    Returns:
        A new list with the terra records that were not merged into any aqua
        record, in their original order.
    """
    # Encode each terra record once up front rather than once per aqua record.
    terra_cells = [
        gh.encode(terra["latitude"], terra["longitude"], precision)
        for terra in terra_data
    ]
    # unused[j] is True while terra_data[j] has not been merged into an aqua record.
    unused = [True] * len(terra_data)

    for aqua in aqua_data:
        temperature_sum = aqua["surface_temperature_celcius"]
        confidence_sum = aqua["confidence"]
        readings = 1

        if terra_cells:  # nothing to compare against otherwise
            aqua_cell = gh.encode(aqua["latitude"], aqua["longitude"], precision)
            for j, terra_cell in enumerate(terra_cells):
                if unused[j] and terra_cell == aqua_cell:
                    temperature_sum += terra_data[j]["surface_temperature_celcius"]
                    confidence_sum += terra_data[j]["confidence"]
                    unused[j] = False
                    readings += 1

        aqua["surface_temperature_celcius"] = temperature_sum / readings
        aqua["confidence"] = confidence_sum / readings

    return [terra for terra, keep in zip(terra_data, unused) if keep]

def isNearClimate(climate_data, hotspot_list):
    """Attach to a climate record every hotspot in the same geohash cell.

    A hotspot is considered near the climate reading when both coordinates
    encode to the same 3-character geohash. Each such hotspot gets a
    "datetime" field (the climate record's "date" followed by the hotspot's
    "time") and is appended to ``climate_data["hotspot_histories"]``, which
    must already exist.

    Args:
        climate_data: dict with "latitude", "longitude", "date" and a
            "hotspot_histories" list; the list is extended in place.
        hotspot_list: iterable of dicts with "latitude", "longitude" and
            "time"; matching entries are mutated in place.

    Returns:
        None.
    """
    climate_cell = gh.encode(climate_data["latitude"], climate_data["longitude"], 3)

    for hotspot in hotspot_list:
        hotspot_cell = gh.encode(hotspot["latitude"], hotspot["longitude"], 3)
        if hotspot_cell != climate_cell:
            continue
        # Build the full timestamp from the climate date and the hotspot time.
        hotspot["datetime"] = climate_data["date"] + hotspot["time"]
        climate_data["hotspot_histories"].append(hotspot)

        
def processData(iterable):
    """Join one partition's climate and hotspot records and store the result.

    Each element of ``iterable`` is indexed as ``(producer_id, records)``:
    id '1' carries climate records (only the last one in the list is used),
    id '2' carries aqua hotspot records, and any other id is treated as
    terra hotspot records. If the same id appears more than once, the later
    element replaces the earlier one.

    When a climate record is present, aqua and terra hotspots at the same
    location are averaged, the hotspots near the climate reading are
    collected under a new "hotspot_histories" key, and the climate record is
    written to the database. Without a climate record nothing is stored.

    Args:
        iterable: iterable of ``(producer_id, records)`` pairs.

    Returns:
        None.
    """
    climate_data = None
    aqua_data = []
    terra_data = []

    for data in iterable:
        producer_id, records = data[0], data[1]
        if producer_id == '1':
            climate_data = records[-1]  # data from producer 1
        elif producer_id == '2':
            aqua_data = records  # data from producer 2
        else:
            terra_data = records  # data from producer 3

    # Nothing to store unless this batch contains climate data.
    if climate_data is None:
        return

    # Average out data from the two satellites at the same location; the
    # aqua records are updated in place and the unmatched terra records returned.
    terra_data = avgSameLocation(aqua_data, terra_data)
    # New attribute for the climate data, populated by the calls below.
    climate_data["hotspot_histories"] = []
    isNearClimate(climate_data, aqua_data)
    isNearClimate(climate_data, terra_data)
    sendDataToDB(climate_data)
        
        
        
    

if __name__ == "__main__":
    # 2 execution threads and a batch interval of 10 seconds
    batch_interval = 10
    conf = SparkConf().setAppName("AssignmentPartB").setMaster("local[2]")
    # Hand the configuration to getOrCreate so it is applied when a new
    # context has to be created; getOrCreate never returns None, so no
    # fallback construction is needed.
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("WARN")

    ssc = StreamingContext(sc, batch_interval)

    topics = ["climate", "terra", "aqua"]
    kafkaStream = KafkaUtils.createDirectStream(ssc, topics, {
                        'bootstrap.servers':'localhost:9092',
                        'group.id':'assignment_partb',
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})

    # DStream: group each record by producer id, so every element looks like
    # (id, [data1 ... datan]).
    groups = kafkaStream.mapValues(json.loads).groupByKey().map(lambda x: (x[0], list(x[1])))
    groups.pprint()
    # Collapse to one partition so a single processData call sees all three
    # producers' groups for the batch. foreachRDD returns nothing, so its
    # result is not kept.
    groups.repartition(1).foreachRDD(lambda rdd: rdd.foreachPartition(processData))

    ssc.start()
    try:
        ssc.awaitTermination()
    finally:
        # Also reached when the cell is interrupted, so the streaming and
        # Spark contexts are shut down instead of being left running.
        ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2020-06-11 11:13:50
-------------------------------------------
('1', [{'latitude': -36.779, 'longitude': 146.108, 'air_temperature_celcius': 15, 'relative_humidity': 51.0, 'windspeed_knots': 9.6, 'max_wind_speed': 15.9, 'precipitation ': ' 0.16G', 'producer': 1, 'date': '2019-1-6'}])
('3', [{'latitude': -37.6505, 'longitude': 149.2134, 'confidence': 68, 'surface_temperature_celcius': 44, 'producer': 3, 'time': 'T11:13:43'}, {'latitude': -37.4343, 'longitude': 147.6327, 'confidence': 100, 'surface_temperature_celcius': 92, 'producer': 3, 'time': 'T11:13:45'}, {'latitude': -37.732, 'longitude': 148.6059, 'confidence': 87, 'surface_temperature_celcius': 62, 'producer': 3, 'time': 'T11:13:47'}, {'latitude': -37.7361, 'longitude': 145.335, 'confidence': 79, 'surface_temperature_celcius': 52, 'producer': 3, 'time': 'T11:13:49'}])
('2', [{'latitude': -36.7791, 'longitude': 141.5711, 'confidence': 72, 'surface_temperature_celcius': 46, 'produc

-------------------------------------------
Time: 2020-06-11 11:14:40
-------------------------------------------
('1', [{'latitude': -37.954, 'longitude': 143.918, 'air_temperature_celcius': 20, 'relative_humidity': 59.7, 'windspeed_knots': 8.6, 'max_wind_speed': 15.9, 'precipitation ': ' 0.01G', 'producer': 1, 'date': '2019-1-11'}])
('3', [{'latitude': -36.2283, 'longitude': 146.3102, 'confidence': 65, 'surface_temperature_celcius': 42, 'producer': 3, 'time': 'T11:14:31'}, {'latitude': -37.9373, 'longitude': 146.0744, 'confidence': 56, 'surface_temperature_celcius': 39, 'producer': 3, 'time': 'T11:14:33'}, {'latitude': -37.639, 'longitude': 142.0648, 'confidence': 79, 'surface_temperature_celcius': 52, 'producer': 3, 'time': 'T11:14:35'}, {'latitude': -37.561, 'longitude': 143.24200000000002, 'confidence': 51, 'surface_temperature_celcius': 38, 'producer': 3, 'time': 'T11:14:37'}, {'latitude': -36.3269, 'longitude': 144.0871, 'confidence': 67, 'surface_temperature_celcius': 43, 'prod

-------------------------------------------
Time: 2020-06-11 11:15:30
-------------------------------------------
('1', [{'latitude': -36.8835, 'longitude': 142.2098, 'air_temperature_celcius': 11, 'relative_humidity': 41.6, 'windspeed_knots': 7.9, 'max_wind_speed': 15.0, 'precipitation ': ' 0.01G', 'producer': 1, 'date': '2019-1-16'}])
('3', [{'latitude': -37.3655, 'longitude': 148.2821, 'confidence': 85, 'surface_temperature_celcius': 59, 'producer': 3, 'time': 'T11:15:21'}, {'latitude': -36.085, 'longitude': 145.9096, 'confidence': 64, 'surface_temperature_celcius': 42, 'producer': 3, 'time': 'T11:15:23'}, {'latitude': -36.4606, 'longitude': 141.0562, 'confidence': 55, 'surface_temperature_celcius': 39, 'producer': 3, 'time': 'T11:15:25'}, {'latitude': -37.7361, 'longitude': 145.335, 'confidence': 79, 'surface_temperature_celcius': 52, 'producer': 3, 'time': 'T11:15:27'}, {'latitude': -37.635, 'longitude': 149.303, 'confidence': 52, 'surface_temperature_celcius': 47, 'producer': 3, 

-------------------------------------------
Time: 2020-06-11 11:16:20
-------------------------------------------
('1', [{'latitude': -36.748000000000005, 'longitude': 144.168, 'air_temperature_celcius': 11, 'relative_humidity': 48.2, 'windspeed_knots': 10.8, 'max_wind_speed': 22.9, 'precipitation ': ' 0.59G', 'producer': 1, 'date': '2019-1-21'}])
('3', [{'latitude': -36.304, 'longitude': 143.7445, 'confidence': 59, 'surface_temperature_celcius': 40, 'producer': 3, 'time': 'T11:16:11'}, {'latitude': -38.0693, 'longitude': 142.9722, 'confidence': 100, 'surface_temperature_celcius': 89, 'producer': 3, 'time': 'T11:16:13'}, {'latitude': -37.9071, 'longitude': 143.53799999999998, 'confidence': 100, 'surface_temperature_celcius': 88, 'producer': 3, 'time': 'T11:16:15'}, {'latitude': -36.0973, 'longitude': 143.4279, 'confidence': 87, 'surface_temperature_celcius': 92, 'producer': 3, 'time': 'T11:16:17'}, {'latitude': -37.368, 'longitude': 148.05, 'confidence': 79, 'surface_temperature_celciu

-------------------------------------------
Time: 2020-06-11 11:17:10
-------------------------------------------
('1', [{'latitude': -37.448, 'longitude': 148.114, 'air_temperature_celcius': 10, 'relative_humidity': 44.4, 'windspeed_knots': 5.6, 'max_wind_speed': 11.1, 'precipitation ': ' 0.12G', 'producer': 1, 'date': '2019-1-26'}])
('3', [{'latitude': -35.9701, 'longitude': 145.7061, 'confidence': 71, 'surface_temperature_celcius': 46, 'producer': 3, 'time': 'T11:17:1'}, {'latitude': -36.4466, 'longitude': 141.2471, 'confidence': 51, 'surface_temperature_celcius': 40, 'producer': 3, 'time': 'T11:17:3'}, {'latitude': -37.4421, 'longitude': 148.259, 'confidence': 56, 'surface_temperature_celcius': 39, 'producer': 3, 'time': 'T11:17:6'}, {'latitude': -36.8264, 'longitude': 142.6138, 'confidence': 77, 'surface_temperature_celcius': 50, 'producer': 3, 'time': 'T11:17:8'}])
('2', [{'latitude': -37.5243, 'longitude': 143.0437, 'confidence': 60, 'surface_temperature_celcius': 40, 'producer'

-------------------------------------------
Time: 2020-06-11 11:18:00
-------------------------------------------
('1', [{'latitude': -38.231, 'longitude': 147.172, 'air_temperature_celcius': 24, 'relative_humidity': 61.6, 'windspeed_knots': 7.7, 'max_wind_speed': 14.0, 'precipitation ': ' 0.00I', 'producer': 1, 'date': '2019-1-31'}])
('3', [{'latitude': -37.8817, 'longitude': 143.7119, 'confidence': 88, 'surface_temperature_celcius': 63, 'producer': 3, 'time': 'T11:17:50'}, {'latitude': -36.4392, 'longitude': 144.2111, 'confidence': 69, 'surface_temperature_celcius': 44, 'producer': 3, 'time': 'T11:17:52'}, {'latitude': -34.8006, 'longitude': 141.6209, 'confidence': 77, 'surface_temperature_celcius': 50, 'producer': 3, 'time': 'T11:17:54'}, {'latitude': -36.0973, 'longitude': 143.4279, 'confidence': 87, 'surface_temperature_celcius': 92, 'producer': 3, 'time': 'T11:17:56'}, {'latitude': -34.8544, 'longitude': 143.1831, 'confidence': 74, 'surface_temperature_celcius': 48, 'producer': 3

-------------------------------------------
Time: 2020-06-11 11:18:50
-------------------------------------------
('1', [{'latitude': -37.382, 'longitude': 149.341, 'air_temperature_celcius': 18, 'relative_humidity': 53.6, 'windspeed_knots': 7.2, 'max_wind_speed': 15.0, 'precipitation ': ' 0.00I', 'producer': 1, 'date': '2019-2-5'}])
('3', [{'latitude': -36.2367, 'longitude': 141.3998, 'confidence': 65, 'surface_temperature_celcius': 42, 'producer': 3, 'time': 'T11:18:40'}, {'latitude': -36.4617, 'longitude': 144.6381, 'confidence': 70, 'surface_temperature_celcius': 48, 'producer': 3, 'time': 'T11:18:42'}, {'latitude': -37.5629, 'longitude': 144.752, 'confidence': 85, 'surface_temperature_celcius': 59, 'producer': 3, 'time': 'T11:18:44'}, {'latitude': -37.8701, 'longitude': 142.8066, 'confidence': 78, 'surface_temperature_celcius': 51, 'producer': 3, 'time': 'T11:18:46'}, {'latitude': -37.375, 'longitude': 148.063, 'confidence': 59, 'surface_temperature_celcius': 35, 'producer': 3, 't

KeyboardInterrupt: 

In [3]:
import geohash as gh

# Sanity check of the precision-3 geohash used for "near" matching: encode
# two coordinates and print the cell each one falls into.
la, lo = -36.7313, 141.7398
print(gh.encode(la, lo, 3))

la1, lo1 = -37.0669, 141.0556
print(gh.encode(la1, lo1, 3))




r1k
r1k


In [3]:
# Print every document in fit3182_assignment_db.Mycollection. The client is
# used as a context manager so the connection is closed once the dump is
# done instead of being left open in the kernel.
with MongoClient() as client:
    db = client.fit3182_assignment_db
    collection = db.Mycollection

    for i in collection.find():
        print(i)

In [2]:
# Four sample hotspot records at the same coordinates, printed one per line
# after their count.
b = [
    {'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 55.6, 'surface_temperature_celcius': 62.0, 'producer': 2, 'time': '2020-6-14T9:53:43'},
    {'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 12.0, 'surface_temperature_celcius': 53.0, 'producer': 2, 'time': '2020-6-14T9:53:45'},
    {'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 41.0, 'surface_temperature_celcius': 64.0, 'producer': 2, 'time': '2020-6-14T9:53:47'},
    {'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 56.0, 'surface_temperature_celcius': 12.0, 'producer': 2, 'time': '2020-6-14T9:53:49'},
]
print(len(b))
for record in b:
    print(record)

4
{'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 55.6, 'surface_temperature_celcius': 62.0, 'producer': 2, 'time': '2020-6-14T9:53:43'}
{'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 12.0, 'surface_temperature_celcius': 53.0, 'producer': 2, 'time': '2020-6-14T9:53:45'}
{'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 41.0, 'surface_temperature_celcius': 64.0, 'producer': 2, 'time': '2020-6-14T9:53:47'}
{'latitude': -37.966, 'longitude': 145.05100000000002, 'confidence': 56.0, 'surface_temperature_celcius': 12.0, 'producer': 2, 'time': '2020-6-14T9:53:49'}
-------------------------------------------
Time: 2020-06-11 09:54:30
-------------------------------------------

-------------------------------------------
Time: 2020-06-11 09:54:40
-------------------------------------------

-------------------------------------------
Time: 2020-06-11 09:54:50
-------------------------------------------

-----------------------------