In [1]:
! pip install pygeohash



Import libraries and point PySpark at the Spark–Kafka connector packages

In [2]:
import json
from pymongo import MongoClient
from pyspark.sql import SparkSession
import pygeohash as pgh
from datetime import datetime
import os
from pprint import pprint
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

Set up the Kafka host address and a local Spark session

In [3]:
# Address of the machine running Kafka. Set the KAFKA_HOST_IP environment
# variable instead of editing the notebook; the placeholder is the fallback.
host_ip = os.environ.get("KAFKA_HOST_IP", "YOUR IP")

# Local Spark session using all available cores ('local[*]').
# NOTE(review): the app name looks carried over from another exercise; consider renaming.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Streaming Parking Data')
    .getOrCreate()
)

In [4]:
# Names of the three Kafka topics the stream subscribes to.
topic_1, topic_2, topic_3 = "Climate", "Aqua", "Terra"

In [5]:
# One streaming source that subscribes to all three producer topics at once.
kafka_sdf = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribe', ','.join((topic_1, topic_2, topic_3)))
    .load()
)

In [6]:
# Keep only the Kafka message payload (decoded as JSON in process_batch).
climate_fire_sdf = kafka_sdf.select(kafka_sdf['value'])

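Define `process_batch`, the `foreachBatch` handler. It pairs Aqua and Terra hotspot readings from the same hour, matches them to the batch's climate reading by geohash, and upserts the resulting record into MongoDB.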

In [7]:
# Hotspot readings are kept only for these reporting hours. Each hour maps to a
# slot index so Aqua and Terra readings from the same hour can be paired.
HOUR_TO_SLOT = {0: 0, 4: 1, 9: 2, 14: 3, 19: 4}

# Timestamp format of every producer message, e.g. '11/01/2023 00:00:00'.
DATETIME_FORMAT = "%d/%m/%Y %H:%M:%S"


def _parse_datetime(text):
    """Parse a producer timestamp string using DATETIME_FORMAT."""
    return datetime.strptime(text, DATETIME_FORMAT)


def _split_batch(rows):
    """Sort one micro-batch of Kafka rows into climate and hotspot messages.

    Args:
        rows: collected Spark rows whose ``value`` field holds a JSON document.

    Returns:
        ``(climate_data, aqua_slots, terra_slots)``:
        - ``climate_data`` is the last climate message seen, or None.
        - Each slot list has one entry per hour in HOUR_TO_SLOT. An entry holds
          the last hotspot message for that hour, or None.
        Hotspot messages from any other hour are dropped.
    """
    climate_data = None
    aqua_slots = [None] * len(HOUR_TO_SLOT)
    terra_slots = [None] * len(HOUR_TO_SLOT)

    for row in rows:
        message = json.loads(row.value)  # the Kafka value is a JSON byte array
        if "climate" in message:
            climate_data = message  # a later climate reading replaces an earlier one
            continue
        if "aqua" in message:
            slots = aqua_slots
        elif "terra" in message:
            slots = terra_slots
        else:
            continue
        slot = HOUR_TO_SLOT.get(_parse_datetime(message["datetime"]).hour)
        if slot is not None:
            slots[slot] = message

    return climate_data, aqua_slots, terra_slots


def _match_fire_event(climate, climate_datetime, climate_geohash_p3, aqua_message, terra_message):
    """Merge an Aqua and a Terra hotspot into one fire event if they agree.

    The pair matches when both conditions hold:
    - both hotspots have the same precision-3 geohash as the climate reading;
    - the two hotspots have the same precision-5 geohash as each other.

    Returns:
        The fire event dict, or None when the hotspots do not match.
    """
    aqua = aqua_message["aqua"]
    terra = terra_message["terra"]

    aqua_geohash_p3 = pgh.encode(aqua["latitude"], aqua["longitude"], precision=3)
    terra_geohash_p3 = pgh.encode(terra["latitude"], terra["longitude"], precision=3)
    print(f"Aqua Geohash P3: {aqua_geohash_p3}")
    print(f"Terra Geohash P3: {terra_geohash_p3}")
    print(f"Climate Geohash P3: {climate_geohash_p3}\n\n")
    if not (aqua_geohash_p3 == climate_geohash_p3 == terra_geohash_p3):
        return None

    print("Match Found, proceed to check on precision 5 on Terra and Aqua\n")
    aqua_geohash_p5 = pgh.encode(aqua["latitude"], aqua["longitude"], precision=5)
    terra_geohash_p5 = pgh.encode(terra["latitude"], terra["longitude"], precision=5)
    print(f"Aqua Geohash P5: {aqua_geohash_p5}")
    print(f"Terra Geohash P5: {terra_geohash_p5}")
    if aqua_geohash_p5 != terra_geohash_p5:
        return None

    print(f"Fire occured at Geohash {aqua_geohash_p5}\n")
    climate_day = climate_datetime.date()
    hotspot_time = _parse_datetime(aqua_message["datetime"]).time()
    # The fire is labelled natural only when both climate thresholds are exceeded.
    is_natural = climate["air_temperature_celcius"] > 20 and climate["GHI_w/m2"] > 180
    return {
        "confidence": (aqua["confidence"] + terra["confidence"]) / 2,
        "latitude": aqua["latitude"],
        "longitude": aqua["longitude"],
        "surface_temperature_celcius": (
            aqua["surface_temperature_celcius"] + terra["surface_temperature_celcius"]
        ) / 2,
        # Both timestamps use the climate reading's day; "datetime" takes the
        # time of day from the Aqua hotspot.
        "date": datetime.combine(climate_day, datetime.min.time()),
        "datetime": datetime.combine(climate_day, hotspot_time),
        "fire_cause": "natural" if is_natural else "other",
    }


def process_batch(df, epoch_id, mongo_host="YOUR IP", mongo_port=27017):
    """Spark ``foreachBatch`` handler: detect fires in one micro-batch and store the record.

    The batch's climate reading is processed as follows:
    - It is enriched with ``date``, ``station`` and ``hotspot_data``. ``hotspot_data``
      holds the merged Aqua/Terra fire events, or None when there are none.
    - Its latitude/longitude are removed.
    - It is upserted into ``fit3182_assignment_db.overwatch``, keyed by ``date``.

    Batches without a climate reading are only logged; no database connection
    is opened for them.

    Args:
        df: the micro-batch DataFrame; each row's ``value`` is a JSON message.
        epoch_id: micro-batch id supplied by Spark (unused here).
        mongo_host: MongoDB host (placeholder default; set the real address).
        mongo_port: MongoDB port.
    """
    print("#########NEW BATCH #############\n\n")
    climate_data, aqua_slots, terra_slots = _split_batch(df.collect())

    if climate_data is not None:
        climate = climate_data["climate"]
        climate_datetime = _parse_datetime(climate_data["datetime"])
        climate["date"] = climate_datetime
        climate["station"] = "12345"  # NOTE(review): station id is hard-coded — confirm
        climate_geohash_p3 = pgh.encode(climate["latitude"], climate["longitude"], precision=3)

        fire_events = []
        for aqua_message, terra_message in zip(aqua_slots, terra_slots):
            # A fire is only considered when both satellites reported in the same hour slot.
            if aqua_message is None or terra_message is None:
                continue
            fire_event = _match_fire_event(
                climate, climate_datetime, climate_geohash_p3, aqua_message, terra_message
            )
            if fire_event is not None:
                fire_events.append(fire_event)
        # Store None rather than an empty list when no fire was detected.
        climate["hotspot_data"] = fire_events or None

        # Coordinates are used only for geohash matching; they are not stored.
        del climate["latitude"]
        del climate["longitude"]

        # replace_one + upsert keeps re-runs idempotent (one document per date).
        # The client is closed after the write instead of leaking one per batch.
        with MongoClient(mongo_host, mongo_port) as client:
            client.fit3182_assignment_db.overwatch.replace_one(
                {"date": climate["date"]}, climate, upsert=True
            )
        pprint(climate_data)

    print("#########END BATCH #############\n\n")

In [8]:
# foreachBatch is the sink: every 10-second micro-batch is passed to process_batch.
# foreachBatch sets the sink itself, so no .format(...) call is needed; any
# format set on the writer would be replaced by it.
writer = (
    climate_fire_sdf.writeStream
    .outputMode('append')
    .trigger(processingTime="10 seconds")
    .foreachBatch(process_batch)
)

In [9]:
# Start outside the try block. If start() fails, its error is reported directly
# instead of being masked by a NameError (or by stopping a stale `query` from
# an earlier run) in the finally clause.
query = writer.start()
try:
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    query.stop()

#########NEW BATCH #############


#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r32
Terra Geohash P3: r1m
Climate Geohash P3: r1m


{'climate': {'GHI_w/m2': 84,
             'air_temperature_celcius': 9,
             'date': datetime.datetime(2023, 1, 11, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 15.0,
             'precipitation ': ' 0.08G',
             'precipitation_type': 'G',
             'precipitation_value': 0.08,
             'relative_humidity': 40.1,
             'station': '12345',
             'windspeed_knots': 12.1},
 'datetime': '11/01/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r33
Terra Geohash P3: r1q
Climate Geohash P3: r1j


Aqua Geohash P3: r1m
Terra Geohash P3: r1q
Climate Geohash P3: r1j


Aqua Geohash P3: r1x
Terra Geohash P3: r32
Climate Geohash P3: r1j


Aqua Geohash P3: r1m
Terra Geohash P3: r1x
Climate Geohash P3: r1j


Aqua G

#########NEW BATCH #############


Aqua Geohash P3: r1k
Terra Geohash P3: r1s
Climate Geohash P3: r1k


Aqua Geohash P3: r1m
Terra Geohash P3: r1k
Climate Geohash P3: r1k


Aqua Geohash P3: r1w
Terra Geohash P3: r1k
Climate Geohash P3: r1k


Aqua Geohash P3: r1t
Terra Geohash P3: r1t
Climate Geohash P3: r1k


Aqua Geohash P3: r1s
Terra Geohash P3: r1k
Climate Geohash P3: r1k


{'climate': {'GHI_w/m2': 111,
             'air_temperature_celcius': 12,
             'date': datetime.datetime(2023, 1, 21, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 8.9,
             'precipitation ': ' 0.00G',
             'precipitation_type': 'G',
             'precipitation_value': 0.0,
             'relative_humidity': 40.4,
             'station': '12345',
             'windspeed_knots': 5.9},
 'datetime': '21/01/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r1u
Terra Geohash P3: r1q
Climate Geohash P3: r36


Aqua Geo

#########NEW BATCH #############


Aqua Geohash P3: r1r
Terra Geohash P3: r1k
Climate Geohash P3: r1s


Aqua Geohash P3: r1p
Terra Geohash P3: r1m
Climate Geohash P3: r1s


Aqua Geohash P3: r1k
Terra Geohash P3: r1s
Climate Geohash P3: r1s


Aqua Geohash P3: r1u
Terra Geohash P3: r33
Climate Geohash P3: r1s


Aqua Geohash P3: r1x
Terra Geohash P3: r1q
Climate Geohash P3: r1s


{'climate': {'GHI_w/m2': 90,
             'air_temperature_celcius': 10,
             'date': datetime.datetime(2023, 1, 31, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 13.0,
             'precipitation ': ' 0.08G',
             'precipitation_type': 'G',
             'precipitation_value': 0.08,
             'relative_humidity': 43.7,
             'station': '12345',
             'windspeed_knots': 9.5},
 'datetime': '31/01/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r1w
Terra Geohash P3: r1m
Climate Geohash P3: r1k


Aqua Ge

#########NEW BATCH #############


Aqua Geohash P3: r1t
Terra Geohash P3: r1m
Climate Geohash P3: r1s


Aqua Geohash P3: r33
Terra Geohash P3: r1w
Climate Geohash P3: r1s


Aqua Geohash P3: r1s
Terra Geohash P3: r1q
Climate Geohash P3: r1s


Aqua Geohash P3: r1m
Terra Geohash P3: r1s
Climate Geohash P3: r1s


Aqua Geohash P3: r1m
Terra Geohash P3: r1m
Climate Geohash P3: r1s


{'climate': {'GHI_w/m2': 196,
             'air_temperature_celcius': 24,
             'date': datetime.datetime(2023, 2, 9, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 15.0,
             'precipitation ': ' 0.00I',
             'precipitation_type': 'I',
             'precipitation_value': 0.0,
             'relative_humidity': 55.5,
             'station': '12345',
             'windspeed_knots': 7.9},
 'datetime': '09/02/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r1m
Terra Geohash P3: r1s
Climate Geohash P3: r1w


Aqua Geo

#########NEW BATCH #############


Aqua Geohash P3: r30
Terra Geohash P3: r1q
Climate Geohash P3: r1m


Aqua Geohash P3: r1r
Terra Geohash P3: r1m
Climate Geohash P3: r1m


Aqua Geohash P3: r1w
Terra Geohash P3: r30
Climate Geohash P3: r1m


Aqua Geohash P3: r1x
Terra Geohash P3: r1x
Climate Geohash P3: r1m


Aqua Geohash P3: r1m
Terra Geohash P3: r1w
Climate Geohash P3: r1m


{'climate': {'GHI_w/m2': 141,
             'air_temperature_celcius': 16,
             'date': datetime.datetime(2023, 2, 19, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 16.9,
             'precipitation ': ' 0.00I',
             'precipitation_type': 'I',
             'precipitation_value': 0.0,
             'relative_humidity': 47.0,
             'station': '12345',
             'windspeed_knots': 12.0},
 'datetime': '19/02/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r1q
Terra Geohash P3: r1t
Climate Geohash P3: r33


Aqua G

#########NEW BATCH #############


Aqua Geohash P3: r1m
Terra Geohash P3: r1n
Climate Geohash P3: r33


Aqua Geohash P3: r1w
Terra Geohash P3: r1m
Climate Geohash P3: r33


Aqua Geohash P3: r1s
Terra Geohash P3: r1q
Climate Geohash P3: r33


Aqua Geohash P3: r1m
Terra Geohash P3: r1q
Climate Geohash P3: r33


Aqua Geohash P3: r1m
Terra Geohash P3: r1k
Climate Geohash P3: r33


{'climate': {'GHI_w/m2': 80,
             'air_temperature_celcius': 9,
             'date': datetime.datetime(2023, 3, 9, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 6.0,
             'precipitation ': ' 0.00G',
             'precipitation_type': 'G',
             'precipitation_value': 0.0,
             'relative_humidity': 45.3,
             'station': '12345',
             'windspeed_knots': 2.5},
 'datetime': '09/03/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r1q
Terra Geohash P3: r1m
Climate Geohash P3: r36


Aqua Geohas

#########NEW BATCH #############


Aqua Geohash P3: r1w
Terra Geohash P3: r1x
Climate Geohash P3: r33


Aqua Geohash P3: r1w
Terra Geohash P3: r1m
Climate Geohash P3: r33


Aqua Geohash P3: r1m
Terra Geohash P3: r1m
Climate Geohash P3: r33


Aqua Geohash P3: r1s
Terra Geohash P3: r1n
Climate Geohash P3: r33


Aqua Geohash P3: r30
Terra Geohash P3: r1s
Climate Geohash P3: r33


{'climate': {'GHI_w/m2': 125,
             'air_temperature_celcius': 14,
             'date': datetime.datetime(2023, 6, 16, 0, 0),
             'hotspot_data': None,
             'max_wind_speed': 15.9,
             'precipitation ': ' 0.03G',
             'precipitation_type': 'G',
             'precipitation_value': 0.03,
             'relative_humidity': 44.6,
             'station': '12345',
             'windspeed_knots': 7.7},
 'datetime': '16/06/2023 00:00:00'}
#########END BATCH #############


#########NEW BATCH #############


Aqua Geohash P3: r1m
Terra Geohash P3: r1x
Climate Geohash P3: r1m


Aqua G

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopping query.
