# Assignment Part B Task 1.d Streaming Application

Student Name: Kuah Jia Chen <br>
Student ID: 32286988

Import necessary libraries

In [1]:
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
from json import *
import geohash2 as pgh
from datetime import *

Set a global variable to store topic name.

In [2]:
# Kafka topic that the streaming reader subscribes to.
topic_name = "AssignmentPartB"

Initialize our spark session with `#threads = #logicalCPU` and the given application name.

In [3]:
# Build (or reuse) the Spark session: master 'local[*]' with the application
# name used for this assignment.
spark = SparkSession.builder.master('local[*]').appName(
    'Spark Streaming from Kafka into MongoDB for FIT3182 Assignment'
).getOrCreate()

Create a streaming dataframe with options providing the bootstrap server(s) and topic name.

In [4]:
# Streaming dataframe backed by the Kafka topic named in `topic_name`,
# read from the broker at localhost:9092.
topic_stream_df = spark.readStream.format('kafka').option(
    'kafka.bootstrap.servers', 'localhost:9092'
).option(
    'subscribe', topic_name
).load()

Print the schema for this dataframe to see what columns I have to work with.

In [5]:
# Print the columns of the Kafka-backed streaming dataframe before selecting from it.
topic_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Generate our output stream. We cast both `col('key')` and `col('value')` to strings, and alias the value column as `data`.

In [6]:
# Keep only the Kafka key and value, both cast from binary to string;
# the value column is renamed to 'data'.
output_stream_df = topic_stream_df.select(
    topic_stream_df['key'].cast('string').alias('key'),
    topic_stream_df['value'].cast('string').alias('data'),
)

Show the schema of the result dataframe.

In [7]:
# Print the schema after the cast: string columns 'key' and 'data' are expected.
output_stream_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- data: string (nullable = true)



Make a connection to mongodb

In [8]:
from pymongo import MongoClient
from pprint import pprint

# Open a MongoDB connection; no arguments are passed, so the client's
# default connection settings apply.
client = MongoClient()

# Database used by this assignment: fit3182_assignment_db
db = client['fit3182_assignment_db']

# Collection that receives the processed documents: assignment_partB
assignment_partB = db['assignment_partB']

For each batch, I will process the data using the `process_batch` function. Below is a brief explanation of the processing steps within `process_batch`. Please refer to the inline comments for a more detailed explanation.

Here are the steps of processing each batch:

1. pre-process the raw data by changing them to a dictionary
2. check if there is climate data in this batch, if not, just return nothing, else continue the following steps
3. compute the geohash for climate data using precision 3
4. filter all the hotspot data such that only the hotspot data with the same location (i.e., same geohash with precision 3) as climate are kept
5. if no hotspots with the same location as the climate data, just insert climate data directly to the MongoDB database
6. otherwise, if there is at least one hotspot data with the same location as climate data, we will continue the following steps
7. add datetime, date, pgh encoding with precision = 5 to the hotspots data
8. group the hotspot data by pgh_encoding and compute the average surface temperature and confidence for each group
9. detect the cause of the fire and insert "natural" or "other" to each hotspot data based on the air temperature and GHI
10. delete time and pgh_encoding information for each hotspot data as they should not be part of the data, and also remove the key 
11. create the embedded model by inserting hotspots data into the climate data, which is the same as part A
12. insert the result climate data to MongoDB

In [9]:
# Geohash precision used to decide whether a hotspot shares the climate record's location.
_LOCATION_MATCH_PRECISION = 3
# Geohash precision used to merge hotspots that share the same geohash cell.
_HOTSPOT_GROUP_PRECISION = 5
# A fire is labelled "natural" only when BOTH climate readings exceed these thresholds.
_NATURAL_MIN_AIR_TEMPERATURE = 20
_NATURAL_MIN_GHI = 180


def _parse_records(rows):
    """Turn collected rows into dicts whose 'data' field is the decoded JSON payload."""
    records = []
    for row in rows:
        record = row.asDict()
        record['data'] = loads(record['data'])  # JSON string -> dictionary
        records.append(record)
    return records


def _geohash(payload, precision):
    """Geohash of a payload's latitude/longitude at the given precision."""
    return pgh.encode(payload['latitude'], payload['longitude'], precision=precision)


def _annotate_hotspots(hotspots, climate_date):
    """Add 'datetime', 'date' and 'pgh_encoding' to every hotspot payload, in place.

    `climate_date` is the climate record's date string in '%d-%m-%Y' format;
    each hotspot's own 'time' ('%H:%M:%S') supplies the time-of-day part.
    """
    # The climate date is the same for every hotspot, so parse it once.
    parsed_date = datetime.strptime(climate_date, '%d-%m-%Y')
    for hotspot in hotspots:
        hotspot_time = datetime.strptime(hotspot['time'], '%H:%M:%S').time()
        combined = datetime.combine(parsed_date, hotspot_time)
        hotspot['datetime'] = combined.strftime("%Y-%m-%dT%H:%M:%SZ")
        hotspot['date'] = climate_date
        hotspot['pgh_encoding'] = _geohash(hotspot, _HOTSPOT_GROUP_PRECISION)


