Name : Ng Chen Ting 
# Part B
## Task 1. Processing Data Stream
### Streaming Application

In [None]:
pip install geohash2

In [2]:
import json
import geohash2
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from collections import defaultdict
from datetime import datetime
from bson import ObjectId

import os
# Kafka connector packages for PySpark; set here, before the session is built below.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

# Single address used for both the Kafka broker and the MongoDB server.
host_ip = "192.168.1.14"

# Local Spark session using every available core.
spark = SparkSession.builder.master('local[*]').appName('Streaming Climate & Hotspot Data').getOrCreate()

# Streaming DataFrame subscribed to both the climate and the hotspot topics.
kafka_sdf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{host_ip}:9092")
    .option("subscribe", "Assignment_TaskB_Climate, Assignment_TaskB_Hotspots")
    .load()
)

# Keep only the raw message payload. Despite the name, this frame carries
# records from both subscribed topics (climate and hotspots).
climate_sdf = kafka_sdf.select('value')

# MongoDB connection and target database used by the batch writer.
mongo_client = MongoClient(host=f'{host_ip}', port=27017)
db = mongo_client['fit3182_assignment_db']

In [3]:
def within_10_minutes(time_1, time_2):
    """
    Report whether two clock times are at most 10 minutes apart.

    Both arguments are 'HH:MM:SS' strings. They are parsed as time-of-day
    only (no date component), so the gap is measured inside a single day:
    two times on either side of midnight are NOT treated as close.

    Returns True when the absolute difference is 600 seconds or less
    (the bound is inclusive), otherwise False.
    Raises ValueError if either string does not match 'HH:MM:SS'.
    """
    first_clock, second_clock = (
        datetime.strptime(stamp, '%H:%M:%S') for stamp in (time_1, time_2)
    )

    # Order of the arguments does not matter: only the size of the gap counts.
    gap_seconds = abs((second_clock - first_clock).total_seconds())

    return gap_seconds <= 600       # 600 seconds is 10 mins

In [4]:
def process_data_batch(df, epoch_id):
    """
    Handle one micro-batch of Kafka messages (used as the foreachBatch callback).

    Every message value is decoded from UTF-8 JSON, tagged with geohashes of
    precision 3 and 5 computed from its latitude/longitude, and then routed by
    its 'producer_id': the climate record is kept (if several arrive in the
    same batch, the last one wins) and Aqua/Terra hotspot records are gathered
    into a list. Records with any other producer_id are ignored.

    Nothing is written when the batch holds no climate record. Otherwise the
    hotspots are merged, attached to the climate record, and the result is
    inserted into MongoDB.

    How often this runs is decided by the stream's trigger, not by this function.
    """
    print("Batch " + str(epoch_id))

    hotspot_producers = ('producer_hotspot_aqua', 'producer_hotspot_terra')
    climate = {}
    hotspots = []

    # Pull the whole micro-batch to the driver and decode it record by record.
    for row in df.collect():
        record = json.loads(row.value.decode("utf-8"))

        # Geohash precision 3 and 5 are added to climate and hotspot records alike.
        lat = record.get('latitude')
        lon = record.get('longitude')
        coarse_hash = geohash2.encode(lat, lon, precision=3)
        fine_hash = geohash2.encode(lat, lon, precision=5)
        record['geohash3'] = coarse_hash
        record['geohash5'] = fine_hash

        # Route the record according to which producer emitted it.
        source = record.get('producer_id')
        if source == 'producer_climate':
            climate = record
        elif source in hotspot_producers:
            hotspots.append(record)

    # Without a climate record there is nothing to anchor a document on.
    if climate == {}:
        return

    merged_hotspots = hotspots_manager(hotspots)
    enriched_climate = climate_manager(climate, merged_hotspots)

    # Write to MongoDB
    writeToMongoDB(enriched_climate)
    

In [5]:
def climate_manager(climate, hotspots):
    """
    Attach nearby hotspots to a climate record and label each one's cause.

    climate  -- dict for one climate reading; it is modified in place and
                also returned. Its 'hotspots' key is always reset to a new list.
    hotspots -- iterable of hotspot dicts, or None for "no hotspots".

    A hotspot counts as nearby when its 'geohash3' equals the climate's
    'geohash3'. Each nearby hotspot gets a 'cause' key ('natural' when the air
    temperature is above 20 and GHI is above 180, otherwise 'other') and is
    appended to climate['hotspots']. Hotspots that are not nearby are left
    untouched.
    """
    climate['hotspots'] = []

    if hotspots is None:
        return climate

    for candidate in hotspots:
        # Skip anything outside the climate reading's geohash-3 cell.
        if climate['geohash3'] != candidate['geohash3']:
            continue

        hot_and_sunny = climate['air_temperature_celcius'] > 20 and climate['GHI_w/m2'] > 180
        candidate['cause'] = 'natural' if hot_and_sunny else 'other'

        climate['hotspots'].append(candidate)

    return climate

