In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
from pprint import pprint
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash


def sendDataToDB(iter):
    client = MongoClient()
    db = client.fit5148_db
    week13 = db.week13
    climates = []
    fires = []
    print('*'* 40)
    for item in iter.collect():
        if item['sender_id'] == 1:
            climates.append(item)
        else:
            fires.append(item)
            
    
    
    
    for climate in climates:
        climate['Fire'] = []
        climate_geo = geohash.encode(climate['latitude'],climate['longitude'],precision=5)
        for fire in fires:
            fire_geo = geohash.encode(fire['latitude'],fire['longitude'],precision=5)
            if fire_geo in geohash.expand(climate_geo):
                climate['Fire'].append(fire)
        if len(climate['Fire']) >= 2:
            sender_ids = set([x['sender_id'] for x in climate['Fire']])
            confidence_average = 0.0
            surface_temperature_celcius_average = 0.0
            if len(sender_ids) > 1:
                confidence_average = sum([x['confidence'] for x in climate['Fire']])/len(climate['Fire'])
                surface_temperature_celcius_average = sum([x['surface_temperature_celcius'] for x in climate['Fire']])/len(climate['Fire'])
                
                for item in climate['Fire']:
                    item['confidence']  = confidence_average
                    item['surface_temperature_celcius'] = surface_temperature_celcius_average
        week13.insert(climate)
    client.close()
    print('climates = ',  climates)
    print('*'* 40) 
    print('Fires = ',  fires)

n_secs = 60
topic = ['climate','hotspot_AQUA','hotspot_TERRA']


conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate()
if sc is None:
    sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
kafkaStream = KafkaUtils.createDirectStream(ssc,topic , {
                        'bootstrap.servers':'localhost:9092', 
                        })

lines = kafkaStream.map(lambda x: json.loads(x[1]))
# lines.pprint()
fore = lines.foreachRDD(sendDataToDB)
ssc.start()
ssc.awaitTermination()
# time.sleep(1) # Run stream for 10 minutes just in case no detection of producer
# # ssc.awaitTermination()
# ssc.stop(stopSparkContext=True,stopGraceFully=True)

****************************************




