# FIT3182 - Assignment 3
---
## Part B - Streaming Application

- Filename: Assignment_PartB_Streaming_Application.ipynb
- Student Name: Deeksha Sridhar
- Student ID: 32187998

In [None]:
!pip install pygeohash

In [11]:
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pygeohash as pgh
import json
from pprint import pprint
from datetime import datetime

# Set up MongoDB connection
# Host, port, database and collection that receive the joined climate/hotspot
# documents written by process_batch.
mongo_host = '192.168.86.244'
mongo_port = 27017
mongo_db_name = 'fit3182_assignment_db'
mongo_collection_name = 'streaming'

mongo_client = MongoClient(f'mongodb://{mongo_host}:{mongo_port}/')
mongo_db = mongo_client[mongo_db_name]
mongo_collection = mongo_db[mongo_collection_name]
# The empty filter matches every document, so the collection is emptied each
# time this cell runs.
mongo_collection.delete_many({})

# Set up Spark session
# 'local[*]' selects a local Spark master; getOrCreate() reuses a session that
# already exists in this kernel instead of building a second one.
spark = SparkSession.builder.master('local[*]').appName('Spark Streaming from Kafka into MongoDB').getOrCreate()

# Set up Kafka connection
# Broker address (port 9092 is appended below) and the single topic to
# subscribe to.
kafka_host = '192.168.86.244'
kafka_topic = 'streaming'

# Streaming DataFrame over the Kafka topic; nothing is read until a query on
# it is started.
topic_stream_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', f'{kafka_host}:9092')
    .option('subscribe', kafka_topic)
    .load()
)

# Keep only the message payload, renamed to 'data' -- the key process_batch
# looks up on every row before JSON-decoding it.
output_stream_df = topic_stream_df.select(col('value').alias('data'))

# Define batch processing function
def _fire_cause(climate):
    """Return 'natural' when the climate reading is hot and sunny, else 'other'.

    'natural' requires air temperature above 20 (Celsius) and GHI above 180
    (W/m2); both comparisons are strict.
    """
    if climate['air_temperature_celcius'] > 20 and climate['GHI_w/m2'] > 180:
        return 'natural'
    return 'other'


def _merge_hotspots(fires, climate_date, cause):
    """Collapse hotspot records that share a precision-5 geohash into one document each.

    Args:
        fires: Hotspot records (dicts) to merge; they are not modified.
        climate_date: ``datetime`` of the climate reading; supplies the date
            part of every merged hotspot's timestamp.
        cause: Fire cause label stored on every merged document.

    Returns:
        One document per distinct precision-5 geohash, in first-seen order.
    """
    groups = {}
    for fire in fires:
        cell = pgh.encode(fire['latitude'], fire['longitude'], precision=5)
        groups.setdefault(cell, []).append(fire)

    docs = []
    for group in groups.values():
        # The last record of the group supplies the non-averaged fields.
        # Work on a copy so the parsed input records stay untouched.
        doc = dict(group[-1])
        count = len(group)

        # Averages are truncated to whole numbers.
        doc['surface_temperature_celcius'] = int(
            sum(fire['surface_temperature_celcius'] for fire in group) / count
        )
        doc['confidence'] = int(sum(fire['confidence'] for fire in group) / count)

        # Date comes from the climate reading, time-of-day from the hotspot.
        # combine() keeps the seconds, which replace(hour=, minute=) dropped.
        seen_at = datetime.strptime(doc.pop('created_datetime'), '%d/%m/%Y %H:%M:%S')
        stamped = datetime.combine(climate_date.date(), seen_at.time())
        doc['datetime'] = stamped.strftime('%d/%m/%Y %H:%M:%S')

        doc['cause'] = cause
        del doc['producer']
        docs.append(doc)

    return docs


def process_batch(batch_df, batch_id):
    """Join one micro-batch of climate and hotspot records and store the result.

    Each row's ``data`` field is JSON-decoded. If the batch contains a record
    whose producer is ``'climate_streaming'``, that record becomes the document
    to store; hotspot records falling in the same precision-3 geohash cell are
    merged per precision-5 cell and embedded under ``'hotspots'``. The document
    is pretty-printed and inserted into ``mongo_collection``.

    A batch without a climate record stores nothing. When a batch holds more
    than one climate record, only the last one is stored.

    Args:
        batch_df: DataFrame for the micro-batch; every row must expose a
            ``data`` field holding a JSON object.
        batch_id: Batch identifier supplied by ``foreachBatch`` (unused).

    Raises:
        KeyError: If a record lacks a field this function reads.
        ValueError: If a payload is not valid JSON or a date field does not
            match its expected format.
    """
    streams = [json.loads(row.asDict()['data']) for row in batch_df.collect()]

    climate = None
    for stream in streams:
        if stream['producer'] == 'climate_streaming':
            climate = stream

    # Without a climate reading there is nothing to join against or store,
    # so skip the geohashing work entirely.
    if climate is None:
        return

    climate['date'] = datetime.strptime(climate['created_date'], '%d/%m/%Y')
    del climate['created_date']

    climate_cell = pgh.encode(climate['latitude'], climate['longitude'], precision=3)
    nearby = [
        stream for stream in streams
        if pgh.encode(stream['latitude'], stream['longitude'], precision=3) == climate_cell
    ]

    # `nearby` always contains the climate record itself, so more than one
    # entry means other records share its cell.
    if len(nearby) > 1:
        fires = [s for s in nearby if s['producer'] != 'climate_streaming']
        climate['hotspots'] = _merge_hotspots(fires, climate['date'], _fire_cause(climate))

    # Strip the transport-only fields and tag the document with the
    # (hard-coded) station id before persisting it.
    del climate['producer']
    del climate['latitude']
    del climate['longitude']
    climate['station'] = 948701
    pprint(climate)
    mongo_collection.insert_one(climate)

