# FIT3182 Assignment Part B

### Eu Jia Xin (30881676)


## Task 1d:

### Streaming Application
Write a streaming application using the Apache Spark Structured Streaming API which processes data in batches of 10 seconds. The streaming application will receive streaming data from all three producers and process it as required.

In [1]:
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType, DateType, IntegerType
from pyspark.sql.streaming import StreamingQueryException
from pyspark.sql.functions import col, from_json

import datetime
import json
import pprint
import time
import geohash2
from pprint import pprint

# Kafka topics the streaming application subscribes to, one per producer
# (climate readings, AQUA satellite hotspots, TERRA satellite hotspots).
CLIMATE_TOPIC = 'Climate'
HOTSPOT_AQUA_TOPIC = 'Hotspot_AQUA'
HOTSPOT_TERRA_TOPIC = 'Hotspot_TERRA'
# MongoDB database that the streamed climate documents are written to.
DB_NAME = 'fit3182_assignment_db'

In [2]:
# Create (or reuse) the SparkSession that drives the Structured Streaming job.
# 'local[*]' runs Spark locally with one worker thread per logical core.
_spark_builder = SparkSession.builder.master('local[*]').appName('Spark Streaming from Kafka into MongoDB')
spark = _spark_builder.getOrCreate()

In [3]:
# Create one streaming DataFrame over all three producer topics. The Kafka
# source accepts a comma-separated list in 'subscribe'.
topics = ",".join((CLIMATE_TOPIC, HOTSPOT_AQUA_TOPIC, HOTSPOT_TERRA_TOPIC))

_kafka_reader = spark.readStream.format('kafka')  # Kafka source
_kafka_reader = _kafka_reader.option('kafka.bootstrap.servers', 'localhost:9092')
_kafka_reader = _kafka_reader.option('subscribe', topics)
climate_topic_stream_df = _kafka_reader.load()

In [4]:
# Inspect the raw Kafka source schema: the payload arrives as a binary
# 'value' column alongside Kafka metadata (topic, partition, offset, ...).
climate_topic_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# JSON payload schema shared by all three producers. Fields a producer does
# not send are parsed as null. Order matters: it fixes the column order.
_PAYLOAD_FIELDS = [
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ('air_temperature_celcius', IntegerType()),
    ('relative_humidity', DoubleType()),
    ('windspeed_knots', DoubleType()),
    ('max_wind_speed', DoubleType()),
    ('precipitation', DoubleType()),
    ('precipitation_flag', StringType()),
    ('GHI', IntegerType()),
    ('date', DateType()),
    ('time', StringType()),
    ('producer_id', StringType()),
    ('confidence', IntegerType()),
    ('surface_temperature_celcius', IntegerType()),
]

schema = StructType()
for _field_name, _field_type in _PAYLOAD_FIELDS:
    # StructType.add appends the field in place.
    schema.add(_field_name, _field_type)

In [6]:
# Decode the Kafka 'value' bytes as a JSON string, parse it with the payload
# schema, then flatten the parsed struct into top-level columns.
_parsed_payload = from_json(col("value").cast("string"), schema).alias("climate_parsed_value")
_parsed_stream_df = climate_topic_stream_df.select(_parsed_payload)
climate_output_stream_df = _parsed_stream_df.select(col("climate_parsed_value.*"))

In [7]:
def foreach_batch_function(df, epoch_id):
    """
    Turn one micro-batch into a climate document and insert it into MongoDB.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        The micro-batch of parsed rows received from all three producers.
    epoch_id : int
        Micro-batch id supplied by Spark's foreachBatch (not used here).

    Nothing is written when streaming_data_handler returns None, i.e. when the
    batch contained no climate record.
    """
    # collect() already returns a list of Row objects, which is exactly what
    # streaming_data_handler expects.
    cleaned_record = streaming_data_handler(df.collect())

    if cleaned_record is None:
        return

    mongo_client = MongoClient(host='localhost', port=27017)
    try:
        collection = mongo_client[DB_NAME].climate
        # insert_one adds the generated '_id' to cleaned_record, so it shows below.
        inserted_res = collection.insert_one(cleaned_record)
        print(inserted_res)
        pprint(cleaned_record)
        print()
    finally:
        # Release the connection even when the insert fails, otherwise every
        # failing batch would leak a client.
        mongo_client.close()


