In [None]:
import datetime
import json
import os
from collections import Counter

import numpy as np
import pymongo
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, element_at, split, when

# Geohashes are encoded/decoded with the pygeohash module.
# Uncomment the next line to install it into the kernel's environment:
# %pip install pygeohash
import pygeohash as pgh

# Spark reads PYSPARK_SUBMIT_ARGS when it launches the JVM (at the first
# SparkSession creation), so this must run before the setup cell. It pulls in
# the Kafka connector packages used by the streaming source.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

### Set Up

In [None]:
# Host running both the Kafka broker and MongoDB. Defaults to the original
# server; set the STREAM_HOST_IP environment variable to target another host
# without editing the notebook.
host_ip = os.environ.get("STREAM_HOST_IP", "118.139.86.28")

# Comma-separated topic list passed to the Kafka source's `subscribe` option.
topics = "climateTopic, hotspotTopic"

# Create (or reuse) a local Spark session for the streaming query.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Streaming Fires Data')
    .getOrCreate()
)

# Connect to MongoDB on the same host, port 27017.
client = MongoClient(host_ip, 27017)
db = client.fit3182_assignment_db

# Collections that receive the processed stream output.
climate = db.climate_stream
hotspot = db.hotspot_stream

In [None]:
#set up spark readStream
kafka_sdf = (
    spark.readStream
    .option("failOnDataLoss", "false")
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribe', topics)
    .load()
)

In [None]:
# Keep only the Kafka message payload column; process_batch JSON-decodes it.
fire_sdf = kafka_sdf.select(kafka_sdf['value'])

In [None]:
def average_hotspot(hpList):
    """
    Merge hotspot records that fall in the same geohash5 cell into one record.

    hpList: non-empty list of hotspot dicts (as built by process_batch) with the
        keys "date", "datetime", "latitude", "longitude", "confidence" and
        "surface_temperature"; "fire_cause" is optional.

    Returns a new dict where:
      * "date" (and "fire_cause", when present) are taken from the first record;
      * "datetime" is the mean timestamp, to the second, as an ISO-8601 string
        ("YYYY-MM-DDTHH:MM:SS");
      * latitude, longitude, confidence and surface_temperature are arithmetic
        means.

    Raises ValueError if hpList is empty.
    """
    if not hpList:
        raise ValueError("average_hotspot() requires at least one hotspot record")

    count = len(hpList)

    def mean_of(field):
        return sum(hp[field] for hp in hpList) / count

    # Average on an integer seconds-since-epoch axis, then round back to whole
    # seconds; this avoids casting a float directly to datetime64.
    epoch_seconds = np.array(
        [hp["datetime"] for hp in hpList], dtype='datetime64[s]'
    ).view('i8')
    mean_dt = np.datetime64(int(round(float(epoch_seconds.mean()))), 's')

    doc = {
        "date": hpList[0]["date"],
        # Store as a string, matching the "datetime" format of unmerged docs.
        "datetime": np.datetime_as_string(mean_dt, unit='s'),
        "latitude": mean_of("latitude"),
        "longitude": mean_of("longitude"),
        "confidence": mean_of("confidence"),
        "surface_temperature": mean_of("surface_temperature"),
    }
    # Records merged here come from the same climate record, so they share one
    # cause; keep it rather than silently dropping the field.
    if "fire_cause" in hpList[0]:
        doc["fire_cause"] = hpList[0]["fire_cause"]

    return doc

In [15]:
from collections import Counter
def _build_climate_record(climate_row, station=468802):
    """
    Build the climate document to store from a decoded climate message.

    climate_row: dict JSON-decoded from a climate Kafka message.
    station: station id written to the document. Hard-coded to 468802 as in
        the original code; NOTE(review): confirm whether the producer could
        supply this instead.
    """
    record = {"station": station}
    for field in ("date", "latitude", "longitude", "air_temperature",
                  "relative_humidity", "windspeed_knots", "max_wind_speed",
                  "precipitation", "prec_val", "prec_type", "GHI"):
        record[field] = climate_row[field]
    return record


