## Assignment 2 Part B: Streaming Application

**Name: Kang Hong Bo (Student ID: 32684673)**

Write a streaming application using the Apache Spark Structured Streaming API that processes data in daily batches. Each batch should contain 1 Climate report and 0 or more Hotspot reports (from Producer 2 or 3). The streaming application should process the data as follows:

- Group the Climate and Hotspot streams by location and create the data model developed in Part A. Assume that all Hotspot reports in a batch window were generated for the current day. Use geohash precision 3 for Climate-to-Hotspot matching. You should drop any Hotspot reports that are not close to the Climate report.
- If there is no Hotspot report in a batch (only the Climate report is present), it implies that there was no fire for that day and we can store the Climate report in MongoDB straight away.
- Due to synchronisation issues in the communication between the satellites and the receiving station, the AQUA and TERRA satellites may unknowingly produce multiple reports for the same Hotspot event. This is characterised as Hotspot reports with similar location (geohash precision 5) and created time (<=10 minutes apart). In such cases, you should merge these reports by averaging the ‘surface temperature’ and ‘confidence’. You are free to choose the method to merge the other fields in the reports (i.e. averaging, first/last etc.).
- If a fire was detected with an air temperature greater than 20 (°C) and a GHI greater than 180 (W/m2), then report the cause of the fire event as ‘natural’. Otherwise, report the cause of the fire event as ‘other’.
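
The duplicate-merging rule above can be checked offline, without Kafka or Spark. The following is a minimal sketch, not the notebook's implementation: the helper name `merge_duplicates` and the sample values are hypothetical, and it assumes the reports passed in already share one geohash precision 5 cell and carry an `HH:MM:SS` `time` string for the same day. Each group is anchored at its earliest report, and the non-averaged fields are taken from that first report.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # merge threshold from the task description
TIME_FMT = '%H:%M:%S'           # assumed time format of a Hotspot report


def merge_duplicates(reports):
    """Merge reports that fall within WINDOW of the first report in a group.

    Assumes every report belongs to the same geohash-5 cell and the same day.
    """
    ordered = sorted(reports, key=lambda r: datetime.strptime(r['time'], TIME_FMT))
    merged = []
    i = 0
    while i < len(ordered):
        start = datetime.strptime(ordered[i]['time'], TIME_FMT)
        j = i + 1
        # extend the group while the next report is at most 10 minutes after the first
        while j < len(ordered) and datetime.strptime(ordered[j]['time'], TIME_FMT) - start <= WINDOW:
            j += 1
        group = ordered[i:j]
        merged.append({
            # non-averaged fields: keep the values of the earliest report
            'latitude': group[0]['latitude'],
            'longitude': group[0]['longitude'],
            'time': group[0]['time'],
            # averaged fields, as required by the task
            'confidence': sum(r['confidence'] for r in group) / len(group),
            'surface_temperature_celcius':
                sum(r['surface_temperature_celcius'] for r in group) / len(group),
        })
        i = j
    return merged


# hypothetical sample: the first two reports are 4 min 25 s apart, the third is separate
sample = [
    {'latitude': -37.0, 'longitude': 149.0, 'time': '06:50:00',
     'confidence': 80, 'surface_temperature_celcius': 50},
    {'latitude': -37.0, 'longitude': 149.0, 'time': '06:45:35',
     'confidence': 60, 'surface_temperature_celcius': 40},
    {'latitude': -37.0, 'longitude': 149.0, 'time': '07:30:00',
     'confidence': 90, 'surface_temperature_celcius': 55},
]
result = merge_duplicates(sample)
```

Anchoring each group at its earliest report keeps a group from growing beyond 10 minutes in total. Chaining on consecutive gaps would be an alternative reading of the rule.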


In [1]:
!pip install pygeohash # install pygeohash
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
# import statements 
import pandas as pd
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
import pygeohash as pgh
import json
from pprint import pprint
from datetime import datetime
from pyspark.sql.streaming import StreamingQueryException




In [2]:
topic_name = "PartB"
hostip = "192.168.22.110"  # PLEASE CHANGE THIS LINE TO YOUR HOST IP
client = MongoClient(hostip, 27017)
db = client.fit3182_assignment_db
collection = db.partB
# clear documents left over from previous runs
collection.delete_many({})

DeleteResult({'n': 19, 'ok': 1.0}, acknowledged=True)

In [3]:
spark = (
    SparkSession.builder
    # run locally, using all available processor cores
    .master('local[*]')
    .appName('Spark Application')
    .getOrCreate()
)

topic_stream_df = (
    spark.readStream.format('kafka')
    # use the same Kafka broker address as the producers
    .option('kafka.bootstrap.servers', f'{hostip}:9092')
    .option('subscribe', topic_name)
    .load()
)

output_stream_df = (
    topic_stream_df
    .select(                                      
        topic_stream_df.value
        .alias('data')
    )
)

In [4]:
topic_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
def process_batch(batch_df,batch_id):
    raw_data = batch_df.collect()
    streams = [json.loads(item.asDict()['data']) for item in raw_data]
    allLoc={}
    cData=None
    

    for stream in streams:
        # if the current record is the climate report, keep a reference to it in cData
        if stream['producer'] == 'climate_streaming':
            cData = stream
        # group every record (climate and hotspot) by its geohash at precision 3
        location = pgh.encode(stream['latitude'], stream['longitude'], precision=3)
        if location in allLoc:
            allLoc[location].append(stream)
        else:
            allLoc[location]=[stream]
    
    if cData is not None:
        cData['date']=datetime.strptime(cData['createdDate'],'%d/%m/%Y')
        del cData['createdDate']
        climateLocation=pgh.encode(cData['latitude'],cData['longitude'],precision=3)
        
        if len(allLoc[climateLocation])>1:
            hotspots={}
            
            for stream in allLoc[climateLocation]:
                producer=stream['producer']
                
                if producer!='climate_streaming':
                    # get the hotspot's geohash at precision 5
                    location = pgh.encode(stream['latitude'],stream['longitude'],precision=5)
                    if location in hotspots:
                        hotspots[location].append(stream)
                    else:
                        hotspots[location]=[stream]
                        
            # Calculate fireCause according to climate data
            fireCause = 'natural' if cData['air_temperature_celcius']> 20 and cData['GHI_w/m2']>180 else 'other'


            hotspotsData = []    
            #sort the hotspots by the time of the stream
            for location in hotspots.keys():
                hotspots_df = pd.DataFrame(hotspots[location])
                # sort the DataFrame by the time field
                hotspots_df = hotspots_df.sort_values(by='time')
                # convert the sorted DataFrame back to a list of dictionaries
                hotspots[location] = hotspots_df.to_dict('records')
                #merge the hotspots that are within 10 minutes of each other
                temp = hotspots[location]
                hotspots[location] = []
                i = 0
                # loop through the hotspots of the location
                while i < len(temp):
                    j = i + 1
                    # parse the time strings and extend the group while the next report
                    # is at most 10 minutes (600 seconds) after the first one in the group
                    while j < len(temp) and (datetime.strptime(temp[j]['time'], '%H:%M:%S') - datetime.strptime(temp[i]['time'], '%H:%M:%S')).total_seconds() <= 600:
                        j += 1
                    #calculate the average surface temperature and confidence
                    
                    avgSurfaceTemp = sum([x['surface_temperature_celcius'] for x in temp[i:j]]) / (j - i)
                    avgConfidence = sum([x['confidence'] for x in temp[i:j]]) / (j - i)
                    
                    # combine the climate report's date with the hotspot's time of day
                    specificDateTime = cData['date'].strftime("%Y/%m/%d ")+temp[i]['time']
                    hotspots[location].append({
                        'latitude': temp[i]['latitude'],
                        'longitude': temp[i]['longitude'],
                        # make the time to datetime format
                        'time': datetime.strptime(specificDateTime,'%Y/%m/%d %H:%M:%S'),
                        'confidence': avgConfidence,
                        'surface_temperature_celcius': avgSurfaceTemp
                    })
                    i = j
                

                # collect the merged hotspot documents for this geohash-5 cell
                hotspotsData.extend(hotspots[location])
            # attach the merged hotspots and the fire cause to the climate document
            cData['hotspots'] = hotspotsData
            cData['fireCause'] = fireCause
                
    # before inserting, check whether a document for this date is already in the database
    if cData is not None:
        if collection.find_one({'date': cData['date']}):
            print('Data already exists in the database')
        else:
            pprint(cData)
            collection.insert_one(cData)
            print('Data inserted into the database')
    else:
        print('No climate data in the batch')
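
The 10-minute duplicate check in `process_batch` subtracts two parsed `HH:MM:SS` times. The sketch below is separate from the notebook code and uses made-up sample times. It shows how `timedelta.seconds` and `timedelta.total_seconds()` differ when the difference is negative, which is why the reports are sorted by time before they are compared.

```python
from datetime import datetime

fmt = '%H:%M:%S'
earlier = datetime.strptime('06:45:35', fmt)  # sample times, not from the data set
later = datetime.strptime('06:50:00', fmt)

forward = later - earlier    # positive difference: 4 min 25 s
backward = earlier - later   # negative difference: -4 min 25 s

forward_seconds = forward.total_seconds()    # 265.0
backward_seconds = backward.total_seconds()  # -265.0

# .seconds is only the seconds component of the normalised timedelta
# (days=-1, seconds=86135), so it is not a signed duration
backward_wrapped = backward.seconds          # 86135
```

With sorted input both attributes agree. With unsorted input, `.seconds` would report a value far above 600 for a pair that is actually less than 10 minutes apart.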





In [6]:
db_writer = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .trigger(processingTime = '10 seconds') 
    .foreachBatch(process_batch) 
)

console_logger = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .format('console')
)

In [7]:
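writer = db_writer
query = None  # stays None if start() fails, so the finally block does not raise NameError
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    if query is not None:
        query.stop()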

No climate data in the batch
No climate data in the batch
{'GHI_w/m2': 125,
 'air_temperature_celcius': 14,
 'date': datetime.datetime(2024, 1, 4, 0, 0),
 'fireCause': 'other',
 'hotspots': [{'confidence': 64.0,
               'latitude': -37,
               'longitude': 149,
               'surface_temperature_celcius': 44.0,
               'time': datetime.datetime(2024, 1, 4, 20, 39, 55)}],
 'latitude': -37.391,
 'longitude': 148.066,
 'max_wind_speed': 15.9,
 'precipitation': 0.03,
 'precipitationFlag': 'G',
 'producer': 'climate_streaming',
 'relative_humidity': 44.6,
 'windspeed_knots': 7.7}
Data inserted into the database
{'GHI_w/m2': 83,
 'air_temperature_celcius': 9,
 'date': datetime.datetime(2024, 1, 5, 0, 0),
 'fireCause': 'other',
 'hotspots': [{'confidence': 68.0,
               'latitude': -37,
               'longitude': 149,
               'surface_temperature_celcius': 43.0,
               'time': datetime.datetime(2024, 1, 5, 6, 45, 35)},
              {'confidence':

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted. Stopped query
