In [1]:
import json
import math

import pandas as pd
import pygeohash as pgh
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
from pyspark.sql.utils import StreamingQueryException

In [2]:
# Connect with pymongo's default connection settings and select the assignment database.
client = MongoClient()
db = client.fit3182_assignment_db

# One collection per incoming stream, plus one that receives the joined records.
climate_stream = db.climate_stream
aqua_stream = db.aqua_stream
terra_stream = db.terra_stream
merged_stream = db.merged_stream

# Remove every document from all four collections before streaming starts.
climate_stream.delete_many({})
aqua_stream.delete_many({})
terra_stream.delete_many({})
merged_stream.delete_many({})

<pymongo.results.DeleteResult at 0x7f3a5fb42d40>

In [3]:
# Name of the Kafka topic the streaming reader subscribes to.
topic_name = 'Assignment'

In [4]:
# Create (or reuse, via getOrCreate) a Spark session with master 'local[*]'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('[Demo] Spark Streaming from Kafka into MongoDB')
    .getOrCreate()
)

In [5]:
# Streaming DataFrame that subscribes to `topic_name` on the Kafka broker at
# localhost:9092.
topic_stream_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', topic_name)
    .load()
)

In [6]:
# Keep only the Kafka record's `value` column, cast to a string.
output_stream_df = (
    topic_stream_df
    .select(                                      # single output column: `value`
            topic_stream_df.value.cast('string')
    )
)

In [7]:
def process_climate_data():
    """Normalise every document in ``climate_stream`` and tag it with a geohash.

    Reads the whole collection, casts ``latitude`` and ``longitude`` to float,
    adds an ``encode_location`` field (geohash of the coordinates at
    precision 3) and rewrites the collection with the updated documents.

    Does nothing when the collection is empty: an empty frame has no
    ``latitude``/``longitude`` columns to cast, and pymongo's ``insert_many``
    rejects an empty document list.
    """
    records = list(climate_stream.find())
    if not records:
        return

    frame = pd.DataFrame(records)
    frame[["latitude", "longitude"]] = frame[["latitude", "longitude"]].astype(float)
    # Iterate the two coordinate columns together instead of a row-wise apply.
    frame["encode_location"] = [
        pgh.encode(latitude, longitude, precision=3)
        for latitude, longitude in zip(frame["latitude"], frame["longitude"])
    ]

    # Build the replacement documents before deleting, so a conversion error
    # cannot leave the collection emptied.
    updated_records = frame.to_dict(orient='records')
    climate_stream.delete_many({})
    climate_stream.insert_many(updated_records)

In [8]:
def process_aqua_data():
    """Normalise every document in ``aqua_stream`` and tag it with two geohashes.

    Reads the whole collection, casts ``latitude`` and ``longitude`` to float,
    then adds ``encode_location`` (geohash at precision 3) and
    ``encode_location_pre`` (geohash at precision 5) before rewriting the
    collection with the updated documents.

    Does nothing when the collection is empty: an empty frame has no
    ``latitude``/``longitude`` columns to cast, and pymongo's ``insert_many``
    rejects an empty document list.
    """
    records = list(aqua_stream.find())
    if not records:
        return

    frame = pd.DataFrame(records)
    frame[["latitude", "longitude"]] = frame[["latitude", "longitude"]].astype(float)
    # Materialise the coordinate pairs once; both geohash columns reuse them.
    coordinates = list(zip(frame["latitude"], frame["longitude"]))
    frame["encode_location"] = [
        pgh.encode(latitude, longitude, precision=3) for latitude, longitude in coordinates
    ]
    frame["encode_location_pre"] = [
        pgh.encode(latitude, longitude, precision=5) for latitude, longitude in coordinates
    ]

    # Build the replacement documents before deleting, so a conversion error
    # cannot leave the collection emptied.
    updated_records = frame.to_dict(orient='records')
    aqua_stream.delete_many({})
    aqua_stream.insert_many(updated_records)

