### Part B


#### Task 1


In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
import pprint
from kafka3 import KafkaConsumer
from pymongo import MongoClient
from datetime import datetime, timedelta
import os

In [None]:
# change directory (local machine only)
# os.chdir("A2")
# os.chdir("data")

# dirs = os.listdir(os.getcwd())
# print(dirs)

In [None]:
!pip install geohash2

In [None]:
import geohash2

In [None]:
# IP address of the host used below for both the Kafka bootstrap server
# (port 9092) and the MongoDB connection (port 27017)
hostip = "192.168.10.125"

In [None]:
# request the Spark/Kafka connector packages (Scala 2.12 builds, version 3.3.0)
# through the pyspark-shell submit arguments
# NOTE(review): presumably this has to run before the SparkSession is created
# for the packages to be picked up - confirm
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell"
)

In [None]:
# turn a latitude/longitude pair into a geohash string
def geohash_encode(latitude, longitude, precision=3):
    """Return the geohash of a coordinate pair.

    Thin wrapper around ``geohash2.encode``.

    Args:
        latitude: latitude of the point, forwarded unchanged.
        longitude: longitude of the point, forwarded unchanged.
        precision: precision forwarded to ``geohash2.encode`` (defaults to 3).

    Returns:
        Whatever ``geohash2.encode`` returns for the given arguments.
    """
    encoded = geohash2.encode(latitude, longitude, precision)
    return encoded


# two geohashes count as "close" when their leading characters match
def is_within_proximity(geohash1, geohash2, precision):
    """Tell whether two geohashes share the same leading ``precision`` characters.

    Note that the ``geohash2`` parameter shadows the ``geohash2`` module inside
    this function only; the module is not used here.

    Args:
        geohash1: first geohash string.
        geohash2: second geohash string.
        precision: number of leading characters to compare.

    Returns:
        True when both prefixes are equal, False otherwise.
    """
    first_prefix = geohash1[:precision]
    second_prefix = geohash2[:precision]
    return first_prefix == second_prefix


# classify a fire as "natural" or "other" from the climate readings
def determine_fire_cause(air_temperature, ghi):
    """Classify the cause of a fire from air temperature and GHI.

    Args:
        air_temperature: air temperature reading.
        ghi: GHI reading.

    Returns:
        "natural" when the air temperature is above 20 and the GHI is above
        180; "other" in every other case.
    """
    if air_temperature > 20:
        if ghi > 180:
            return "natural"
    return "other"


def sync_fix(hotspots):
    """Merge hotspots that are close to each other in both space and time.

    Each hotspot that has not yet been absorbed acts as an anchor. Every later
    hotspot whose ``geohash5`` shares its first 4 characters with the anchor's
    and whose ``datetime`` (``%H:%M:%S``) is within 10 minutes of the anchor's
    is merged into it. A merged hotspot is a copy of the anchor whose
    ``confidence`` and ``surface_temperature_celcius`` are replaced by the
    rounded average over the group. Hotspots that merge with nothing are
    returned as the original objects.

    Unlike the previous implementation, the input list is left untouched:
    absorbed hotspots are tracked by index instead of being overwritten with
    ``None`` in the caller's list.

    Args:
        hotspots: list of hotspot dicts (``None`` entries are skipped). Each
            dict needs the keys ``geohash5``, ``datetime``, ``confidence`` and
            ``surface_temperature_celcius``.

    Returns:
        A new list with one entry per anchor, in input order.

    Raises:
        ValueError: if a ``datetime`` that has to be compared does not match
            ``%H:%M:%S``.
    """
    merge_window = timedelta(minutes=10)

    # timestamps are parsed lazily (only when a proximity check passes) and
    # cached, so each one is parsed at most once
    parsed_times = {}

    def time_of(index):
        if index not in parsed_times:
            parsed_times[index] = datetime.strptime(
                hotspots[index]["datetime"], "%H:%M:%S"
            )
        return parsed_times[index]

    # indices of hotspots already absorbed into an earlier anchor
    absorbed = set()
    merged_hotspots = []

    for i, anchor in enumerate(hotspots):
        if anchor is None or i in absorbed:
            continue

        # hotspots merged under this anchor (the anchor itself comes first)
        merging_hotspots = [anchor]

        for j in range(i + 1, len(hotspots)):
            candidate = hotspots[j]
            if candidate is None or j in absorbed:
                continue

            # NOTE(review): only the time of day is compared, so two readings
            # on either side of midnight are treated as far apart - confirm
            # whether that matters for the incoming data
            if (
                is_within_proximity(anchor["geohash5"], candidate["geohash5"], 4)
                and abs(time_of(i) - time_of(j)) <= merge_window
            ):
                merging_hotspots.append(candidate)
                absorbed.add(j)

        count = len(merging_hotspots)

        if count == 1:
            # nothing to merge: keep the original hotspot as is
            merged_hotspots.append(anchor)
            continue

        # average confidence and surface temperature over the merged group
        merged_hotspot = anchor.copy()
        merged_hotspot["confidence"] = round(
            sum(each["confidence"] for each in merging_hotspots) / count
        )
        merged_hotspot["surface_temperature_celcius"] = round(
            sum(each["surface_temperature_celcius"] for each in merging_hotspots)
            / count
        )
        merged_hotspots.append(merged_hotspot)

        print("========== Sync Fix: Merging Hotspots ==========")
        print(f"Merging {count} hotspots:")
        pprint.pprint(merging_hotspots)
        print("Merged Hotspot:")
        pprint.pprint(merged_hotspot)
        print("================================================")

    return merged_hotspots


