In [212]:
# import statements
import json
from pprint import pprint

import geohash2 as pgh
import pymongo
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.utils import StreamingQueryException

In [213]:
# Build (or reuse) the SparkSession for this notebook, running locally on all cores.
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Streaming Assignment part B Data') \
    .getOrCreate()

In [214]:
# Kafka topic that the streaming reader below subscribes to
topic = 'partB'

In [215]:
# Open a streaming DataFrame over the Kafka topic served by the local broker.
kafka_sdf = spark.readStream.format('kafka').options(**{
    'kafka.bootstrap.servers': 'localhost:9092',
    'subscribe': topic,
}).load()

In [216]:
# keep only the Kafka record payload column; process_batch decodes it from UTF-8 JSON
partB_sdf = kafka_sdf.select('value')

In [217]:
# geohash precision used to decide whether a hotspot is in the same area as the climate reading
_REGION_GEOHASH_PRECISION = 3
# geohash precision used to merge hotspot readings that describe the same location
_LOCATION_GEOHASH_PRECISION = 5


def _decode_rows(rows):
    """Turn collected Kafka rows into dicts by decoding each ``value`` as UTF-8 JSON.

    Rows whose ``value`` is None (Kafka allows null payloads) are skipped
    instead of crashing the whole micro-batch on ``None.decode``.
    """
    return [
        json.loads(row.value.decode('utf-8'))
        for row in rows
        if row.value is not None
    ]


def _merge_hotspots_by_location(hotspots, date):
    """Merge hotspots sharing a precision-5 geohash into one averaged record each.

    Groups keep first-seen order. Each merged record is a copy of the first
    record in its group with ``surface_temperature_celcius`` and ``confidence``
    replaced by the group mean and ``date`` replaced by ``date``.
    """
    groups = {}
    for record in hotspots:
        location = pgh.encode(
            record['latitude'],
            record['longitude'],
            precision=_LOCATION_GEOHASH_PRECISION,
        )
        groups.setdefault(location, []).append(record)

    merged = []
    for group in groups.values():
        # copy so the averaged values do not overwrite the original reading
        summary = dict(group[0])
        summary['surface_temperature_celcius'] = (
            sum(r['surface_temperature_celcius'] for r in group) / len(group)
        )
        summary['confidence'] = sum(r['confidence'] for r in group) / len(group)
        summary['date'] = date
        merged.append(summary)
    return merged


def _store_climate(climate):
    """Insert one climate document into ``fit3182_assignment_db.partB`` and close the client."""
    client = MongoClient()
    try:
        client.fit3182_assignment_db.partB.insert_one(climate)
    finally:
        # a client is opened per batch, so it must be closed or connections pile up
        client.close()


def process_batch(batch_df, batch_id):
    """Join one micro-batch of climate and hotspot messages and store the result in MongoDB.

    Parameters
    ----------
    batch_df : DataFrame
        Micro-batch whose rows expose the Kafka payload as ``value``
        (UTF-8 encoded JSON).
    batch_id : int
        Micro-batch identifier supplied by ``foreachBatch``; not used.

    Returns
    -------
    None
        Nothing is stored when the batch contains no climate message.

    Notes
    -----
    Messages with ``producer == 1`` are climate data; all others are hotspot
    data. Only the first climate message of the batch is used. Hotspots are
    kept when their precision-3 geohash equals that of the climate reading,
    then merged per precision-5 geohash and embedded under ``hotspot``.
    ``fire_cause`` is only set when at least one hotspot was kept.
    """
    records = _decode_rows(batch_df.collect())

    # split the batch into climate data and hotspot data
    climate_records = []
    hotspots = []
    for record in records:
        if record['producer'] == 1:
            climate_records.append(record)
        else:
            hotspots.append(record)

    # without climate data there is nothing to store
    if not climate_records:
        return

    # only the first climate reading of the batch is used; any others are ignored
    climate = climate_records[0]

    region = pgh.encode(
        climate['latitude'],
        climate['longitude'],
        precision=_REGION_GEOHASH_PRECISION,
    )

    # keep the hotspots that fall in the same area as the climate reading
    nearby = [
        record for record in hotspots
        if pgh.encode(
            record['latitude'],
            record['longitude'],
            precision=_REGION_GEOHASH_PRECISION,
        ) == region
    ]

    # no hotspot near the climate reading: store the climate data on its own
    if not nearby:
        climate['hotspot'] = []
        _store_climate(climate)
        return

    merged = _merge_hotspots_by_location(nearby, climate['date'])

    # detect the cause of fire
    if climate['air_temperature_celcius'] > 20 and climate['GHI_w/m2'] > 180:
        climate['fire_cause'] = 'natural'
    else:
        climate['fire_cause'] = 'other'

    # embed the merged hotspots into the climate data and store it
    climate['hotspot'] = merged
    _store_climate(climate)
    

In [218]:
# Configure (but do not start) the sink: hand each micro-batch to process_batch,
# triggering a new micro-batch every 10 seconds in append mode.
writer = partB_sdf.writeStream.foreachBatch(process_batch)
writer = writer.outputMode('append').trigger(processingTime='10 seconds')

In [219]:
# Start the streaming query and block until it ends or the user interrupts it.
# `query` is pre-bound so the cleanup below cannot raise NameError (and hide the
# real error) when writer.start() itself fails.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
except StreamingQueryException as exc:
    # StreamingQueryException must be imported (from pyspark.sql.utils) for this
    # clause to be evaluated without raising NameError
    print(exc)
finally:
    # only stop a query that was actually started
    if query is not None:
        query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/student/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopping query.
