In [9]:
%pip install pygeohash
%pip install kafka

[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.
[0m[31mERROR: Could not find a version that satisfies the requirement pyspark.streaming.kafka (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pyspark.streaming.kafka[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


In [None]:
import datetime
import os
import json
import time
import pprint
import pygeohash as pg
from collections import defaultdict
from kafka3 import KafkaConsumer
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pymongo import MongoClient
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

topic_name = 'PartB'
hostip = "10.192.104.132"

# initialize spark session
spark = (SparkSession.builder
         .master('local[*]')
         # local means that you run as many processors as the cores available 
         # on ur machine
         .appName("ClimateAndHotspotStreaming")
         .getOrCreate())

# connect to mongo
client = MongoClient('10.192.104.132', 27017)
db = client["fit3182_assignment_db"]
climate_data_collection = db["climate_data"]

topic_stream_df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{hostip}:9092')
    .option('subscribe', topic_name)
    .load()
)


hnngh = topic_stream_df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("data")
)

""" HELPER FUNCTIONS """  
def determine_fire_cause(air_temperature, ghi):
    '''
    helper function done to determine fire causes
    '''
    if air_temperature > 20 and ghi > 180:
        return "natural"
    else:
        return "other"

""" MAIN PROCESSING FUNCTIONS """    
def reading(ugh):
    """
    for each batch, this function will read the streaming data
    and split it based on it's producer, we get this by
    iterating through each row in the dataframe, parsing the json
    string, then extract the information based on the producer
    (in this case, either climate01, aqua or terra). returning a tuple
    containing the parsed climate data and the 
    combined list of hotspot data (aqua and terra combined).
    """
    aqua = []
    terra = []
    climate = {}
    
    for item in ugh:
        value = json.loads(item['data'])
        key = str(value['producer'])
        if key == 'Climate01':
            climate = value
        elif key == 'hotspot_aqua':
            aqua.append(value)
        elif key == 'hotspot_terra':
            terra.append(value)
    return climate, aqua + terra

# group hotspots tgt
def hotpot_merger(hotspots):
    '''        
    Combine hotspots that share the same location and have timestamps within 10 minutes.
    
    We do this by converting the "created_time" string in each hotspot 
    to a datetime object and calculates the geohash for the hotspot location 
    using the pygeohash library (with a precision of 5). Then we use the geohash
    to group the hotspots via defaultdict, sorting them via their timestamps.
    
    Later we iterate through each group, calculating the avg confidence and surface temp
    Returning a list of the merged hotspot dicts
    '''
    # print('hello this works:hotsport merger starter')
    
    # Step 1: Convert time strings to datetime objects and encode geohashes
    for hotspot in hotspots:
        hotspot['datetime'] = datetime.datetime.strptime(hotspot['created_time'], "%H:%M:%S")
        hotspot['geohash'] = pg.encode(hotspot['latitude'], hotspot['longitude'], precision=5)
    
    
    # Step 2: Group hotspots by geohash
    groups = defaultdict(list)
    for hotspot in hotspots:
        groups[hotspot['geohash']].append(hotspot)
    
    merged_hotspots = []
    
    # Step 3: Process each geohash group
    for geohash, records in groups.items():
        records.sort(key=lambda r: r['datetime'])  # Sort records by time
        i = 0
        while i < len(records):
            group = [records[i]]
            j = i + 1
            while j < len(records) and (records[j]['datetime'] - records[i]['datetime']) <= timedelta(minutes=10):
                group.append(records[j])
                j += 1
            
            # Combine records in the group
            combined_record = {
                # Use corrdinates of the first record
                'latitude': group[0]['latitude'],  
                'longitude': group[0]['longitude'],  
                # Average the confidence
                'confidence': sum(r['confidence'] for r in group) / len(group),  
                # Average the surface temp
                'surface_temperature_celcius': sum(r['surface_temperature_celcius'] for r in group) / len(group)  
            }
            merged_hotspots.append(combined_record)
            i = j
            # print('hello this works:hotsport merger ender')
    
    return merged_hotspots

def climate_hotspot_merger(climate, hotspots):
    """
    we take climate data and a list of hotspots and
    group them based on location, while checking if both data
    are available. Then, we filter the hotspots to include only 
    those with the same geohash as the climate data (assuming they are nearby).
    see, precision = 3 and drop hotspots that arent close to climate
    """
    # print('hello this works: climate hotsport starter')
    ret = []
    if not climate or not hotspots:
        return climate, hotspots
    
    climate_geohash = pg.encode(climate['latitude'], climate['longitude'], precision = 3)
    
    nearby_hotspots = [
        hotspot for hotspot in hotspots
        if pg.encode(hotspot['latitude'], hotspot['longitude'], precision=3) == climate_geohash
    ]
    
    # Return the climate data and the list of nearby hotspots
    # print('hello this works: climate hotsport ender')
    return climate, nearby_hotspots

        


def stream_handler(batchdf, batchid):
    """
    this function collections data from the batch df then
    converts each row into a dictionary format, calling the 
    1. reading function to parse the data and get the hotspot and climate data
    2. then checks if climate data is present, if so, initalizes a fire cause and
    a hotspot list
    3. If hotspots are available, it merges them using the hotpot_merger function 
    and then merges climate data with nearby hotspots using the 
    climate_hotspot_merger function.
    4. later calling on the determine_fire_cause function to 
    determines the fire cause based on air temperature and GHI
    5. Cleans the climate and hotspot data by removing unnecessary fields 
    6. inserts the data into mongodb
    """
    raw = batchdf.collect()
    data = [row.asDict() for row in raw]
    climate, hotspot = reading(data)  
    
    if climate:
        # print('hello streaming starting')
        climate['hotspots'] = []
        climate['fire'] = 'NA'
        if hotspot:
            climate, hotspots = climate_hotspot_merger(climate, hotspot)
            hotspots = hotpot_merger(hotspots)
            climate['fire'] = determine_fire_cause(climate['air_temperature_celcius'], climate['GHI_w/m2'])
        
        print(climate['fire'])
        
        # making it equal to the model
        for item in hotspots:
            item.pop('created_time', None) 
            item.pop('producer', None) 
            
        climate.pop('latitude', None)
        climate.pop('longitude', None)
        climate.pop('producer', None)
        
        climate['hotspots'] = hotspot
        print()
        
        print("cleaned:", climate)
        climate_data_collection.insert_one(climate)
        
    
# spark structured streaminnn
"""
    - read from Kafka topics
    - apply processing function to each batch
    - handle exceptions
    - stop streaming query gracefully
"""

writer = (
    hnngh.writeStream.outputMode('append')
    .trigger(processingTime='10 seconds')
    .foreachBatch(stream_handler)
    
)

try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
except Exception as exc:
    print(exc)
finally:
    query.stop()

other

cleaned: {'air_temperature_celcius': 22, 'relative_humidity': 56.3, 'windspeed_knots': 8.0, 'max_wind_speed': 13.0, 'precipitation ': ' 0.08G', 'GHI_w/m2': 179, 'date': '2024-02-13', 'created_date': '2024-05-27 05:19:26', 'hotspots': [{'latitude': -38.219, 'longitude': 143.7352, 'confidence': 68, 'surface_temperature_celcius': 44, 'producer': 'hotspot_aqua', 'created_time': '03:01:41'}], 'fire': 'other'}
other

cleaned: {'air_temperature_celcius': 19, 'relative_humidity': 54.1, 'windspeed_knots': 11.2, 'max_wind_speed': 18.1, 'precipitation ': ' 0.31G', 'GHI_w/m2': 157, 'date': '2024-02-14', 'created_date': '2024-05-27 05:19:36', 'hotspots': [{'latitude': -37.236, 'longitude': 141.176, 'confidence': 68, 'surface_temperature_celcius': 37, 'producer': 'hotspot_aqua', 'created_time': '21:27:47'}, {'latitude': -35.0722, 'longitude': 141.4325, 'confidence': 76, 'surface_temperature_celcius': 53, 'producer': 'hotspot_aqua', 'created_time': '13:59:05'}, {'latitude': -36.5104, 'longitud

other

cleaned: {'air_temperature_celcius': 17, 'relative_humidity': 50.8, 'windspeed_knots': 9.0, 'max_wind_speed': 15.9, 'precipitation ': ' 0.00G', 'GHI_w/m2': 145, 'date': '2024-02-26', 'created_date': '2024-05-27 05:21:36', 'hotspots': [{'latitude': -36.4212, 'longitude': 141.2381, 'confidence': 75, 'surface_temperature_celcius': 49, 'producer': 'hotspot_aqua', 'created_time': '20:58:44'}, {'latitude': -36.357, 'longitude': 141.2363, 'confidence': 80, 'surface_temperature_celcius': 53, 'producer': 'hotspot_aqua', 'created_time': '19:41:18'}, {'latitude': -37.708, 'longitude': 145.1, 'confidence': 80, 'surface_temperature_celcius': 54, 'producer': 'hotspot_terra', 'created_time': '03:53:11'}], 'fire': 'other'}
other

cleaned: {'air_temperature_celcius': 18, 'relative_humidity': 53.5, 'windspeed_knots': 8.8, 'max_wind_speed': 13.0, 'precipitation ': ' 0.00I', 'GHI_w/m2': 150, 'date': '2024-02-27', 'created_date': '2024-05-27 05:21:46', 'hotspots': [{'latitude': -35.1491, 'longitude'

other

cleaned: {'air_temperature_celcius': 16, 'relative_humidity': 48.1, 'windspeed_knots': 9.3, 'max_wind_speed': 12.0, 'precipitation ': ' 0.00G', 'GHI_w/m2': 139, 'date': '2024-03-09', 'created_date': '2024-05-27 05:23:37', 'hotspots': [{'latitude': -37.227, 'longitude': 142.138, 'confidence': 73, 'surface_temperature_celcius': 39, 'producer': 'hotspot_aqua', 'created_time': '03:19:15'}, {'latitude': -38.0564, 'longitude': 142.8525, 'confidence': 74, 'surface_temperature_celcius': 48, 'producer': 'hotspot_aqua', 'created_time': '17:24:27'}, {'latitude': -38.1719, 'longitude': 143.0705, 'confidence': 68, 'surface_temperature_celcius': 44, 'producer': 'hotspot_terra', 'created_time': '02:28:30'}], 'fire': 'other'}
other

cleaned: {'air_temperature_celcius': 20, 'relative_humidity': 57.4, 'windspeed_knots': 10.9, 'max_wind_speed': 22.0, 'precipitation ': ' 0.00I', 'GHI_w/m2': 161, 'date': '2024-03-10', 'created_date': '2024-05-27 05:23:47', 'hotspots': [{'latitude': -37.7416, 'longit