climates =  [{'latitude': -36.293, 'longitude': 146.148, 'air_temperature_celcius': 16, 'relative_humidity': 50.8, 'windspeed_knots': 5.8, 'max_wind_speed': 12, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:21:06', 'Fire': [], '_id': ObjectId('5ceb9e31fdaf38077c996c38')}, {'latitude': -34.282, 'longitude': 142.121, 'air_temperature_celcius': 15, 'relative_humidity': 49.1, 'windspeed_knots': 9.6, 'max_wind_speed': 16.9, 'precipitation ': ' 0.01G', 'sender_id': 1, 'created_at': '18:21:11', 'Fire': [], '_id': ObjectId('5ceb9e31fdaf38077c996c39')}, {'latitude': -37.559, 'longitude': 148.037, 'air_temperature_celcius': 9, 'relative_humidity': 41.1, 'windspeed_knots': 12.7, 'max_wind_speed': 19, 'precipitation ': ' 0.24G', 'sender_id': 1, 'created_at': '18:21:16', 'Fire': [], '_id': ObjectId('5ceb9e31fdaf38077c996c3a')}, {'latitude': -37.294, 'longitude': 141.232, 'air_temperature_celcius': 12, 'relative_humidity': 49, 'windspeed_knots': 4.6, 'max_wind_speed': 8.9, 'precipita

****************************************
climates =  [{'latitude': -36.942, 'longitude': 143.282, 'air_temperature_celcius': 25, 'relative_humidity': 58.3, 'windspeed_knots': 7.1, 'max_wind_speed': 13, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:23:01', 'Fire': [], '_id': ObjectId('5ceb9ea4fdaf38077c996c51')}, {'latitude': -37.805, 'longitude': 144.15, 'air_temperature_celcius': 12, 'relative_humidity': 45, 'windspeed_knots': 10.5, 'max_wind_speed': 16.9, 'precipitation ': ' 0.16G', 'sender_id': 1, 'created_at': '18:23:06', 'Fire': [], '_id': ObjectId('5ceb9ea4fdaf38077c996c52')}, {'latitude': -36.3114, 'longitude': 142.7605, 'air_temperature_celcius': 28, 'relative_humidity': 56.7, 'windspeed_knots': 9.3, 'max_wind_speed': 16.9, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:23:11', 'Fire': [], '_id': ObjectId('5ceb9ea4fdaf38077c996c53')}, {'latitude': -37.444, 'longitude': 148.101, 'air_temperature_celcius': 9, 'relative_humidity': 42, 'windspeed_knot

****************************************
climates =  [{'latitude': -37.719, 'longitude': 142.154, 'air_temperature_celcius': 13, 'relative_humidity': 47, 'windspeed_knots': 7.9, 'max_wind_speed': 15, 'precipitation ': ' 0.02G', 'sender_id': 1, 'created_at': '18:25:01', 'Fire': [], '_id': ObjectId('5ceb9f1cfdaf38077c996c6b')}, {'latitude': -37.642, 'longitude': 149.263, 'air_temperature_celcius': 20, 'relative_humidity': 55.8, 'windspeed_knots': 10.5, 'max_wind_speed': 15.9, 'precipitation ': ' 0.01G', 'sender_id': 1, 'created_at': '18:25:06', 'Fire': [], '_id': ObjectId('5ceb9f1cfdaf38077c996c6c')}, {'latitude': -37.329, 'longitude': 143.136, 'air_temperature_celcius': 21, 'relative_humidity': 47, 'windspeed_knots': 13.2, 'max_wind_speed': 20, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:25:11', 'Fire': [], '_id': ObjectId('5ceb9f1cfdaf38077c996c6d')}, {'latitude': -37.461, 'longitude': 148.103, 'air_temperature_celcius': 10, 'relative_humidity': 45.9, 'windspeed_knots

****************************************
climates =  [{'latitude': -35.954, 'longitude': 141.076, 'air_temperature_celcius': 10, 'relative_humidity': 42.1, 'windspeed_knots': 9.4, 'max_wind_speed': 15, 'precipitation ': ' 0.13B', 'sender_id': 1, 'created_at': '18:27:01', 'Fire': [], '_id': ObjectId('5ceb9f94fdaf38077c996c85')}, {'latitude': -37.7052, 'longitude': 144.6926, 'air_temperature_celcius': 17, 'relative_humidity': 52.7, 'windspeed_knots': 7.7, 'max_wind_speed': 14, 'precipitation ': ' 0.00G', 'sender_id': 1, 'created_at': '18:27:06', 'Fire': [], '_id': ObjectId('5ceb9f94fdaf38077c996c86')}, {'latitude': -36.4553, 'longitude': 142.8786, 'air_temperature_celcius': 19, 'relative_humidity': 55.3, 'windspeed_knots': 6.2, 'max_wind_speed': 12, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:27:11', 'Fire': [], '_id': ObjectId('5ceb9f94fdaf38077c996c87')}, {'latitude': -36.341, 'longitude': 143.636, 'air_temperature_celcius': 16, 'relative_humidity': 47.5, 'windspeed_k

****************************************
climates =  [{'latitude': -37.332, 'longitude': 148.091, 'air_temperature_celcius': 8, 'relative_humidity': 39.3, 'windspeed_knots': 7.7, 'max_wind_speed': 14, 'precipitation ': ' 0.00G', 'sender_id': 1, 'created_at': '18:29:01', 'Fire': [], '_id': ObjectId('5ceba00cfdaf38077c996c9f')}, {'latitude': -37.978, 'longitude': 145.623, 'air_temperature_celcius': 21, 'relative_humidity': 59.5, 'windspeed_knots': 12.4, 'max_wind_speed': 21, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:29:06', 'Fire': [], '_id': ObjectId('5ceba00cfdaf38077c996ca0')}, {'latitude': -37.608, 'longitude': 149.292, 'air_temperature_celcius': 15, 'relative_humidity': 44.6, 'windspeed_knots': 9.7, 'max_wind_speed': 12, 'precipitation ': ' 0.02G', 'sender_id': 1, 'created_at': '18:29:11', 'Fire': [], '_id': ObjectId('5ceba00cfdaf38077c996ca1')}, {'latitude': -36.152, 'longitude': 143.578, 'air_temperature_celcius': 15, 'relative_humidity': 56.1, 'windspeed_knots

****************************************
climates =  [{'latitude': -37.406, 'longitude': 148.088, 'air_temperature_celcius': 12, 'relative_humidity': 43.9, 'windspeed_knots': 12, 'max_wind_speed': 15.9, 'precipitation ': ' 0.08G', 'sender_id': 1, 'created_at': '18:31:01', 'Fire': [], '_id': ObjectId('5ceba084fdaf38077c996cb9')}, {'latitude': -37.858, 'longitude': 143.428, 'air_temperature_celcius': 13, 'relative_humidity': 50.4, 'windspeed_knots': 7.9, 'max_wind_speed': 14, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:31:06', 'Fire': [], '_id': ObjectId('5ceba084fdaf38077c996cba')}, {'latitude': -37.367, 'longitude': 148.04, 'air_temperature_celcius': 12, 'relative_humidity': 47.6, 'windspeed_knots': 9.1, 'max_wind_speed': 13, 'precipitation ': ' 0.12G', 'sender_id': 1, 'created_at': '18:31:11', 'Fire': [], '_id': ObjectId('5ceba084fdaf38077c996cbb')}, {'latitude': -36.758, 'longitude': 145.19, 'air_temperature_celcius': 20, 'relative_humidity': 58.4, 'windspeed_knots'

****************************************
climates =  [{'latitude': -37.399, 'longitude': 148.081, 'air_temperature_celcius': 14, 'relative_humidity': 49.9, 'windspeed_knots': 13.7, 'max_wind_speed': 22, 'precipitation ': ' 0.03G', 'sender_id': 1, 'created_at': '18:33:01', 'Fire': [{'latitude': -37.406, 'longitude': 148.088, 'confidence': 100, 'surface_temperature_celcius': 48, 'sender_id': 2, 'created_at': '18:33:39'}], '_id': ObjectId('5ceba0fcfdaf38077c996cd3')}, {'latitude': -37.335, 'longitude': 148.064, 'air_temperature_celcius': 8, 'relative_humidity': 41, 'windspeed_knots': 11, 'max_wind_speed': 16.9, 'precipitation ': ' 0.47G', 'sender_id': 1, 'created_at': '18:33:06', 'Fire': [], '_id': ObjectId('5ceba0fcfdaf38077c996cd4')}, {'latitude': -37.609, 'longitude': 149.32, 'air_temperature_celcius': 16, 'relative_humidity': 48.3, 'windspeed_knots': 9.4, 'max_wind_speed': 14, 'precipitation ': ' 0.01G', 'sender_id': 1, 'created_at': '18:33:11', 'Fire': [], '_id': ObjectId('5ceba0fcfd

****************************************
climates =  [{'latitude': -36.098, 'longitude': 143.735, 'air_temperature_celcius': 17, 'relative_humidity': 58.1, 'windspeed_knots': 11.7, 'max_wind_speed': 19, 'precipitation ': ' 0.04G', 'sender_id': 1, 'created_at': '18:35:01', 'Fire': [], '_id': ObjectId('5ceba174fdaf38077c996ced')}, {'latitude': -36.8264, 'longitude': 142.6138, 'air_temperature_celcius': 15, 'relative_humidity': 48.2, 'windspeed_knots': 14.7, 'max_wind_speed': 25.1, 'precipitation ': ' 0.43G', 'sender_id': 1, 'created_at': '18:35:06', 'Fire': [], '_id': ObjectId('5ceba174fdaf38077c996cee')}, {'latitude': -37.384, 'longitude': 148.056, 'air_temperature_celcius': 9, 'relative_humidity': 43.2, 'windspeed_knots': 4.9, 'max_wind_speed': 8, 'precipitation ': ' 0.00I', 'sender_id': 1, 'created_at': '18:35:11', 'Fire': [], '_id': ObjectId('5ceba174fdaf38077c996cef')}, {'latitude': -37.45, 'longitude': 148.097, 'air_temperature_celcius': 9, 'relative_humidity': 41, 'windspeed_knots

****************************************
climates =  [{'latitude': -37.238, 'longitude': 141.145, 'air_temperature_celcius': 8, 'relative_humidity': 41.6, 'windspeed_knots': 8.3, 'max_wind_speed': 15.9, 'precipitation ': ' 0.24G', 'sender_id': 1, 'created_at': '18:37:01', 'Fire': [], '_id': ObjectId('5ceba1ecfdaf38077c996d07')}, {'latitude': -37.448, 'longitude': 148.114, 'air_temperature_celcius': 10, 'relative_humidity': 44.4, 'windspeed_knots': 5.6, 'max_wind_speed': 11.1, 'precipitation ': ' 0.12G', 'sender_id': 1, 'created_at': '18:37:06', 'Fire': [], '_id': ObjectId('5ceba1ecfdaf38077c996d08')}, {'latitude': -36.8867, 'longitude': 142.1873, 'air_temperature_celcius': 14, 'relative_humidity': 39.3, 'windspeed_knots': 17.7, 'max_wind_speed': 30.9, 'precipitation ': ' 0.01G', 'sender_id': 1, 'created_at': '18:37:11', 'Fire': [], '_id': ObjectId('5ceba1ecfdaf38077c996d09')}, {'latitude': -37.62, 'longitude': 149.294, 'air_temperature_celcius': 21, 'relative_humidity': 60.4, 'windspee

****************************************
climates =  []
****************************************
Fires =  [{'latitude': -37.164, 'longitude': 148.915, 'confidence': 55, 'surface_temperature_celcius': 42, 'sender_id': 2, 'created_at': '18:45:02'}, {'latitude': -35.6469, 'longitude': 142.376, 'confidence': 100, 'surface_temperature_celcius': 93, 'sender_id': 2, 'created_at': '18:45:13'}, {'latitude': -36.6612, 'longitude': 143.8726, 'confidence': 76, 'surface_temperature_celcius': 50, 'sender_id': 2, 'created_at': '18:45:35'}, {'latitude': -36.9827, 'longitude': 141.4064, 'confidence': 62, 'surface_temperature_celcius': 41, 'sender_id': 3, 'created_at': '18:45:04'}, {'latitude': -37.4796, 'longitude': 141.9403, 'confidence': 94, 'surface_temperature_celcius': 87, 'sender_id': 3, 'created_at': '18:45:23'}, {'latitude': -36.6368, 'longitude': 144.8346, 'confidence': 77, 'surface_temperature_celcius': 50, 'sender_id': 3, 'created_at': '18:45:36'}, {'latitude': -36.797, 'longitude': 141.3621

****************************************
climates =  []
****************************************
Fires =  [{'latitude': -37.4463, 'longitude': 142.7829, 'confidence': 92, 'surface_temperature_celcius': 70, 'sender_id': 2, 'created_at': '18:53:08'}, {'latitude': -36.6329, 'longitude': 142.9903, 'confidence': 57, 'surface_temperature_celcius': 43, 'sender_id': 2, 'created_at': '18:53:22'}, {'latitude': -37.463, 'longitude': 148.109, 'confidence': 62, 'surface_temperature_celcius': 34, 'sender_id': 2, 'created_at': '18:53:47'}, {'latitude': -37.903, 'longitude': 145.25, 'confidence': 53, 'surface_temperature_celcius': 44, 'sender_id': 2, 'created_at': '18:53:57'}, {'latitude': -37.8131, 'longitude': 143.1175, 'confidence': 69, 'surface_temperature_celcius': 45, 'sender_id': 3, 'created_at': '18:53:07'}, {'latitude': -37.6439, 'longitude': 142.913, 'confidence': 93, 'surface_temperature_celcius': 83, 'sender_id': 3, 'created_at': '18:53:22'}, {'latitude': -36.3739, 'longitude': 141.108, 'c

****************************************
climates =  []
****************************************
Fires =  [{'latitude': -36.3296, 'longitude': 141.7522, 'confidence': 63, 'surface_temperature_celcius': 41, 'sender_id': 2, 'created_at': '19:02:14'}, {'latitude': -36.4347, 'longitude': 143.5704, 'confidence': 63, 'surface_temperature_celcius': 42, 'sender_id': 2, 'created_at': '19:02:37'}, {'latitude': -36.6986, 'longitude': 142.7259, 'confidence': 86, 'surface_temperature_celcius': 72, 'sender_id': 2, 'created_at': '19:02:53'}, {'latitude': -37.5158, 'longitude': 142.8257, 'confidence': 64, 'surface_temperature_celcius': 42, 'sender_id': 3, 'created_at': '19:02:21'}, {'latitude': -36.3984, 'longitude': 144.0271, 'confidence': 85, 'surface_temperature_celcius': 60, 'sender_id': 3, 'created_at': '19:02:47'}, {'latitude': -36.1925, 'longitude': 145.93, 'confidence': 97, 'surface_temperature_celcius': 79, 'sender_id': 3, 'created_at': '19:02:59'}]
****************************************
cl