In [8]:
# Sink configuration: every 10 seconds the rows accumulated since the last
# trigger are handed, as one micro-batch, to foreach_batch_function.
db_writer = climate_output_stream_df.writeStream
db_writer = db_writer.outputMode('append')
db_writer = db_writer.foreachBatch(foreach_batch_function)
db_writer = db_writer.trigger(processingTime='10 seconds')

writer = db_writer

The following are helper functions that process the streaming data and clean it before inserting it into MongoDB. 


In [9]:
def get_geohash(lat, long, precision=3):
    """
    Encode a latitude/longitude pair as a geohash string.

    The result has `precision` characters (3 by default); more characters
    identify a smaller area.
    """
    return geohash2.encode(lat, long, precision=precision)

def _clean_hotspot(hotspot, day):
    """Return one hotspot in the Task A shape, timestamped on `day`."""
    # Only hour and minute are used. Splitting and keeping the first two parts
    # also accepts an "HH:MM:SS" string, which a plain 2-way unpack rejected.
    hour, minute = (int(part) for part in hotspot['time'].split(':')[:2])
    return {
        'datetime': datetime.datetime.combine(day, datetime.time(hour=hour, minute=minute)),
        'latitude': hotspot['latitude'],
        'longitude': hotspot['longitude'],
        'confidence': hotspot['confidence'],
        'surface_temperature_celcius': hotspot['surface_temperature_celcius'],
        'cause': hotspot['cause'],
    }


def clean_record(original):
    """
    Reshape a combined streaming record into the data model designed in Task A.

    Note that hotspots now have an additional 'cause' attribute, per the
    requirements.

    Parameters
    ----------
    original : dict
        Combined climate record. It must contain 'date' (a datetime.date), the
        climate measurement fields and 'station'. It may contain a 'hotspot'
        list whose items carry 'time' ("HH:MM", optionally ":SS"),
        'latitude', 'longitude', 'confidence', 'surface_temperature_celcius'
        and 'cause'.

    Returns
    -------
    dict
        A new climate document with only the Task A fields; other keys such as
        geohashes or producer_id are dropped. 'date' becomes a midnight
        datetime, and each hotspot gets a 'datetime' built from the climate
        date plus its own time. 'hotspot' is present only if it was present
        in `original`.
    """
    day = original['date']
    climate = {
        # pymongo cannot encode datetime.date, so store the day as a datetime.
        'date': datetime.datetime.combine(day, datetime.datetime.min.time()),
        'air_temperature_celcius': original['air_temperature_celcius'],
        'max_wind_speed': original['max_wind_speed'],
        'precipitation': original['precipitation'],
        'precipitation_flag': original['precipitation_flag'],
        'relative_humidity': original['relative_humidity'],
        'station': original['station'],
        'windspeed_knots': original['windspeed_knots'],
        'GHI': original['GHI'],
    }

    if 'hotspot' in original:
        climate['hotspot'] = [_clean_hotspot(h, day) for h in original['hotspot']]

    return climate

def get_avg_confidence(hotspot1, hotspot2):
    """Return the mean of the two hotspots' 'confidence' values."""
    combined = hotspot1['confidence'] + hotspot2['confidence']
    return combined / 2

def get_avg_surface_temperature_celcius(hotspot1, hotspot2):
    """Return the mean of the two hotspots' 'surface_temperature_celcius' values."""
    combined = hotspot1['surface_temperature_celcius'] + hotspot2['surface_temperature_celcius']
    return combined / 2