In [9]:
def process_terra_data():
    """Normalise every document in ``terra_stream`` and tag it with two geohashes.

    Reads the whole collection, casts ``latitude`` and ``longitude`` to float,
    then adds ``encode_location`` (geohash at precision 3) and
    ``encode_location_pre`` (geohash at precision 5) before rewriting the
    collection with the updated documents.

    Does nothing when the collection is empty: an empty frame has no
    ``latitude``/``longitude`` columns to cast, and pymongo's ``insert_many``
    rejects an empty document list.
    """
    records = list(terra_stream.find())
    if not records:
        return

    frame = pd.DataFrame(records)
    frame[["latitude", "longitude"]] = frame[["latitude", "longitude"]].astype(float)
    # Materialise the coordinate pairs once; both geohash columns reuse them.
    coordinates = list(zip(frame["latitude"], frame["longitude"]))
    frame["encode_location"] = [
        pgh.encode(latitude, longitude, precision=3) for latitude, longitude in coordinates
    ]
    frame["encode_location_pre"] = [
        pgh.encode(latitude, longitude, precision=5) for latitude, longitude in coordinates
    ]

    # Build the replacement documents before deleting, so a conversion error
    # cannot leave the collection emptied.
    updated_records = frame.to_dict(orient='records')
    terra_stream.delete_many({})
    terra_stream.insert_many(updated_records)

In [10]:
def _reading_as_float(value):
    """Return ``value`` as a float; a missing reading (``None``) becomes NaN."""
    # NaN compares False against any threshold, so a missing reading can never
    # satisfy the "natural" condition.
    return float('nan') if value is None else float(value)


def process_fire_cause(min_air_temperature=20, min_ghi=180):
    """Label every document in ``merged_stream`` with a ``fire_cause`` field.

    A document whose ``Hotspot`` field is a list gets ``"natural"`` when its
    ``air_temperature_celcius`` is strictly above ``min_air_temperature`` and
    its ``GHI_w/m2`` is strictly above ``min_ghi``; otherwise it gets
    ``"other"``. A document whose ``Hotspot`` is not a list gets ``None``.
    The collection is then rewritten with the labelled documents.

    Args:
        min_air_temperature: exclusive lower bound on air temperature for a
            "natural" fire (default 20).
        min_ghi: exclusive lower bound on GHI for a "natural" fire
            (default 180).

    The readings are passed through ``float()`` so that numeric strings are
    compared numerically instead of raising ``TypeError``. Does nothing when
    the collection is empty, because pymongo's ``insert_many`` rejects an
    empty document list.
    """
    documents = list(merged_stream.find())
    if not documents:
        return

    for document in documents:
        if isinstance(document.get('Hotspot'), list):
            air_temperature = _reading_as_float(document.get('air_temperature_celcius'))
            ghi = _reading_as_float(document.get('GHI_w/m2'))
            is_natural = air_temperature > min_air_temperature and ghi > min_ghi
            document['fire_cause'] = "natural" if is_natural else "other"
        else:
            # No hotspot list attached to this record, so there is no fire to explain.
            document['fire_cause'] = None

    merged_stream.delete_many({})
    merged_stream.insert_many(documents)

In [11]:
def _merge_nearby_hotspots(hotspot_list):
    """Collapse hotspots that share an ``encode_location_pre`` into one entry.

    Entries are grouped by their precision-5 geohash. A group with more than
    one member is replaced by a copy of its first member whose ``confidence``
    and ``surface_temperature_celcius`` are the mean over the whole group.
    Groups keep the order in which their first member appeared.
    """
    groups = {}
    for hotspot in hotspot_list:
        groups.setdefault(hotspot['encode_location_pre'], []).append(hotspot)

    merged = []
    for members in groups.values():
        representative = dict(members[0])
        if len(members) > 1:
            for field in ('confidence', 'surface_temperature_celcius'):
                # float() rather than int(): fractional readings are not
                # truncated before averaging.
                representative[field] = sum(float(m[field]) for m in members) / len(members)
        merged.append(representative)
    return merged


def process_compare():
    """Merge co-located hotspots in ``merged_stream``, then label fire causes.

    For every document whose ``Hotspot`` field is a list, hotspots with the
    same ``encode_location_pre`` are merged into a single entry carrying the
    averaged ``confidence`` and ``surface_temperature_celcius``. The
    collection is rewritten and ``process_fire_cause()`` is called.

    The previous pairwise loop iterated ``range(0, len - 2)``, so a list of
    exactly two hotspots was never compared, and it only looked at adjacent
    entries while popping from the list it was indexing. Grouping by key
    merges every matching hotspot regardless of its position in the list.

    Does nothing when the collection is empty.
    """
    documents = list(merged_stream.find())
    if not documents:
        return

    for document in documents:
        hotspots = document.get('Hotspot')
        if isinstance(hotspots, list) and len(hotspots) > 1:
            document['Hotspot'] = _merge_nearby_hotspots(hotspots)

    merged_stream.delete_many({})
    merged_stream.insert_many(documents)
    process_fire_cause()
    