def _merge_hotspots_by_geohash(hotspots):
    """Collapse hotspots sharing a 'pgh_encoding' into one payload per group.

    The first hotspot of each group is kept (so its latitude, longitude, date
    and datetime survive) and its 'confidence' and
    'surface_temperature_celcius' are overwritten with the group averages.
    Groups are returned in order of first appearance. The producer key
    (e.g. AQUA / TERRA) is deliberately not considered when grouping.
    """
    groups = {}
    for hotspot in hotspots:
        groups.setdefault(hotspot['pgh_encoding'], []).append(hotspot)

    merged = []
    for group in groups.values():
        size = len(group)
        # Compute both averages before touching the representative, since it
        # is itself a member of the group.
        average_confidence = sum(h['confidence'] for h in group) / size
        average_surface_temperature = (
            sum(h['surface_temperature_celcius'] for h in group) / size
        )
        representative = group[0]
        representative['confidence'] = average_confidence
        representative['surface_temperature_celcius'] = average_surface_temperature
        merged.append(representative)
    return merged


def process_batch(batch_df, batch_id):
    """Combine one micro-batch of climate and hotspot records and store it in MongoDB.

    Steps:
      1. Collect the batch and decode each record's JSON 'data' string.
      2. Split records into climate (key == 'climate') and hotspots (any other key).
      3. Without a climate record, do nothing.
      4. Keep only hotspots whose precision-3 geohash equals the climate record's.
      5. With no such hotspot, insert the climate document with an empty
         'hotspots' list.
      6. Otherwise add datetime/date/geohash to the hotspots, merge those that
         share a precision-5 geohash (averaging confidence and surface
         temperature), tag each with the cause of fire, strip the helper
         fields and embed them in the climate document before inserting it.

    Args:
        batch_df: the micro-batch dataframe; rows must expose 'key' and 'data'.
        batch_id: identifier supplied by foreachBatch (unused).

    Returns:
        None when the batch is skipped or a climate-only document is inserted;
        True when a climate document with embedded hotspots is inserted.
    """
    records = _parse_records(batch_df.collect())

    climate_records = [r for r in records if r['key'] == 'climate']
    hotspot_payloads = [r['data'] for r in records if r['key'] != 'climate']

    # No climate data in this batch: nothing to store.
    if not climate_records:
        return

    # NOTE(review): only the first climate record of the batch is used; any
    # further climate records in the same batch are ignored — confirm this
    # matches the producer's sending interval.
    climate_data = climate_records[0]['data']

    # Keep the hotspots located in the same precision-3 geohash cell as the climate record.
    climate_geohash = _geohash(climate_data, _LOCATION_MATCH_PRECISION)
    nearby_hotspots = [
        hotspot for hotspot in hotspot_payloads
        if _geohash(hotspot, _LOCATION_MATCH_PRECISION) == climate_geohash
    ]

    # No hotspot at the climate location: store the climate data on its own.
    if not nearby_hotspots:
        climate_data['hotspots'] = []
        assignment_partB.insert_one(climate_data)
        return

    _annotate_hotspots(nearby_hotspots, climate_data['date'])
    merged_hotspots = _merge_hotspots_by_geohash(nearby_hotspots)

    # Detect the cause of fire from the climate record's air temperature and GHI.
    is_natural = (
        climate_data['air_temperature_celcius'] > _NATURAL_MIN_AIR_TEMPERATURE
        and climate_data['GHI_w/m2'] > _NATURAL_MIN_GHI
    )
    cause_of_fire = "natural" if is_natural else "other"

    for hotspot in merged_hotspots:
        hotspot['cause_of_fire'] = cause_of_fire
        # 'time' and 'pgh_encoding' were only needed for processing and must
        # not be part of the stored document.
        del hotspot['time']
        del hotspot['pgh_encoding']

    # Embedded model: the hotspots live inside the climate document (same as part A).
    climate_data['hotspots'] = merged_hotspots
    assignment_partB.insert_one(climate_data)
    return True

Define our stream writer for the MongoDB database sink.

In [10]:
# Stream writer for the MongoDB sink: append mode, one micro-batch every
# 10 seconds, each batch handed to process_batch.
db_writer = output_stream_df.writeStream.outputMode('append').trigger(
    processingTime='10 seconds'
).foreachBatch(process_batch)

In [11]:
# Start the streaming query. The returned StreamingQuery handle is not stored.
# NOTE(review): consider keeping it in a variable so the query can be stopped later — confirm.
db_writer.start()

<pyspark.sql.streaming.StreamingQuery at 0x7fbb28f5a3d0>