## TaskC.2 Stream Processing 
Processes streaming data from the Kafka producers with Apache Spark Streaming.
Parts of the code are taken from Week12_Spark_Stream_Processor.ipynb.

This application uses a hash join (keyed on geohash) to implement the streaming join.

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime

# Drop test value: start every run from an empty `stream` collection.
client = MongoClient()
try:
    db = client.fit5148_assignment
    collStream = db.stream
    collStream.drop()
finally:
    # Release the connection even when the drop fails.
    client.close()

In [2]:
# Geo-hashing is provided by an external package; install it once if missing:
# !pip3 install python-geohash
# NOTE(review): confirm the installed package really exposes a module named
# `Geohash` (capital G), since that is the name imported below.

import Geohash

# Smoke test: encode a fixed coordinate with a 5-character precision.
print(
    'Geohash for 42.6, -5.6:',
    Geohash.encode(42.6, -5.6, precision=5),
)

Geohash for 42.6, -5.6: ezs42


In [3]:
def climate_handler(geohash_dictionary, climate_temp, data):
    """Build a climate document from one decoded message and file it by geohash.

    Args:
        geohash_dictionary: dict mapping geohash -> climate document. Receives
            the first climate record seen for each geohash. Mutated in place.
        climate_temp: list receiving every further climate record whose
            geohash is already present in ``geohash_dictionary``. Mutated in
            place.
        data: dict decoded from the JSON payload of a climate message.

    Returns:
        None. Results are delivered through the two mutable arguments.

    Raises:
        TypeError, ValueError: when a field is missing from ``data`` or cannot
            be converted (``float``/``strptime`` reject the value).
    """
    # The precipitation value is "<amount><flag>" and its key carries a
    # trailing space: 'precipitation '.
    precipitation_raw = data.get('precipitation ')

    climate = {
        'climate_station': str(data.get('station')),
        'date': datetime.strptime(data.get('create_date'), '%Y/%m/%d %H:%M:%S'),
        'air_temperature_celcius': float(data.get('air_temperature_celcius')),
        'relative_humidity': float(data.get('relative_humidity')),
        'windspeed': float(data.get('windspeed_knots')),
        'max_wind_speed': float(data.get('max_wind_speed')),
        'precipitation': float(precipitation_raw[:-1]),
        'precipitation_flag': precipitation_raw[-1],
        'longitude': float(data.get('longitude')),
        'latitude': float(data.get('latitude')),
        'fire': [],  # filled later with the hotspots that share this geohash
    }

    # Join key: 5-character geohash of the station position.
    geohash = Geohash.encode(climate['latitude'], climate['longitude'], precision=5)

    # Only the first climate record per location is kept in the dictionary
    # (and can therefore receive hotspots); later ones are stored as-is so the
    # same hotspot is not attached to several documents.
    if geohash in geohash_dictionary:
        climate_temp.append(climate)
    else:
        geohash_dictionary[geohash] = climate
    
def hotspot_handler(geohash_dictionary, data):
    """Build a hotspot record from one decoded message and merge it by geohash.

    Args:
        geohash_dictionary: dict mapping geohash -> hotspot record. Mutated in
            place: a new geohash gets the record itself, a known geohash gets
            its stored record updated with the new reading.
        data: dict decoded from the JSON payload of a hotspot message.

    Returns:
        None. Results are delivered through ``geohash_dictionary``.

    Raises:
        TypeError, ValueError: when a numeric field is missing from ``data``
            or cannot be converted by ``float``.
    """
    hotspot = {
        'hotspot_station': str(data.get('station')),
        'latitude': float(data.get('latitude')),
        'longitude': float(data.get('longitude')),
        'time': data.get('create_date'),
        'confidence': float(data.get('confidence')),
        'surface_temperature_celcius': float(data.get('surface_temperature_celcius')),
    }

    # Join key: 5-character geohash of the hotspot position.
    geohash = Geohash.encode(hotspot['latitude'], hotspot['longitude'], precision=5)

    if geohash not in geohash_dictionary:
        # First hotspot at this location: keep it as the base record.
        geohash_dictionary[geohash] = hotspot
        return

    # Another hotspot at a known location: fold it into the stored record.
    # The stored latitude/longitude/time stay those of the first hotspot.
    # NOTE: this is a pairwise average of (stored value, new value); it equals
    # the arithmetic mean only while at most two hotspots share a location.
    merged = geohash_dictionary[geohash]
    merged['hotspot_station'] = merged['hotspot_station'] + ', ' + hotspot['hotspot_station']
    merged['confidence'] = (merged['confidence'] + hotspot['confidence']) / 2
    merged['surface_temperature_celcius'] = (
        merged['surface_temperature_celcius'] + hotspot['surface_temperature_celcius']
    ) / 2