In [12]:
def groupdata():
    """Join climate records with the hotspots that share their geohash.

    Loads ``climate_stream``, ``aqua_stream`` and ``terra_stream``. When all
    three contain at least one document, the aqua and terra hotspots are
    concatenated and grouped by ``encode_location``; each group becomes a list
    of hotspot dicts, which is left-joined onto the climate records as a
    ``Hotspot`` column. Climate records with no matching hotspot keep a
    missing value there. The result replaces the contents of
    ``merged_stream`` and ``process_compare()`` is called.

    Does nothing when any of the three source collections is empty.
    """
    climate_records = list(climate_stream.find())
    aqua_records = list(aqua_stream.find())
    terra_records = list(terra_stream.find())

    if not (climate_records and aqua_records and terra_records):
        return

    climate_frame = pd.DataFrame(climate_records)
    hotspot_frame = pd.concat([pd.DataFrame(aqua_records), pd.DataFrame(terra_records)])

    # Fields copied into each embedded hotspot dict.
    hotspot_fields = [
        'latitude', 'longitude', 'datetime', 'confidence',
        'surface_temperature_celcius', 'key', 'encode_location_pre',
    ]
    grouped = (
        hotspot_frame.groupby('encode_location')[hotspot_fields]
        # Spell out 'records': the abbreviated orient 'r' is rejected by
        # pandas 2.0 and later.
        .apply(lambda group: group.to_dict('records'))
        .reset_index()
        .rename(columns={0: 'Hotspot'})
    )

    merged_frame = climate_frame.merge(grouped, on='encode_location', how='left')
    merged_records = merged_frame.to_dict(orient='records')

    merged_stream.delete_many({})
    merged_stream.insert_many(merged_records)
    process_compare()

In [13]:
def process_batch(batch_df, batch_id):
    """Store one micro-batch of Kafka messages and refresh the derived data.

    Each row's ``value`` is parsed as JSON and routed by its ``key`` field:
    ``'Producer01'`` goes to ``climate_stream``, ``'Producer02'`` to
    ``aqua_stream`` and any other key to ``terra_stream``.

    Args:
        batch_df: DataFrame for this micro-batch; its rows must expose a
            ``value`` string holding a JSON object.
        batch_id: batch identifier supplied by ``foreachBatch`` (unused).

    Every message is parsed before anything is written, so a malformed
    message fails the batch without leaving part of it stored. Each touched
    collection is then bulk-inserted and post-processed once, and
    ``groupdata()`` runs once if the batch contained any hotspot message.
    The post-processing functions rewrite whole collections, so running them
    per message repeated the same work for every row. One visible
    consequence: a climate message that arrives after the last hotspot
    message in the same batch is included in this batch's merge.
    """
    climate_docs, aqua_docs, terra_docs = [], [], []

    for row in batch_df.collect():
        message = json.loads(row.asDict()['value'])
        producer = message['key']
        if producer == 'Producer01':
            climate_docs.append(message)
        elif producer == 'Producer02':
            aqua_docs.append(message)
        else:
            terra_docs.append(message)

    if climate_docs:
        climate_stream.insert_many(climate_docs)
        process_climate_data()
    if aqua_docs:
        aqua_stream.insert_many(aqua_docs)
        process_aqua_data()
    if terra_docs:
        terra_stream.insert_many(terra_docs)
        process_terra_data()

    # The merged view is only rebuilt when hotspot data arrived, as before.
    if aqua_docs or terra_docs:
        groupdata()

    
    

In [14]:
# Streaming writer in append mode: on a 10-second processing-time trigger,
# each micro-batch is passed to `process_batch`.
db_writer = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .foreachBatch(process_batch)
)

In [None]:
# Start the streaming query and block until it terminates or is interrupted.
# `query` is pre-bound so the cleanup below still works when start() itself
# raises; otherwise the `finally` clause would fail with a NameError and hide
# the original error.
query = None
try:
    query = db_writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    # StreamingQueryException comes from pyspark.sql.utils (imports cell).
    print(exc)
finally:
    if query is not None:
        query.stop()