# Define streaming queries
# Writer: every 10 seconds hand the newly arrived records to process_batch,
# which joins them and inserts the result into MongoDB.
db_writer = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .foreachBatch(process_batch)
)

# Logger: echo the same records to the console sink.
console_logger = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .format('console')
)

# Start the streaming queries
writer_query = db_writer.start()
console_query = console_logger.start()

# Wait for termination or interruption.
# The stop() calls live in `finally` so both queries are shut down on every
# exit path -- including a query failure raised by awaitTermination(), which
# previously skipped the cleanup and left the other query running.
try:
    writer_query.awaitTermination()
    console_query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
finally:
    # Stop the queries
    writer_query.stop()
    console_query.stop()


{'GHI_w/m2': 93,
 'air_temperature_celcius': 10,
 'date': datetime.datetime(2023, 6, 12, 0, 0),
 'max_wind_speed': 14.0,
 'precipitation': 0.0,
 'precipitation_flag': 'G',
 'relative_humidity': 39.4,
 'station': 948701,
 'windspeed_knots': 9.6}
{'GHI_w/m2': 180,
 'air_temperature_celcius': 23,
 'date': datetime.datetime(2023, 6, 13, 0, 0),
 'hotspots': [{'cause': 'other',
               'confidence': 100,
               'datetime': '13/06/2023 03:56:00',
               'latitude': -36.5754,
               'longitude': 142.2926,
               'surface_temperature_celcius': 111},
              {'cause': 'other',
               'confidence': 82,
               'datetime': '13/06/2023 08:44:00',
               'latitude': -36.6995,
               'longitude': 142.9146,
               'surface_temperature_celcius': 56},
              {'cause': 'other',
               'confidence': 68,
               'datetime': '13/06/2023 13:32:00',
               'latitude': -37.8865,
               'lon

{'GHI_w/m2': 155,
 'air_temperature_celcius': 18,
 'date': datetime.datetime(2023, 6, 30, 0, 0),
 'max_wind_speed': 13.0,
 'precipitation': 0.0,
 'precipitation_flag': 'I',
 'relative_humidity': 49.4,
 'station': 948701,
 'windspeed_knots': 9.3}
{'GHI_w/m2': 107,
 'air_temperature_celcius': 12,
 'date': datetime.datetime(2023, 7, 1, 0, 0),
 'hotspots': [{'cause': 'other',
               'confidence': 56,
               'datetime': '01/07/2023 23:08:00',
               'latitude': -37.4421,
               'longitude': 148.259,
               'surface_temperature_celcius': 39}],
 'max_wind_speed': 18.1,
 'precipitation': 0.0,
 'precipitation_flag': 'G',
 'relative_humidity': 44.7,
 'station': 948701,
 'windspeed_knots': 11.4}
{'GHI_w/m2': 114,
 'air_temperature_celcius': 13,
 'date': datetime.datetime(2023, 7, 2, 0, 0),
 'hotspots': [{'cause': 'other',
               'confidence': 71,
               'datetime': '02/07/2023 03:56:00',
               'latitude': -37.522,
               'lo

{'GHI_w/m2': 149,
 'air_temperature_celcius': 18,
 'date': datetime.datetime(2023, 7, 17, 0, 0),
 'max_wind_speed': 11.1,
 'precipitation': 0.0,
 'precipitation_flag': 'I',
 'relative_humidity': 53.8,
 'station': 948701,
 'windspeed_knots': 6.4}
{'GHI_w/m2': 93,
 'air_temperature_celcius': 10,
 'date': datetime.datetime(2023, 7, 18, 0, 0),
 'hotspots': [{'cause': 'other',
               'confidence': 64,
               'datetime': '18/07/2023 13:32:00',
               'latitude': -37.46,
               'longitude': 148.113,
               'surface_temperature_celcius': 52}],
 'max_wind_speed': 16.9,
 'precipitation': 0.08,
 'precipitation_flag': 'G',
 'relative_humidity': 39.9,
 'station': 948701,
 'windspeed_knots': 12.6}
{'GHI_w/m2': 112,
 'air_temperature_celcius': 13,
 'date': datetime.datetime(2023, 7, 19, 0, 0),
 'max_wind_speed': 9.9,
 'precipitation': 0.01,
 'precipitation_flag': 'G',
 'relative_humidity': 49.4,
 'station': 948701,
 'windspeed_knots': 3.8}
{'GHI_w/m2': 147,
 'a

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query
