# Part B: Streaming Application

In [1]:
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import json
import pygeohash as pgh
import pymongo
from pprint import pprint

In [2]:
def geohash_handler(latitude, longitude, n):
    """Encode one (latitude, longitude) point as a geohash string of ``n`` characters.

    Points whose geohashes are equal at the same precision fall in the same geohash
    cell; comparing two such strings is how "closeness" is tested in this notebook.
    """
    point_hash = pgh.encode(latitude, longitude, precision=n)
    return point_hash

In [3]:
# One Kafka topic is shared by all three producers (AQUA, TERRA and climate).
topic_name = "Assignment"

In [4]:
# Initialise the Spark session in local mode, using all available cores ('local[*]').
spark = (
    SparkSession.builder
    .master('local[*]')
    # App name shown in the Spark UI (the original had a stray leading '[').
    .appName('Streaming from Kafka into MongoDB')
    .getOrCreate()
)

In [5]:
# Streaming DataFrame subscribed to the shared Kafka topic on the local broker.
# Each row exposes the raw message bytes in its `value` column.
kafka_options = {
    'kafka.bootstrap.servers': 'localhost:9092',
    'subscribe': topic_name,
}
topic_stream_df = spark.readStream.format('kafka').options(**kafka_options).load()

In [6]:
# Connect to MongoDB (pymongo's default host/port) and grab the two collections
# this notebook writes: "climate" and "hotspot".
client = pymongo.MongoClient()
db = client['fit3182_db']
climate_collection = db['climate']
hotspot_collection = db['hotspot']
# Start from empty collections: discard whatever Part A inserted.
for stale_collection in (climate_collection, hotspot_collection):
    stale_collection.drop()

In [7]:
# Process each micro-batch received from Kafka and write it to MongoDB.

# Geohash precision for "this satellite reading lies in the climate reading's area".
_CLIMATE_PRECISION = 3
# Geohash precision for "the AQUA and TERRA readings describe the same fire".
_SATELLITE_PRECISION = 5


def _decode_record(raw_value):
    """Decode one Kafka message value (UTF-8 bytes) into a dict.

    Strict JSON is tried first. If that fails, single quotes are swapped for double
    quotes and parsing is retried, which accepts producers that send ``str(dict)``.
    """
    text = raw_value.decode("utf8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return json.loads(text.replace("'", '"'))


def _is_near(record, ref_hash, precision):
    """True when the geohash of ``record``'s latitude/longitude at ``precision`` equals ``ref_hash``."""
    return geohash_handler(record["latitude"], record["longitude"], precision) == ref_hash


def _merge_satellite_readings(aquas, terras):
    """Combine AQUA and TERRA readings into a list of hotspot records.

    Each AQUA record is paired with at most one not-yet-paired TERRA record whose
    geohash matches at ``_SATELLITE_PRECISION``. For a pair, the AQUA dict is kept
    and its ``confidence`` and ``surface_temperature_celcius`` are replaced by the
    mean of the two readings. Unpaired records from either satellite are returned
    unchanged. AQUA-based records come first, then the unpaired TERRA records.
    """
    unmatched_terras = list(terras)
    hotspots = []
    for aqua in aquas:
        aqua_hash = geohash_handler(aqua["latitude"], aqua["longitude"], _SATELLITE_PRECISION)
        for idx, terra in enumerate(unmatched_terras):
            if _is_near(terra, aqua_hash, _SATELLITE_PRECISION):
                aqua["confidence"] = (aqua["confidence"] + terra["confidence"]) / 2
                aqua["surface_temperature_celcius"] = (
                    aqua["surface_temperature_celcius"] + terra["surface_temperature_celcius"]
                ) / 2
                # Delete by index: list.remove() matches by ==, not identity.
                del unmatched_terras[idx]
                break  # each reading may be merged at most once
        hotspots.append(aqua)
    hotspots.extend(unmatched_terras)
    return hotspots


def process_batch(batch_df, batch_id):
    """``foreachBatch`` handler: classify, correlate and store one micro-batch.

    Each row's ``value`` is decoded and routed by ``sender_id``:
    ``AQUA_producer`` / ``TERRA_producer`` records are satellite readings, and
    ``climate_producer`` is the climate reading (if several arrive, the last wins).

    When the batch contains no climate record, nothing is written. Otherwise:

    * satellite readings whose geohash (precision 3) differs from the climate
      reading's are discarded; the rest get the climate ``created_date``;
    * nearby AQUA/TERRA readings (precision 5) are merged into one averaged hotspot;
    * every hotspot gets ``cause`` = ``"natural"`` when the climate
      ``air_temperature_celcius`` > 20 and ``GHI_w/m2`` > 180, else ``"other"``;
    * the climate record is inserted into ``climate_collection`` and the
      hotspots (if any) into ``hotspot_collection``.

    Args:
        batch_df: the micro-batch DataFrame; rows must expose a bytes ``value``.
        batch_id: micro-batch id supplied by Spark (unused).

    Returns:
        True (``foreachBatch`` ignores the return value).
    """
    aquas, terras = [], []
    climate = {}
    for row in batch_df.collect():
        data = _decode_record(row.value)
        sender = data["sender_id"]
        if sender == "AQUA_producer":
            aquas.append(data)
        elif sender == "TERRA_producer":
            terras.append(data)
        elif sender == "climate_producer":
            climate = data

    # Hotspots are only recorded relative to a climate reading in the same batch.
    if not climate:
        return True

    climate_hash = geohash_handler(climate["latitude"], climate["longitude"], _CLIMATE_PRECISION)
    # Build filtered lists instead of calling list.remove() while iterating,
    # which silently skipped the element after each removal.
    aquas = [a for a in aquas if _is_near(a, climate_hash, _CLIMATE_PRECISION)]
    terras = [t for t in terras if _is_near(t, climate_hash, _CLIMATE_PRECISION)]
    for record in aquas + terras:
        record["created_date"] = climate["created_date"]

    hotspots = _merge_satellite_readings(aquas, terras)
    if not hotspots:
        climate_collection.insert_one(climate)
        return True

    is_natural = climate["air_temperature_celcius"] > 20 and climate["GHI_w/m2"] > 180
    cause = "natural" if is_natural else "other"
    for fire in hotspots:
        fire["cause"] = cause

    climate_collection.insert_one(climate)
    hotspot_collection.insert_many(hotspots)
    return True
        

In [8]:
# Keep only the raw Kafka payload column; process_batch decodes it per row.
output_stream_df = topic_stream_df.select(topic_stream_df['value'])

In [9]:
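# NOTE: unused draft kept for reference; process_batch (run via foreachBatch) performs the writes.
# If revived, `open` must keep the collection on `self`: `process` cannot see the local `climate`.
# class ClimateWriter: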
#     # called at the start of processing each partition in each output micro-batch
#     def open(self, partition_id, epoch_id):
#         self.mongo_client = MongoClient(
#             host='localhost',
#             port=27017
#         )
#         self.db = self.mongo_client['fit3182_db']
#         climate = self.db.climate
#         return True
    
#     # called once per row of the result dataframe
#     # the current code DOES NOT handle duplicate processing
#     #   e.g., query fails and restarts just before current micro-batch was fully inserted
#     def process(self, row):
#         climate.insert(row)
# #         self.db[topic_name].insert_one(row.asDict())

#     def close(self, err):
#         self.mongo_client.close()

In [10]:
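# NOTE: unused draft kept for reference; process_batch (run via foreachBatch) performs the writes.
# If revived, `open` must keep the collection on `self`: `process` cannot see the local `hotspot`.
# class HotspotWriter: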
#     # called at the start of processing each partition in each output micro-batch
#     def open(self, partition_id, epoch_id):
#         self.mongo_client = MongoClient(
#             host='localhost',
#             port=27017
#         )
#         self.db = self.mongo_client['fit3182_db']
#         hotspot = self.db.hotspot
#         return True
    
#     # called once per row of the result dataframe
#     # the current code DOES NOT handle duplicate processing
#     #   e.g., query fails and restarts just before current micro-batch was fully inserted
#     def process(self, row):
#         hotspot.insert_one(row.asDict())
# #         self.db[topic_name].insert_one(row.asDict())

#     def close(self, err):
#         self.mongo_client.close()

In [11]:
# Every 10 seconds hand the newly arrived rows to process_batch as one micro-batch.
stream_writer = output_stream_df.writeStream.outputMode('append')
db_writer = stream_writer.trigger(processingTime="10 seconds").foreachBatch(process_batch)

In [12]:
# Alias for the configured stream writer; calling writer.start() launches the query.
writer = db_writer

In [13]:
# Run the streaming query until interrupted, then stop it.
# StreamingQueryException was never imported (the pyspark.sql.functions star import
# does not provide it), so a query failure used to surface as a NameError instead.
try:
    from pyspark.errors import StreamingQueryException  # PySpark >= 3.4
except ImportError:
    from pyspark.sql.utils import StreamingQueryException  # older PySpark

query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # writer.start() itself may fail, leaving no query to stop.
    if query is not None:
        query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/student/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query


In [None]:
# Inspect the results: print every document stored in the climate collection.
client = MongoClient()
db = client['fit3182_db']
climate_collection = db['climate']
climate_docs = climate_collection.find()
for climate_doc in climate_docs:
    pprint(climate_doc)

In [None]:
# Print every document stored in the "hotspot" collection.
hotspot_collection = db['hotspot']
for hotspot_doc in hotspot_collection.find():
    pprint(hotspot_doc)