def merge_hotspots(aqua_hotspots, terra_hotspots):
    """
    Merge AQUA and TERRA satellite hotspots that report the same location.

    Two hotspots count as the same fire event when their 'geohash5' values
    are equal. Each AQUA hotspot is paired with at most one TERRA hotspot that
    has not already been paired. The pair becomes a single record: a copy of
    the AQUA record whose 'confidence' and 'surface_temperature_celcius' are
    the averages of the pair. Unpaired hotspots from either satellite are kept
    unchanged.

    Parameters
    ----------
    aqua_hotspots, terra_hotspots : list of dict
        Hotspot records carrying 'geohash5', 'confidence' and
        'surface_temperature_celcius'.

    Returns
    -------
    list of dict
        AQUA-derived records first (in AQUA order), then the unpaired TERRA
        records (in TERRA order). If one side is empty, the other input list
        is returned as-is. The input dicts are never modified.
    """
    # Nothing to merge when one (or both) of the satellites reported nothing.
    if not aqua_hotspots and not terra_hotspots:
        return []
    if not aqua_hotspots:
        return terra_hotspots
    if not terra_hotspots:
        return aqua_hotspots

    used_terra = set()  # indices of TERRA hotspots that have already been paired
    merged_hotspots = []
    for aqua_hotspot in aqua_hotspots:
        match = next(
            (i for i, terra_hotspot in enumerate(terra_hotspots)
             if i not in used_terra and terra_hotspot['geohash5'] == aqua_hotspot['geohash5']),
            None,
        )
        if match is None:
            # No TERRA hotspot at the same location: keep AQUA hotspot as-is.
            merged_hotspots.append(aqua_hotspot)
            continue

        used_terra.add(match)
        terra_hotspot = terra_hotspots[match]
        # Copy rather than alias, so the average is taken over the original
        # AQUA values and the caller's dict is left untouched.
        merged_hotspot = dict(aqua_hotspot)
        merged_hotspot['confidence'] = get_avg_confidence(aqua_hotspot, terra_hotspot)
        merged_hotspot['surface_temperature_celcius'] = get_avg_surface_temperature_celcius(aqua_hotspot, terra_hotspot)
        merged_hotspots.append(merged_hotspot)

    # TERRA hotspots not paired with any AQUA hotspot are separate fire events.
    merged_hotspots.extend(t for i, t in enumerate(terra_hotspots) if i not in used_terra)
    return merged_hotspots
    

In [10]:
def streaming_data_handler(data_lst):
    """
    Combine one 10-second batch of producer rows into a single climate document.

    data_lst holds the Spark Row objects from all 3 producers in a single
    batch. Each row is routed by its 'producer_id':
      - 'producer_climate': the climate reading (station 948701 is attached
        for the Task A data model);
      - 'producer_hotspot_aqua' / 'producer_hotspot_terra': satellite
        hotspots, merged with merge_hotspots.
    Rows with any other producer_id are ignored.

    Merged hotspots whose geohash (precision 3) matches the climate
    location are embedded in the climate record with a 'cause'. The cause is
    'natural' when air temperature > 20 and GHI > 180, otherwise 'other'.
    Hotspots that are not close to the climate location are discarded.

    Returns
    -------
    dict or None
        The cleaned climate document (see clean_record), or None when the
        batch contains no climate row. In that case any hotspots in the
        batch are discarded.
    """
    climate = {}
    aqua_hotspots = []
    terra_hotspots = []

    for row in data_lst:
        record = row.asDict(True)
        # Precision 3 matches hotspots to the climate station area;
        # precision 5 matches AQUA/TERRA hotspots to each other.
        record['geohash3'] = get_geohash(record['latitude'], record['longitude'], 3)
        record['geohash5'] = get_geohash(record['latitude'], record['longitude'], 5)

        producer = record['producer_id']
        if producer == 'producer_climate':
            # NOTE(review): if a batch ever holds more than one climate row,
            # only the last one is kept. Confirm the producer rate guarantees
            # at most one per 10-second batch.
            climate = record
            climate['station'] = 948701  # add station for Task A data model requirements
            climate['hotspot'] = []
        elif producer == 'producer_hotspot_aqua':
            aqua_hotspots.append(record)
        elif producer == 'producer_hotspot_terra':
            terra_hotspots.append(record)

    # Without a climate reading there is nothing to attach hotspots to.
    if not climate:
        return None

    # The cause depends only on the climate reading, so decide it once.
    # from_json yields None for missing or unparsable fields; treat those as 0
    # instead of raising TypeError, which would abort the streaming query.
    is_natural = (climate['air_temperature_celcius'] or 0) > 20 and (climate['GHI'] or 0) > 180
    cause = 'natural' if is_natural else 'other'

    for hotspot in merge_hotspots(aqua_hotspots, terra_hotspots):
        if hotspot['geohash3'] != climate['geohash3']:
            continue  # not close to the climate station: ignore
        hotspot['cause'] = cause
        climate['hotspot'].append(hotspot)

    # With no nearby hotspots this stores the climate data alone (empty 'hotspot').
    return clean_record(climate)