def _fire_cause(climate_record):
    """Return "natural" when air_temperature > 20 and GHI > 180, else "other"."""
    if climate_record["air_temperature"] > 20 and climate_record["GHI"] > 180:
        return "natural"
    return "other"


def _build_hotspot_doc(hotspot_row, climate_row, climate_record):
    """
    Build the hotspot document linked to this batch's climate record.

    The hotspot keeps its own time of day (its "datetime" string from index 10
    onward) but is re-dated to the climate record's date.
    """
    return {
        "date": climate_row["date"],
        "datetime": climate_row["date"][:10] + hotspot_row["datetime"][10:],
        "latitude": hotspot_row["latitude"],
        "longitude": hotspot_row["longitude"],
        "confidence": hotspot_row["confidence"],
        "surface_temperature": hotspot_row["surface_temperature"],
        "fire_cause": _fire_cause(climate_record),
    }


def _merge_duplicate_hotspots(hotspot_docs):
    """
    Collapse hotspots sharing a precision-5 geohash into one averaged document.

    Groups keep first-seen order; single-member groups are returned unchanged.
    """
    groups = {}
    for doc in hotspot_docs:
        cell = pgh.encode(doc["latitude"], doc["longitude"], precision=5)
        groups.setdefault(cell, []).append(doc)

    merged = []
    for docs in groups.values():
        if len(docs) == 1:
            merged.append(docs[0])
        else:
            averaged = average_hotspot(docs)
            # Every doc in a batch is classified against the same climate
            # record, so the group shares one cause.
            averaged.setdefault("fire_cause", docs[0]["fire_cause"])
            merged.append(averaged)
    return merged


def process_batch(df, epoch_id):
    """
    foreachBatch handler: store one climate record plus its nearby hotspots.

    df: micro-batch DataFrame whose `value` column holds JSON messages.
    epoch_id: micro-batch id supplied by Spark (used only for logging).

    Steps:
      1. Decode every message. Take the first message whose producer_info is
         "climate" as the batch's climate reading; if there is none, skip the
         batch. Kafka gives no ordering guarantee across topics, so the
         climate row is searched for rather than assumed to come first.
      2. Keep non-climate messages whose precision-3 geohash matches the
         climate location, and label each with a fire cause.
      3. Merge hotspots that share a precision-5 geohash by averaging them.
      4. Insert the hotspots into `hotspot`. Then insert the climate record
         into `climate` with the list of inserted hotspot ids, even when that
         list is empty.
    """
    rows = [json.loads(row.value) for row in df.collect()]

    climate_row = next(
        (row for row in rows if row.get("producer_info") == "climate"), None
    )
    if climate_row is None:
        return

    climate_record = _build_climate_record(climate_row)
    climate_geohash = pgh.encode(
        climate_row["latitude"], climate_row["longitude"], precision=3
    )

    nearby_hotspots = [
        _build_hotspot_doc(row, climate_row, climate_record)
        for row in rows
        if row.get("producer_info") != "climate"
        and pgh.encode(row["latitude"], row["longitude"], precision=3) == climate_geohash
    ]

    hotspot_ids = []
    for doc in _merge_duplicate_hotspots(nearby_hotspots):
        hotspot_ids.append(hotspot.insert_one(doc).inserted_id)

    climate_record["hotspots"] = hotspot_ids
    climate.insert_one(climate_record)

    print(f"epoch {epoch_id}: stored climate record with {len(hotspot_ids)} hotspot(s)")
        

In [None]:
# Every 10 seconds, hand the new micro-batch to process_batch.
# Append mode is required: this query has no aggregation, and Spark rejects
# "complete" mode on non-aggregating streams. No sink format is set because
# foreachBatch is the sink. The checkpoint directory stores query progress so
# a restarted query can resume.
writer = (
    fire_sdf.writeStream
    .foreachBatch(process_batch)
    .outputMode('append')
    .option("checkpointLocation", "./fires_sdf_checkpoints")
    .trigger(processingTime='10 seconds')
)

In [None]:
# Start the streaming query and block until it terminates or CTRL-C is pressed.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    # writer.start() itself can fail; only stop a query that actually started,
    # so the real error isn't masked by a NameError (or a stale `query`).
    if query is not None:
        query.stop()