def process_climate_data(data):
    """Reshape one raw climate message into the internal climate record.

    Args:
        data: decoded climate message; the keys read are ``created_date``
            (``%Y-%m-%d``), ``air_temperature_celcius``, ``relative_humidity``,
            ``windspeed_knots``, ``max_wind_speed``, ``precipitation `` (with
            a trailing space), ``GHI_w/m2``, ``latitude`` and ``longitude``.

    Returns:
        A dict with the fixed station id, the parsed date, the nested climate
        readings, the geohashes at precision 3 and 5, and an empty
        ``hotspots`` list. The record is also pretty-printed.
    """
    report_date = datetime.strptime(data["created_date"], "%Y-%m-%d")

    # (output key, key in the incoming message)
    reading_sources = (
        ("air_temperature_celcius", "air_temperature_celcius"),
        ("relative_humidity", "relative_humidity"),
        ("windspeed_knots", "windspeed_knots"),
        ("max_wind_speed", "max_wind_speed"),
        ("precipitation", "precipitation "),
        ("ghi_wm2", "GHI_w/m2"),
    )
    readings = {target: data[source] for target, source in reading_sources}

    coarse_hash = geohash_encode(data["latitude"], data["longitude"], precision=3)
    fine_hash = geohash_encode(data["latitude"], data["longitude"], precision=5)

    climate_data = {
        "station": 948700,
        "date": report_date,
        "climate": readings,
        "geohash3": coarse_hash,
        "geohash5": fine_hash,
        "hotspots": [],
    }

    print("========== Climate Data Processing ==========")
    pprint.pprint(climate_data)
    print("=============================================")

    return climate_data


def process_hotspot_data(data):
    """Reshape one raw hotspot message into the internal hotspot record.

    Args:
        data: decoded hotspot message; the keys read are ``latitude``,
            ``longitude``, ``confidence``, ``surface_temperature_celcius``
            and ``created_time``.

    Returns:
        A dict with the copied readings, the message's ``created_time`` stored
        under ``datetime``, and the geohashes at precision 3 and 5. The record
        is also pretty-printed.
    """
    copied_keys = (
        "latitude",
        "longitude",
        "confidence",
        "surface_temperature_celcius",
    )
    hotspot_data = {key: data[key] for key in copied_keys}

    hotspot_data["datetime"] = data["created_time"]
    hotspot_data["geohash3"] = geohash_encode(
        data["latitude"], data["longitude"], precision=3
    )
    hotspot_data["geohash5"] = geohash_encode(
        data["latitude"], data["longitude"], precision=5
    )

    print("========== Hotspot Data Processing ==========")
    pprint.pprint(hotspot_data)
    print("=============================================")

    return hotspot_data


def process_data(data):
    """Build the document that gets stored in MongoDB from a climate record.

    Args:
        data: climate record with ``date`` (an object with ``isoformat()``),
            ``station`` and a ``climate`` dict; ``fire_cause`` and
            ``hotspots`` are optional.

    Returns:
        A new dict holding the ISO-formatted date, the station, the six
        climate readings (plus ``cause`` when ``fire_cause`` is present) and,
        when ``hotspots`` is present, a list of hotspot dicts whose
        ``datetime`` is re-emitted as an ISO time string.
    """
    reading_names = (
        "air_temperature_celcius",
        "relative_humidity",
        "windspeed_knots",
        "max_wind_speed",
        "precipitation",
        "ghi_wm2",
    )

    document = {
        "date": data["date"].isoformat(),
        "station": data["station"],
        "climate": {name: data["climate"][name] for name in reading_names},
    }

    if "fire_cause" in data:
        document["climate"]["cause"] = data["fire_cause"]

    if "hotspots" in data:
        document["hotspots"] = [
            {
                # parse the "%H:%M:%S" string and write it back in ISO form
                "datetime": datetime.strptime(each["datetime"], "%H:%M:%S")
                .time()
                .isoformat(),
                "confidence": each["confidence"],
                "latitude": each["latitude"],
                "longitude": each["longitude"],
                "surface_temperature_celcius": each["surface_temperature_celcius"],
            }
            for each in data["hotspots"]
        ]

    return document