In [None]:
# Run the streaming query until it is interrupted or fails, then stop it.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # writer.start() itself may raise. In that case there is no query to stop,
    # and calling query.stop() would mask the real error with a NameError.
    if query is not None:
        query.stop()

<pymongo.results.InsertOneResult object at 0x7f69bc490fc0>
{'GHI': 147,
 '_id': ObjectId('6282940500d4d500b4235a25'),
 'air_temperature_celcius': 18,
 'date': datetime.datetime(2022, 1, 9, 0, 0),
 'hotspot': [{'cause': 'other',
              'confidence': 84,
              'datetime': datetime.datetime(2022, 1, 9, 4, 48),
              'latitude': -37.596,
              'longitude': 149.319,
              'surface_temperature_celcius': 38}],
 'max_wind_speed': 15.0,
 'precipitation': 0.01,
 'precipitation_flag': 'G',
 'relative_humidity': 55.5,
 'station': 948701,
 'windspeed_knots': 8.9}

<pymongo.results.InsertOneResult object at 0x7f69bc463680>
{'GHI': 102,
 '_id': ObjectId('6282940f00d4d500b4235a27'),
 'air_temperature_celcius': 11,
 'date': datetime.datetime(2022, 1, 10, 0, 0),
 'hotspot': [{'cause': 'other',
              'confidence': 58.0,
              'datetime': datetime.datetime(2022, 1, 10, 19, 12),
              'latitude': -37.7379,
              'longitude': 143.1706,
 

<pymongo.results.InsertOneResult object at 0x7f69bc4f9580>
{'GHI': 153,
 '_id': ObjectId('6282947d00d4d500b4235a3d'),
 'air_temperature_celcius': 20,
 'date': datetime.datetime(2022, 1, 21, 0, 0),
 'hotspot': [],
 'max_wind_speed': 15.0,
 'precipitation': 0.0,
 'precipitation_flag': 'I',
 'relative_humidity': 62.6,
 'station': 948701,
 'windspeed_knots': 10.1}

<pymongo.results.InsertOneResult object at 0x7f69bc465100>
{'GHI': 98,
 '_id': ObjectId('6282948700d4d500b4235a3f'),
 'air_temperature_celcius': 11,
 'date': datetime.datetime(2022, 1, 22, 0, 0),
 'hotspot': [{'cause': 'other',
              'confidence': 97,
              'datetime': datetime.datetime(2022, 1, 22, 19, 12),
              'latitude': -37.0155,
              'longitude': 148.1432,
              'surface_temperature_celcius': 80}],
 'max_wind_speed': 8.9,
 'precipitation': 0.0,
 'precipitation_flag': 'A',
 'relative_humidity': 45.4,
 'station': 948701,
 'windspeed_knots': 5.2}

<pymongo.results.InsertOneResult obje