In [6]:
def hotspots_manager(hotspots):
    """
    Merge hotspot readings that share a geohash-5 cell and are close in time.

    hotspots -- iterable of hotspot dicts, each with at least 'geohash5' and
                'created_time' ('HH:MM:SS').

    Readings are grouped by 'geohash5'. Within a group, an incoming reading is
    folded into the first kept reading whose 'created_time' is within 10
    minutes of it:
      * 'surface_temperature_celcius' and 'confidence' of the kept reading
        become the average of the kept value and the incoming value (a
        pairwise running average, so the result is a float);
      * if the kept reading's 'created_time' is later than the incoming one,
        its latitude, longitude, producer_id and geohash fields are replaced
        by the incoming reading's. 'created_time' itself is left unchanged.
    An incoming reading with no such match is kept as a new entry.

    The kept dicts are the caller's own objects and are modified in place.
    Returns a flat list of the kept readings, ordered by first appearance of
    each geohash-5 cell and then by insertion within the cell.
    """
    replaceable_fields = ('latitude', 'longitude', 'producer_id', 'geohash3', 'geohash5')

    # geohash5 -> list of readings kept for that cell
    kept_by_cell = {}

    for reading in hotspots:
        cell = reading['geohash5']
        reading_time = reading['created_time']

        kept_readings = kept_by_cell.setdefault(cell, [])

        for kept in kept_readings:
            if not within_10_minutes(reading_time, kept['created_time']):
                continue

            # Fold the incoming reading into the one already kept.
            kept['surface_temperature_celcius'] = (kept['surface_temperature_celcius'] + int(reading['surface_temperature_celcius'])) / 2
            kept['confidence'] = (kept['confidence'] + reading['confidence']) / 2

            # Zero-padded 'HH:MM:SS' strings compare in chronological order.
            if kept['created_time'] > reading_time:
                for field in replaceable_fields:
                    kept[field] = reading[field]
            break
        else:
            # No kept reading within 10 minutes (or the cell is new): keep this one as-is.
            kept_readings.append(reading)

    return [reading for kept_readings in kept_by_cell.values() for reading in kept_readings]

In [7]:
def writeToMongoDB(data):
    """
    Shape one enriched climate record into the stored document format and insert it.

    data -- climate dict carrying 'date' ('dd/mm/YYYY'), the climate
            measurements, and a 'hotspots' list whose entries each have
            'created_time' ('HH:MM:SS'), position, confidence, surface
            temperature and 'cause'.

    The document gets a fresh ObjectId, a fixed station number, the parsed
    date, the climate measurements, and one embedded hotspot per entry whose
    'time' combines the climate date with the hotspot's time of day. The
    document is printed and then inserted into the 'Assignment_2_Climates'
    collection.
    """
    date_obj = datetime.strptime(data['date'], '%d/%m/%Y')

    def to_embedded(entry):
        # Hotspots only carry a time of day; anchor it on the climate record's date.
        clock = datetime.strptime(entry['created_time'], '%H:%M:%S')
        observed_at = datetime.combine(date_obj.date(), clock.time())
        return {
            'latitude': entry['latitude'],
            'longitude': entry['longitude'],
            'time': observed_at,
            'confidence': entry['confidence'],
            'surface_temperature_celcius': entry['surface_temperature_celcius'],
            'cause': entry['cause'],
        }

    document = {
        '_id': ObjectId(),
        'date': date_obj,
        'station': 9999,        # Self-set station number
        "air_temperature_celcius": data['air_temperature_celcius'],
        'relative_humidity': data['relative_humidity'],
        'windspeed_knots': data['windspeed_knots'],
        'max_wind_speed': data['max_wind_speed'],
        'precipitation': data['precipitation'],
        'GHI_w/m2': data['GHI_w/m2'],
        'hotspots': [to_embedded(entry) for entry in data['hotspots']],
    }

    print(document)

    # Insert climate data into collection
    db['Assignment_2_Climates'].insert_one(document)


In [8]:
# Streaming sink: hand every micro-batch to process_data_batch.
writer = (
    climate_sdf.writeStream
    .option("checkpointLocation", "./climate_sdf_checkpoints")
    .outputMode('append')
    .trigger(processingTime='10 seconds') # Process data every 10 secs
    .foreachBatch(process_data_batch)
)

# Bound before the try so the cleanup below can tell whether the stream
# ever started; otherwise a failure in start() would surface as a NameError
# from the finally block and hide the real error.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    # Stop the stream first so no in-flight batch tries to write through a
    # MongoDB client that has already been closed.
    if query is not None:
        query.stop()
    mongo_client.close()

Batch 169
{'_id': ObjectId('664f2a1b2a5f61900b329a10'), 'date': datetime.datetime(2024, 6, 17, 0, 0), 'station': 9999, 'air_temperature_celcius': 16, 'relative_humidity': 48.4, 'windspeed_knots': 8.1, 'max_wind_speed': 15.9, 'precipitation': ' 0.00G', 'GHI_w/m2': 139, 'hotspots': [{'latitude': -37.4368, 'longitude': 149.1191, 'time': datetime.datetime(2024, 6, 17, 8, 17, 7), 'confidence': 56, 'surface_temperature_celcius': 52, 'cause': 'other'}, {'latitude': -37.3493, 'longitude': 149.3691, 'time': datetime.datetime(2024, 6, 17, 19, 14, 34), 'confidence': 66, 'surface_temperature_celcius': 80, 'cause': 'other'}, {'latitude': -37.624, 'longitude': 149.314, 'time': datetime.datetime(2024, 6, 17, 0, 7, 26), 'confidence': 90, 'surface_temperature_celcius': 66, 'cause': 'other'}]}
Batch 170
{'_id': ObjectId('664f2a202a5f61900b329a11'), 'date': datetime.datetime(2024, 6, 18, 0, 0), 'station': 9999, 'air_temperature_celcius': 21, 'relative_humidity': 58.7, 'windspeed_knots': 7.8, 'max_wind_sp

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopping query.