def process_batch(batch_df, batch_id):
    """Process one micro-batch of Kafka messages and store the result in MongoDB.

    Used as the ``foreachBatch`` callback of the streaming query. Every row's
    ``value`` is decoded as JSON. The first message whose ``producer_id`` is
    ``"climate_producer"`` becomes the batch's climate record (later climate
    messages in the same batch are dropped); every other message is treated as
    a hotspot. Hotspots sharing the climate record's 3-character geohash are
    attached to it, merged with ``sync_fix`` and, when any remain, a fire
    cause is derived. The resulting document is inserted into the
    ``processed_data`` collection of ``fit3182_assignment_db``.

    Nothing is written when the batch contains no climate message.

    Args:
        batch_df: DataFrame of the micro-batch; each row must expose a
            JSON string under ``value``.
        batch_id: identifier of the micro-batch, used for logging only.
    """
    print(f"=============== Processing Batch: {batch_id} ===============")

    climate_data = None
    hotspot_data = []

    for row in batch_df.collect():
        data = json.loads(row["value"])

        if data["producer_id"] == "climate_producer":
            if climate_data is None:
                # first climate report of the batch: this is the one we keep
                climate_data = process_climate_data(data)
            else:
                # a climate report was already kept; drop this one
                print(
                    "Multiple climate reports received in the batch. Keeping the earliest one and dropping the rest."
                )
        else:
            # anything that is not the climate producer is a hotspot producer
            hotspot_data.append(process_hotspot_data(data))

    if climate_data:
        # keep only hotspots in the same geohash cell (precision 3) as the
        # climate reading
        climate_data["hotspots"] = [
            hotspot
            for hotspot in hotspot_data
            if is_within_proximity(hotspot["geohash3"], climate_data["geohash3"], 3)
        ]
        # merge hotspots that are close in space and time
        climate_data["hotspots"] = sync_fix(climate_data["hotspots"])

        if climate_data["hotspots"]:
            # a fire cause is only recorded when at least one hotspot remains
            climate_data["fire_cause"] = determine_fire_cause(
                float(climate_data["climate"]["air_temperature_celcius"]),
                float(climate_data["climate"]["ghi_wm2"]),
            )

        print("========== Final Climate Data ==========")
        pprint.pprint(climate_data)
        print("========================================")

        # reshape into the document layout stored in MongoDB
        processed_data = process_data(climate_data)

        print("========== !Processed Data! ==========")
        pprint.pprint(processed_data)
        print("========================================")

        # insert the document; the finally clause guarantees the connection
        # is released even when the insert fails, so a failing batch does not
        # leak a client
        client = MongoClient(hostip, 27017)
        try:
            db = client["fit3182_assignment_db"]
            collection = db["processed_data"]
            collection.insert_one(processed_data)
            print("Data inserted into MongoDB")
        finally:
            client.close()

    else:
        print("No climate data received in the batch.")

    print(f"=============== Batch {batch_id} Processing Completed ===============")


# engine
if __name__ == "__main__":
    # create (or reuse) the Spark session for this application
    spark = SparkSession.builder.master("local[*]").appName("StopFire").getOrCreate()

    # configure the Kafka source step by step
    kafka_reader = spark.readStream.format("kafka")
    kafka_reader = kafka_reader.option("kafka.bootstrap.servers", f"{hostip}:9092")
    # subscribe to the climate, hotspot_aqua and hotspot_terra topics
    kafka_reader = kafka_reader.option(
        "subscribe", "climate,hotspot_aqua,hotspot_terra"
    )
    # read each topic from its earliest available offset
    kafka_reader = kafka_reader.option("startingOffsets", "earliest")
    kafka_sdf = kafka_reader.load()

    # keep only the message value, cast to a string; the JSON itself is
    # decoded later inside process_batch
    streaming_df = kafka_sdf.selectExpr("CAST(value AS STRING)")

    # hand every micro-batch to process_batch and start the query
    batch_writer = streaming_df.writeStream.outputMode("append")
    batch_writer = batch_writer.foreachBatch(process_batch)
    query = batch_writer.start()

    # block until the streaming query terminates
    query.awaitTermination()

In [None]:
from pymongo import MongoClient
from pprint import pprint

# NOTE(review): the import above rebinds the name `pprint` from the module to
# the function. The processing functions defined earlier call
# `pprint.pprint(...)`, so re-running them after this cell would fail until
# `import pprint` is executed again.

# connect to mongodb
client = MongoClient(hostip, 27017)

try:
    # get appropriate database
    db = client["fit3182_assignment_db"]

    # get proper collection
    collection = db["processed_data"]

    # get all documents in the collection
    cursor = collection.find({})

    # print each document to see
    for document in cursor:
        pprint(document)
finally:
    # always release the connection, even if the query or printing fails
    client.close()