In [4]:
def StoreData(data):
    """Insert a batch of documents into the ``stream`` collection.

    Args:
        data: list of documents (dicts) to insert. An empty batch is skipped
            without opening a database connection.

    Returns:
        None. Insert failures are reported on stdout instead of being raised,
        so one bad batch does not abort the caller.
    """
    # insert_many rejects an empty document list; partitions without any
    # record would otherwise open a connection just to print an error.
    if not data:
        return

    # start connection to database
    client = MongoClient()
    try:
        collStream = client.fit5148_assignment.stream
        # insert records to mongodb
        collStream.insert_many(data)
    except Exception as ex:
        print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # always release the connection, whatever happened above
        client.close()

In [5]:
# method to retrieve data and send to MongoDB.
def Process(iter):
    """Join the climate and hotspot records of one partition and store them.

    Args:
        iter: iterable of ``(key, value)`` pairs, where ``key`` selects the
            handler ('climate' or 'hotspot') and ``value`` is a JSON string.

    Returns:
        None. The resulting climate documents are passed to ``StoreData``.

    Raises:
        Exception: when a record carries a key that has no handler.
    """
    # Hash-join state, local to this partition and batch.
    climate_temp = []            # climate records for already-seen locations
    geohash_climate_dict = {}    # geohash -> first climate record
    geohash_fire_dict = {}       # geohash -> merged hotspot record

    for record in iter:
        key = record[0]
        data = json.loads(record[1])

        if key == 'climate':
            climate_handler(geohash_climate_dict, climate_temp, data)
        elif key == 'hotspot':
            hotspot_handler(geohash_fire_dict, data)
        else:
            # str() keeps the intended error when the key is not a string
            # (e.g. None); plain concatenation would raise TypeError instead.
            raise Exception("No handler for " + str(key))

    # Embed hotspot records into the climate record of the same location.
    # Hotspots without a matching climate record are discarded.
    for geohash, hotspot in geohash_fire_dict.items():
        if geohash in geohash_climate_dict:
            geohash_climate_dict[geohash]['fire'].append(hotspot)

    # Merge climate records: duplicates first, then the joined ones.
    climate_temp = climate_temp + list(geohash_climate_dict.values())

    StoreData(climate_temp)

In [6]:
# configuration for local streaming context with two execution threads and a batch interval of 10 seconds
n_secs = 10
timeout_secs = 600  # run the stream for 10 minutes in case no producer is detected

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate() never returns None, so the configuration has to be handed to it
# directly; it only takes effect when no SparkContext exists yet.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

topic = 'ClimateData' # choose the topic set by Kafka producers

# initialise the kafka stream
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092',
                        'group.id':'Assignment',
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})

# manipulate each RDD from the kafka stream: every partition is joined and stored by Process
lines = kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(Process))

ssc.start()
print("Data receiving...")
try:
    time.sleep(timeout_secs)
    print("Timeout")
    # ssc.awaitTermination()
finally:
    # Stop the stream on timeout AND when the cell is interrupted, so no
    # streaming context is left running in the kernel.
    ssc.stop(stopSparkContext=True,stopGraceFully=True)

Data receiving...


KeyboardInterrupt: 

In [None]:
# Uncomment the code below to inspect the stored results.
# Stop the streaming cell first, or run this cell in a separate notebook.

# from pymongo import MongoClient
# from pprint import pprint

# client = MongoClient()
# db = client.fit5148_assignment
# collStream = db.stream

# count = collStream.find().count()
# results = collStream.aggregate(
#     [
#         {"$project": {"date": 1, "latitude":1, "longitude":1, "fire":1,"total_fire":{"$size": "$fire"},"_id": 0 }},
#         {"$sort": {'total_fire':-1}},
#         {"$limit":10}
#     ]
# )

# print("Total records: " + str(count))
# print("Top 10:")
# for r in results:
#     pprint(r)

# client.close()