In [1]:
#pip install python-geohash

In [16]:
import json
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import geohash
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

#change ip address
host_ip = "10.192.33.112"
#172.16.33.120



mongo_client = MongoClient(
    host=f'{host_ip}',
    port=27017
)
db = mongo_client['fit3182_assignment_db']
collection = db.a2_partB
#to get the latest value after schema change
#collection.drop()

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Streaming Application')
    .getOrCreate()
)

#append stream from multiple topic together
topic = "Producer1|Producer2|Producer3"
#topic_hotspot = 'A2_hotspot_topic'
# topic_AQUA = 'Scenaries02'
# topic_TERRA  = 'Scenaries03'

In [17]:
kafka_sdf = (
    spark.readStream
    .format('kafka')  #auto_offset_reset='earliest' by default 'latest'
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribePattern', topic) #listen to multiple topics
    .load()
)
kafka_sdf.printSchema()

#select the value column in dataframe
climatehotspot_sdf = kafka_sdf.select('value')

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
from datetime import datetime, timedelta

def climate_geohash(lat, lon):
    return geohash.encode(lat, lon, precision=3)

def hotspot_geohash(lat, lon):
    return geohash.encode(lat, lon, precision=5)

def insertOne(result):
    collection.insert_one(result)
    #print("INSERTED !")
    

def func(batch_df,batch_id):
    #retrieve all data from dataframe  (for kafka) into a python list for 
    entries = batch_df.collect()
    #convert each entry into python dictionary
    data = [entry.asDict() for entry in entries]
    
    
    if data is not None:
        #for each entry/row in one batch of 10 seconds
        climate_report = None
        climate_geohash3 = None
        hotspot_hash = {}
        #store the hotspot in precision 3
        climate_hash = {}
        
        #each dict
        for row in data: #select value column from kafka dict
            byte_array = row['value']
            # Decode the bytearray into a string
            json_str = byte_array.decode('utf-8')

            # Parse the JSON string into a dictionary
            data_dict = json.loads(json_str)
            
            #climate report
            if data_dict['producer'] == 'Producer_1':
                #take only the first climate report
                if climate_report is None:
                    climate_report = data_dict
                    
                    
                    climate_geohash3 = climate_geohash(climate_report['latitude'], climate_report['longitude'])
                    #initialise a blank array in set to append hotspot later
                    climate_hash[climate_geohash3] = []
                    climate_report['hotspot'] = []

                    
            #hotspot report
            else:
                hotspot_dict = data_dict
                print("hotspot_dict: ", hotspot_dict)    #climate geohash as key only ke
                
                #hash the geohash among the hotspots
                hotspot_geohash5 = hotspot_geohash(hotspot_dict['latitude'], hotspot_dict['longitude'])
                
                #maintain hotspot for not the same geohash string in precision 5
                if hotspot_geohash5 not in hotspot_hash.keys():
                    #accumulate the hotspot dictionary until cliamte report come
                    hotspot_hash[hotspot_geohash5] = [hotspot_dict]
                    
                #add to the array with the same geohash key 
                else:
                    hotspot_hash[hotspot_geohash5].append(hotspot_dict)
                    
                
        #if climate report exist for each batch
        if climate_report is not None:
                #for each geohash5 hotspots do average of temp within 10 minutes
            for key, value in hotspot_hash.items():
                last_hp = value[-1]
                #datetime string      #do again strptime to turn into datetime object
                latest_min = datetime.strptime(last_hp['created_time'], '%H:%M:%S')
                avg_temp = 0
                avg_conf = 0
                len_array = len(value)
                #print(len_array)

                #for each key average the surface temperature and confidence of the array in value
                #by reducing array 
                for hp in value:
                    # within 10 minutes from latest hotspot in each batch
                    time_differnce = latest_min - datetime.strptime(hp['created_time'], '%H:%M:%S')
                    if  time_differnce < timedelta(minutes = 10):
                        #print(time_differnce)
                        avg_temp += hp['surface_temperature_celcius']
                        avg_conf += hp['confidence']

                avg_temp = avg_temp/len_array
                avg_conf = avg_conf/len_array
                last_hp['surface_temperature_celcius'] = avg_temp
                last_hp['avg_conf'] = avg_conf
                #set it to only hotspot for this key
                hotspot_hash[key] = last_hp


                # Check if the first 3 characters of the key match the climate_geohash3
                if str(key[:3]) == climate_geohash3:
                    # If the key exists in climate_hash, append to it, otherwise create a new list
                    if climate_geohash3 in climate_hash:
                        climate_hash[climate_geohash3].append(hotspot_hash[key])
                    else:
                        climate_hash[climate_geohash3] = [hotspot_hash[key]]
                    # Append the hotspots to the climate report
                    #print(climate_hash[climate_geohash3])
                    
                    
            climate_report['hotspot'] = climate_hash[climate_geohash3]
                    #print("hotspots: " + str(climate_report['hotspot']))
            #print(len_array)
            #CAUSE OF fire event
            if climate_report['air_temperature_celcius'] > 20 and climate_report['GHI_w/m2'] > 180:
                climate_report['fire_cause'] = 'natural'
            else:
                climate_report['fire_cause'] = 'other'
        
            print(climate_report)
            insertOne(climate_report)        

                
db_writer = (
    climatehotspot_sdf.writeStream
    .outputMode('append')
    .trigger(processingTime ='10 seconds') # one day 
    #apply a function to each aach of data with the internal of processing time
    .foreachBatch(func) #batch and batch id and input
)



In [None]:
try:
    query = db_writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    query.stop()


hotspot_dict:  {'latitude': -36.0295, 'longitude': 143.6409, 'confidence': 89.0, 'surface_temperature_celcius': 65.0, 'producer': 'Producer_3', 'created_time': '09:00:00'}
hotspot_dict:  {'latitude': -36.984, 'longitude': 148.25, 'confidence': 50.0, 'surface_temperature_celcius': 35.0, 'producer': 'Producer_3', 'created_time': '09:02:00'}
hotspot_dict:  {'latitude': -34.3539, 'longitude': 141.5629, 'confidence': 81.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_2', 'created_time': '09:04:00'}
hotspot_dict:  {'latitude': -36.5134, 'longitude': 142.4682, 'confidence': 70.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_3', 'created_time': '09:03:00'}
hotspot_dict:  {'latitude': -34.9194, 'longitude': 140.9678, 'confidence': 66.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_3', 'created_time': '09:05:00'}
hotspot_dict:  {'latitude': -37.853, 'longitude': 142.5253, 'confidence': 94.0, 'surface_temperature_celcius': 104.0, 'producer': 'Producer_2',

hotspot_dict:  {'latitude': -37.3009, 'longitude': 143.4777, 'confidence': 52.0, 'surface_temperature_celcius': 39.0, 'producer': 'Producer_3', 'created_time': '09:40:00'}
hotspot_dict:  {'latitude': -36.4371, 'longitude': 141.7692, 'confidence': 74.0, 'surface_temperature_celcius': 48.0, 'producer': 'Producer_2', 'created_time': '09:42:00'}
hotspot_dict:  {'latitude': -36.5107, 'longitude': 144.4722, 'confidence': 84.0, 'surface_temperature_celcius': 58.0, 'producer': 'Producer_3', 'created_time': '09:42:00'}
hotspot_dict:  {'latitude': -36.5234, 'longitude': 145.4642, 'confidence': 79.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '09:45:00'}
hotspot_dict:  {'latitude': -37.5436, 'longitude': 143.6122, 'confidence': 80.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_3', 'created_time': '09:45:00'}
hotspot_dict:  {'latitude': -36.4125, 'longitude': 143.1189, 'confidence': 100.0, 'surface_temperature_celcius': 92.0, 'producer': 'Producer

hotspot_dict:  {'latitude': -37.876, 'longitude': 143.7804, 'confidence': 89.0, 'surface_temperature_celcius': 65.0, 'producer': 'Producer_3', 'created_time': '10:18:00'}
hotspot_dict:  {'latitude': -37.7068, 'longitude': 141.3149, 'confidence': 76.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_2', 'created_time': '10:20:00'}
hotspot_dict:  {'latitude': -37.8046, 'longitude': 146.0304, 'confidence': 91.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_3', 'created_time': '10:19:00'}
hotspot_dict:  {'latitude': -37.5728, 'longitude': 142.6348, 'confidence': 100.0, 'surface_temperature_celcius': 98.0, 'producer': 'Producer_2', 'created_time': '10:21:00'}
hotspot_dict:  {'latitude': -36.4483, 'longitude': 142.2303, 'confidence': 76.0, 'surface_temperature_celcius': 49.0, 'producer': 'Producer_3', 'created_time': '10:20:00'}
hotspot_dict:  {'latitude': -36.0144, 'longitude': 141.7008, 'confidence': 51.0, 'surface_temperature_celcius': 38.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -36.7508, 'longitude': 147.1254, 'confidence': 66.0, 'surface_temperature_celcius': 39.0, 'producer': 'Producer_3', 'created_time': '10:58:00'}
hotspot_dict:  {'latitude': -37.5406, 'longitude': 142.9301, 'confidence': 59.0, 'surface_temperature_celcius': 46.0, 'producer': 'Producer_2', 'created_time': '11:00:00'}
hotspot_dict:  {'latitude': -37.621, 'longitude': 143.447, 'confidence': 87.0, 'surface_temperature_celcius': 61.0, 'producer': 'Producer_2', 'created_time': '11:02:00'}
hotspot_dict:  {'latitude': -37.9022, 'longitude': 141.1109, 'confidence': 81.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_3', 'created_time': '11:01:00'}
hotspot_dict:  {'latitude': -37.1999, 'longitude': 143.8231, 'confidence': 100.0, 'surface_temperature_celcius': 90.0, 'producer': 'Producer_3', 'created_time': '11:02:00'}
hotspot_dict:  {'latitude': -36.0818, 'longitude': 146.5461, 'confidence': 57.0, 'surface_temperature_celcius': 44.0, 'producer': 'Producer_2

hotspot_dict:  {'latitude': -36.3415, 'longitude': 141.5733, 'confidence': 88.0, 'surface_temperature_celcius': 64.0, 'producer': 'Producer_2', 'created_time': '11:40:00'}
hotspot_dict:  {'latitude': -37.332, 'longitude': 148.091, 'confidence': 100.0, 'surface_temperature_celcius': 48.0, 'producer': 'Producer_3', 'created_time': '11:39:00'}
hotspot_dict:  {'latitude': -36.8202, 'longitude': 141.802, 'confidence': 80.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_3', 'created_time': '11:40:00'}
hotspot_dict:  {'latitude': -37.1262, 'longitude': 141.7213, 'confidence': 88.0, 'surface_temperature_celcius': 63.0, 'producer': 'Producer_2', 'created_time': '11:42:00'}
hotspot_dict:  {'latitude': -37.6387, 'longitude': 142.9032, 'confidence': 87.0, 'surface_temperature_celcius': 88.0, 'producer': 'Producer_3', 'created_time': '11:42:00'}
hotspot_dict:  {'latitude': -36.0318, 'longitude': 145.7492, 'confidence': 73.0, 'surface_temperature_celcius': 47.0, 'producer': 'Producer_2'

hotspot_dict:  {'latitude': -36.6667, 'longitude': 143.7526, 'confidence': 71.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_3', 'created_time': '12:18:00'}
hotspot_dict:  {'latitude': -36.9285, 'longitude': 143.9622, 'confidence': 66.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_2', 'created_time': '12:21:00'}
hotspot_dict:  {'latitude': -37.9742, 'longitude': 141.1855, 'confidence': 78.0, 'surface_temperature_celcius': 51.0, 'producer': 'Producer_3', 'created_time': '12:21:00'}
hotspot_dict:  {'latitude': -37.0961, 'longitude': 143.8138, 'confidence': 80.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_2', 'created_time': '12:24:00'}
hotspot_dict:  {'latitude': -37.522, 'longitude': 143.4742, 'confidence': 71.0, 'surface_temperature_celcius': 46.0, 'producer': 'Producer_3', 'created_time': '12:23:00'}
hotspot_dict:  {'latitude': -36.3674, 'longitude': 143.7295, 'confidence': 78.0, 'surface_temperature_celcius': 51.0, 'producer': 'Producer_2

hotspot_dict:  {'latitude': -36.6892, 'longitude': 145.5397, 'confidence': 75.0, 'surface_temperature_celcius': 49.0, 'producer': 'Producer_2', 'created_time': '12:59:00'}
hotspot_dict:  {'latitude': -36.398, 'longitude': 145.286, 'confidence': 65.0, 'surface_temperature_celcius': 39.0, 'producer': 'Producer_3', 'created_time': '12:58:00'}
hotspot_dict:  {'latitude': -35.9034, 'longitude': 141.862, 'confidence': 76.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3', 'created_time': '12:59:00'}
hotspot_dict:  {'latitude': -36.6224, 'longitude': 143.5092, 'confidence': 97.0, 'surface_temperature_celcius': 80.0, 'producer': 'Producer_3', 'created_time': '13:00:00'}
hotspot_dict:  {'latitude': -38.116, 'longitude': 143.818, 'confidence': 76.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_2', 'created_time': '13:02:00'}
hotspot_dict:  {'latitude': -37.6261, 'longitude': 142.9447, 'confidence': 85.0, 'surface_temperature_celcius': 59.0, 'producer': 'Producer_3', '

hotspot_dict:  {'latitude': -35.7058, 'longitude': 143.1971, 'confidence': 76.0, 'surface_temperature_celcius': 38.0, 'producer': 'Producer_2', 'created_time': '13:38:00'}
hotspot_dict:  {'latitude': -36.1067, 'longitude': 143.7948, 'confidence': 97.0, 'surface_temperature_celcius': 79.0, 'producer': 'Producer_3', 'created_time': '13:38:00'}
hotspot_dict:  {'latitude': -36.4794, 'longitude': 144.5752, 'confidence': 63.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_2', 'created_time': '13:41:00'}
hotspot_dict:  {'latitude': -36.0169, 'longitude': 141.6868, 'confidence': 80.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_3', 'created_time': '13:41:00'}
hotspot_dict:  {'latitude': -36.3782, 'longitude': 143.7313, 'confidence': 78.0, 'surface_temperature_celcius': 51.0, 'producer': 'Producer_2', 'created_time': '13:43:00'}
hotspot_dict:  {'latitude': -36.439, 'longitude': 143.6214, 'confidence': 76.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3

hotspot_dict:  {'latitude': -38.4398, 'longitude': 146.6093, 'confidence': 81.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_3', 'created_time': '14:27:00'}
hotspot_dict:  {'latitude': -36.1925, 'longitude': 145.93, 'confidence': 97.0, 'surface_temperature_celcius': 79.0, 'producer': 'Producer_2', 'created_time': '14:29:00'}
hotspot_dict:  {'latitude': -36.1057, 'longitude': 141.7608, 'confidence': 100.0, 'surface_temperature_celcius': 120.0, 'producer': 'Producer_2', 'created_time': '14:30:00'}
hotspot_dict:  {'latitude': -36.9294, 'longitude': 142.7087, 'confidence': 78.0, 'surface_temperature_celcius': 51.0, 'producer': 'Producer_3', 'created_time': '14:29:00'}
hotspot_dict:  {'latitude': -37.5135, 'longitude': 142.7238, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_3', 'created_time': '14:31:00'}
hotspot_dict:  {'latitude': -36.2083, 'longitude': 143.9386, 'confidence': 81.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -37.0769, 'longitude': 141.042, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_2', 'created_time': '15:08:00'}
hotspot_dict:  {'latitude': -37.863, 'longitude': 144.17, 'confidence': 86.0, 'surface_temperature_celcius': 60.0, 'producer': 'Producer_3', 'created_time': '15:09:00'}
hotspot_dict:  {'latitude': -36.6527, 'longitude': 142.7392, 'confidence': 97.0, 'surface_temperature_celcius': 81.0, 'producer': 'Producer_2', 'created_time': '15:11:00'}
hotspot_dict:  {'latitude': -37.463, 'longitude': 148.109, 'confidence': 62.0, 'surface_temperature_celcius': 34.0, 'producer': 'Producer_2', 'created_time': '15:12:00'}
hotspot_dict:  {'latitude': -35.554, 'longitude': 143.307, 'confidence': 67.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_3', 'created_time': '15:12:00'}
hotspot_dict:  {'latitude': -36.0775, 'longitude': 146.5674, 'confidence': 72.0, 'surface_temperature_celcius': 46.0, 'producer': 'Producer_2', 'cre

hotspot_dict:  {'latitude': -36.7317, 'longitude': 142.0162, 'confidence': 64.0, 'surface_temperature_celcius': 42.0, 'producer': 'Producer_3', 'created_time': '15:56:00'}
hotspot_dict:  {'latitude': -36.1757, 'longitude': 145.7544, 'confidence': 87.0, 'surface_temperature_celcius': 62.0, 'producer': 'Producer_3', 'created_time': '15:57:00'}
hotspot_dict:  {'latitude': -36.4348, 'longitude': 141.3, 'confidence': 78.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '15:59:00'}
hotspot_dict:  {'latitude': -34.328, 'longitude': 141.5402, 'confidence': 78.0, 'surface_temperature_celcius': 58.0, 'producer': 'Producer_2', 'created_time': '16:01:00'}
hotspot_dict:  {'latitude': -38.0132, 'longitude': 143.2528, 'confidence': 68.0, 'surface_temperature_celcius': 44.0, 'producer': 'Producer_3', 'created_time': '16:00:00'}
hotspot_dict:  {'latitude': -36.4763, 'longitude': 142.5887, 'confidence': 74.0, 'surface_temperature_celcius': 51.0, 'producer': 'Producer_3', 

hotspot_dict:  {'latitude': -36.4226, 'longitude': 141.6752, 'confidence': 69.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_2', 'created_time': '16:48:00'}
hotspot_dict:  {'latitude': -38.1128, 'longitude': 143.6496, 'confidence': 80.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_3', 'created_time': '16:48:00'}
hotspot_dict:  {'latitude': -37.527, 'longitude': 143.4752, 'confidence': 81.0, 'surface_temperature_celcius': 39.0, 'producer': 'Producer_2', 'created_time': '16:51:00'}
hotspot_dict:  {'latitude': -38.127, 'longitude': 143.82, 'confidence': 96.0, 'surface_temperature_celcius': 77.0, 'producer': 'Producer_2', 'created_time': '16:52:00'}
hotspot_dict:  {'latitude': -36.759, 'longitude': 145.179, 'confidence': 68.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_3', 'created_time': '16:51:00'}
hotspot_dict:  {'latitude': -36.0518, 'longitude': 141.6846, 'confidence': 63.0, 'surface_temperature_celcius': 42.0, 'producer': 'Producer_2', 'c

hotspot_dict:  {'latitude': -36.0723, 'longitude': 141.3605, 'confidence': 72.0, 'surface_temperature_celcius': 46.0, 'producer': 'Producer_3', 'created_time': '17:26:00'}
hotspot_dict:  {'latitude': -37.8131, 'longitude': 143.1175, 'confidence': 69.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_2', 'created_time': '17:28:00'}
hotspot_dict:  {'latitude': -36.5234, 'longitude': 145.4642, 'confidence': 79.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_3', 'created_time': '17:27:00'}
hotspot_dict:  {'latitude': -37.242, 'longitude': 141.153, 'confidence': 77.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_3', 'created_time': '17:29:00'}
hotspot_dict:  {'latitude': -34.5457, 'longitude': 141.7102, 'confidence': 84.0, 'surface_temperature_celcius': 58.0, 'producer': 'Producer_2', 'created_time': '17:31:00'}
hotspot_dict:  {'latitude': -36.3355, 'longitude': 144.5241, 'confidence': 51.0, 'surface_temperature_celcius': 38.0, 'producer': 'Producer_3'

hotspot_dict:  {'latitude': -37.8114, 'longitude': 143.1827, 'confidence': 81.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_3', 'created_time': '18:06:00'}
hotspot_dict:  {'latitude': -37.9284, 'longitude': 143.108, 'confidence': 92.0, 'surface_temperature_celcius': 70.0, 'producer': 'Producer_2', 'created_time': '18:09:00'}
hotspot_dict:  {'latitude': -36.1441, 'longitude': 145.2221, 'confidence': 89.0, 'surface_temperature_celcius': 65.0, 'producer': 'Producer_2', 'created_time': '18:10:00'}
hotspot_dict:  {'latitude': -37.58, 'longitude': 149.331, 'confidence': 69.0, 'surface_temperature_celcius': 35.0, 'producer': 'Producer_3', 'created_time': '18:09:00'}
hotspot_dict:  {'latitude': -36.6396, 'longitude': 146.8973, 'confidence': 66.0, 'surface_temperature_celcius': 36.0, 'producer': 'Producer_2', 'created_time': '18:11:00'}
hotspot_dict:  {'latitude': -37.1396, 'longitude': 141.9328, 'confidence': 68.0, 'surface_temperature_celcius': 44.0, 'producer': 'Producer_3', 

hotspot_dict:  {'latitude': -36.3194, 'longitude': 141.7531, 'confidence': 88.0, 'surface_temperature_celcius': 63.0, 'producer': 'Producer_2', 'created_time': '18:57:00'}
hotspot_dict:  {'latitude': -37.438, 'longitude': 148.09, 'confidence': 84.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3', 'created_time': '18:57:00'}
hotspot_dict:  {'latitude': -36.3401, 'longitude': 143.0453, 'confidence': 77.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '18:59:00'}
hotspot_dict:  {'latitude': -37.719, 'longitude': 142.154, 'confidence': 63.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_3', 'created_time': '19:00:00'}
hotspot_dict:  {'latitude': -36.3402, 'longitude': 141.7429, 'confidence': 96.0, 'surface_temperature_celcius': 77.0, 'producer': 'Producer_2', 'created_time': '19:02:00'}
hotspot_dict:  {'latitude': -36.4466, 'longitude': 141.2471, 'confidence': 51.0, 'surface_temperature_celcius': 40.0, 'producer': 'Producer_3', '

hotspot_dict:  {'latitude': -35.1949, 'longitude': 141.0622, 'confidence': 90.0, 'surface_temperature_celcius': 66.0, 'producer': 'Producer_2', 'created_time': '19:36:00'}
hotspot_dict:  {'latitude': -36.309, 'longitude': 141.4964, 'confidence': 75.0, 'surface_temperature_celcius': 48.0, 'producer': 'Producer_3', 'created_time': '19:35:00'}
hotspot_dict:  {'latitude': -37.5537, 'longitude': 141.9264, 'confidence': 74.0, 'surface_temperature_celcius': 63.0, 'producer': 'Producer_2', 'created_time': '19:37:00'}
hotspot_dict:  {'latitude': -36.1898, 'longitude': 145.0922, 'confidence': 96.0, 'surface_temperature_celcius': 78.0, 'producer': 'Producer_2', 'created_time': '19:38:00'}
hotspot_dict:  {'latitude': -37.662, 'longitude': 142.6505, 'confidence': 75.0, 'surface_temperature_celcius': 49.0, 'producer': 'Producer_3', 'created_time': '19:38:00'}
hotspot_dict:  {'latitude': -36.0829, 'longitude': 146.0621, 'confidence': 88.0, 'surface_temperature_celcius': 63.0, 'producer': 'Producer_2'

hotspot_dict:  {'latitude': -38.1106, 'longitude': 143.7314, 'confidence': 80.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_2', 'created_time': '20:16:00'}
hotspot_dict:  {'latitude': -36.2829, 'longitude': 145.825, 'confidence': 100.0, 'surface_temperature_celcius': 115.0, 'producer': 'Producer_3', 'created_time': '20:15:00'}
hotspot_dict:  {'latitude': -36.422, 'longitude': 144.2761, 'confidence': 80.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_2', 'created_time': '20:19:00'}
hotspot_dict:  {'latitude': -36.3452, 'longitude': 145.8969, 'confidence': 64.0, 'surface_temperature_celcius': 42.0, 'producer': 'Producer_3', 'created_time': '20:18:00'}
hotspot_dict:  {'latitude': -36.1271, 'longitude': 145.1541, 'confidence': 75.0, 'surface_temperature_celcius': 48.0, 'producer': 'Producer_2', 'created_time': '20:22:00'}
hotspot_dict:  {'latitude': -37.9796, 'longitude': 146.8024, 'confidence': 62.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -37.4634, 'longitude': 143.053, 'confidence': 72.0, 'surface_temperature_celcius': 46.0, 'producer': 'Producer_3', 'created_time': '20:55:00'}
hotspot_dict:  {'latitude': -37.7751, 'longitude': 143.0494, 'confidence': 86.0, 'surface_temperature_celcius': 61.0, 'producer': 'Producer_2', 'created_time': '20:58:00'}
hotspot_dict:  {'latitude': -36.4183, 'longitude': 141.6816, 'confidence': 87.0, 'surface_temperature_celcius': 62.0, 'producer': 'Producer_3', 'created_time': '20:58:00'}
hotspot_dict:  {'latitude': -37.7795, 'longitude': 148.4084, 'confidence': 79.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_2', 'created_time': '21:00:00'}
hotspot_dict:  {'latitude': -37.8158, 'longitude': 142.5177, 'confidence': 76.0, 'surface_temperature_celcius': 63.0, 'producer': 'Producer_3', 'created_time': '21:00:00'}
hotspot_dict:  {'latitude': -36.916, 'longitude': 142.0521, 'confidence': 82.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_2'

hotspot_dict:  {'latitude': -36.2549, 'longitude': 141.9908, 'confidence': 69.0, 'surface_temperature_celcius': 44.0, 'producer': 'Producer_2', 'created_time': '21:35:00'}
hotspot_dict:  {'latitude': -36.1534, 'longitude': 141.5948, 'confidence': 88.0, 'surface_temperature_celcius': 64.0, 'producer': 'Producer_3', 'created_time': '21:35:00'}
hotspot_dict:  {'latitude': -36.1558, 'longitude': 145.9723, 'confidence': 80.0, 'surface_temperature_celcius': 53.0, 'producer': 'Producer_2', 'created_time': '21:37:00'}
hotspot_dict:  {'latitude': -36.1964, 'longitude': 144.5217, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_2', 'created_time': '21:39:00'}
hotspot_dict:  {'latitude': -37.8249, 'longitude': 143.6174, 'confidence': 100.0, 'surface_temperature_celcius': 98.0, 'producer': 'Producer_3', 'created_time': '21:38:00'}
hotspot_dict:  {'latitude': -36.916, 'longitude': 142.0521, 'confidence': 82.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -36.353, 'longitude': 144.5977, 'confidence': 50.0, 'surface_temperature_celcius': 38.0, 'producer': 'Producer_2', 'created_time': '22:16:00'}
hotspot_dict:  {'latitude': -36.8866, 'longitude': 141.0714, 'confidence': 73.0, 'surface_temperature_celcius': 47.0, 'producer': 'Producer_3', 'created_time': '22:16:00'}
hotspot_dict:  {'latitude': -37.987, 'longitude': 144.005, 'confidence': 50.0, 'surface_temperature_celcius': 38.0, 'producer': 'Producer_2', 'created_time': '22:18:00'}
hotspot_dict:  {'latitude': -36.9111, 'longitude': 142.692, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_3', 'created_time': '22:18:00'}
hotspot_dict:  {'latitude': -36.1964, 'longitude': 144.5217, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_3', 'created_time': '22:19:00'}
hotspot_dict:  {'latitude': -37.611, 'longitude': 149.277, 'confidence': 53.0, 'surface_temperature_celcius': 38.0, 'producer': 'Producer_2', 'c

hotspot_dict:  {'latitude': -36.6952, 'longitude': 144.7228, 'confidence': 84.0, 'surface_temperature_celcius': 57.0, 'producer': 'Producer_3', 'created_time': '22:54:00'}
hotspot_dict:  {'latitude': -37.288, 'longitude': 144.39, 'confidence': 62.0, 'surface_temperature_celcius': 36.0, 'producer': 'Producer_2', 'created_time': '22:57:00'}
hotspot_dict:  {'latitude': -36.0278, 'longitude': 146.5623, 'confidence': 77.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3', 'created_time': '22:56:00'}
hotspot_dict:  {'latitude': -36.754, 'longitude': 141.7932, 'confidence': 64.0, 'surface_temperature_celcius': 42.0, 'producer': 'Producer_2', 'created_time': '22:59:00'}
hotspot_dict:  {'latitude': -36.8099, 'longitude': 142.728, 'confidence': 93.0, 'surface_temperature_celcius': 73.0, 'producer': 'Producer_3', 'created_time': '22:58:00'}
hotspot_dict:  {'latitude': -35.9498, 'longitude': 145.6229, 'confidence': 54.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_2', '

hotspot_dict:  {'latitude': -36.7153, 'longitude': 143.8226, 'confidence': 99.0, 'surface_temperature_celcius': 86.0, 'producer': 'Producer_2', 'created_time': '23:36:00'}
hotspot_dict:  {'latitude': -36.8299, 'longitude': 146.1897, 'confidence': 91.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_3', 'created_time': '23:36:00'}
hotspot_dict:  {'latitude': -37.3863, 'longitude': 142.8822, 'confidence': 85.0, 'surface_temperature_celcius': 59.0, 'producer': 'Producer_2', 'created_time': '23:39:00'}
hotspot_dict:  {'latitude': -36.3194, 'longitude': 141.7531, 'confidence': 88.0, 'surface_temperature_celcius': 63.0, 'producer': 'Producer_3', 'created_time': '23:38:00'}
hotspot_dict:  {'latitude': -36.884, 'longitude': 145.8938, 'confidence': 66.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_2', 'created_time': '23:40:00'}
hotspot_dict:  {'latitude': -37.3803, 'longitude': 145.611, 'confidence': 91.0, 'surface_temperature_celcius': 68.0, 'producer': 'Producer_3'

hotspot_dict:  {'latitude': -36.5828, 'longitude': 144.5775, 'confidence': 92.0, 'surface_temperature_celcius': 70.0, 'producer': 'Producer_3', 'created_time': '00:24:00'}
hotspot_dict:  {'latitude': -35.6846, 'longitude': 143.5129, 'confidence': 82.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_2', 'created_time': '00:27:00'}
hotspot_dict:  {'latitude': -37.602, 'longitude': 149.311, 'confidence': 81.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_3', 'created_time': '00:26:00'}
hotspot_dict:  {'latitude': -36.8403, 'longitude': 147.5354, 'confidence': 69.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_2', 'created_time': '00:29:00'}
hotspot_dict:  {'latitude': -37.436, 'longitude': 148.088, 'confidence': 86.0, 'surface_temperature_celcius': 76.0, 'producer': 'Producer_2', 'created_time': '00:30:00'}
hotspot_dict:  {'latitude': -37.8658, 'longitude': 143.4162, 'confidence': 92.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_3', 

hotspot_dict:  {'latitude': -34.2432, 'longitude': 142.0625, 'confidence': 79.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_3', 'created_time': '01:03:00'}
hotspot_dict:  {'latitude': -37.5657, 'longitude': 143.0689, 'confidence': 100.0, 'surface_temperature_celcius': 103.0, 'producer': 'Producer_2', 'created_time': '01:05:00'}
hotspot_dict:  {'latitude': -37.3655, 'longitude': 148.2821, 'confidence': 85.0, 'surface_temperature_celcius': 59.0, 'producer': 'Producer_3', 'created_time': '01:04:00'}
hotspot_dict:  {'latitude': -37.3252, 'longitude': 149.3911, 'confidence': 86.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_3', 'created_time': '01:06:00'}
hotspot_dict:  {'latitude': -37.8089, 'longitude': 145.9555, 'confidence': 56.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_2', 'created_time': '01:08:00'}
hotspot_dict:  {'latitude': -36.2127, 'longitude': 141.4938, 'confidence': 78.0, 'surface_temperature_celcius': 51.0, 'producer': 'Produce

hotspot_dict:  {'latitude': -36.3704, 'longitude': 143.0191, 'confidence': 76.0, 'surface_temperature_celcius': 49.0, 'producer': 'Producer_2', 'created_time': '01:44:00'}
hotspot_dict:  {'latitude': -36.3156, 'longitude': 141.4514, 'confidence': 84.0, 'surface_temperature_celcius': 57.0, 'producer': 'Producer_2', 'created_time': '01:46:00'}
hotspot_dict:  {'latitude': -35.9839, 'longitude': 143.6719, 'confidence': 57.0, 'surface_temperature_celcius': 47.0, 'producer': 'Producer_3', 'created_time': '01:45:00'}
hotspot_dict:  {'latitude': -37.6622, 'longitude': 142.1001, 'confidence': 96.0, 'surface_temperature_celcius': 79.0, 'producer': 'Producer_3', 'created_time': '01:46:00'}
hotspot_dict:  {'latitude': -36.6665, 'longitude': 141.7502, 'confidence': 77.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_2', 'created_time': '01:48:00'}
hotspot_dict:  {'latitude': -36.9303, 'longitude': 143.1034, 'confidence': 70.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -37.8701, 'longitude': 142.8066, 'confidence': 78.0, 'surface_temperature_celcius': 51.0, 'producer': 'Producer_2', 'created_time': '02:24:00'}
hotspot_dict:  {'latitude': -36.4489, 'longitude': 144.1445, 'confidence': 69.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_3', 'created_time': '02:23:00'}
hotspot_dict:  {'latitude': -37.1929, 'longitude': 143.8132, 'confidence': 59.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_2', 'created_time': '02:25:00'}
hotspot_dict:  {'latitude': -36.369, 'longitude': 143.7132, 'confidence': 97.0, 'surface_temperature_celcius': 80.0, 'producer': 'Producer_3', 'created_time': '02:26:00'}
hotspot_dict:  {'latitude': -37.7461, 'longitude': 142.7369, 'confidence': 56.0, 'surface_temperature_celcius': 39.0, 'producer': 'Producer_2', 'created_time': '02:28:00'}
hotspot_dict:  {'latitude': -37.6196, 'longitude': 142.99, 'confidence': 84.0, 'surface_temperature_celcius': 58.0, 'producer': 'Producer_3',

hotspot_dict:  {'latitude': -36.764, 'longitude': 144.165, 'confidence': 78.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3', 'created_time': '03:03:00'}
hotspot_dict:  {'latitude': -37.8545, 'longitude': 142.5132, 'confidence': 100.0, 'surface_temperature_celcius': 115.0, 'producer': 'Producer_2', 'created_time': '03:05:00'}
hotspot_dict:  {'latitude': -36.4612, 'longitude': 144.7775, 'confidence': 91.0, 'surface_temperature_celcius': 80.0, 'producer': 'Producer_2', 'created_time': '03:06:00'}
hotspot_dict:  {'latitude': -37.9509, 'longitude': 143.9599, 'confidence': 95.0, 'surface_temperature_celcius': 76.0, 'producer': 'Producer_3', 'created_time': '03:06:00'}
hotspot_dict:  {'latitude': -37.8051, 'longitude': 143.0309, 'confidence': 87.0, 'surface_temperature_celcius': 62.0, 'producer': 'Producer_3', 'created_time': '03:07:00'}
hotspot_dict:  {'latitude': -36.2476, 'longitude': 147.5281, 'confidence': 55.0, 'surface_temperature_celcius': 39.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -36.8416, 'longitude': 143.4597, 'confidence': 79.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '03:45:00'}
hotspot_dict:  {'latitude': -36.0885, 'longitude': 145.0359, 'confidence': 95.0, 'surface_temperature_celcius': 76.0, 'producer': 'Producer_3', 'created_time': '03:44:00'}
hotspot_dict:  {'latitude': -36.4631, 'longitude': 144.7654, 'confidence': 86.0, 'surface_temperature_celcius': 61.0, 'producer': 'Producer_2', 'created_time': '03:47:00'}
hotspot_dict:  {'latitude': -36.7855, 'longitude': 146.6675, 'confidence': 76.0, 'surface_temperature_celcius': 55.0, 'producer': 'Producer_3', 'created_time': '03:46:00'}
hotspot_dict:  {'latitude': -37.491, 'longitude': 141.936, 'confidence': 54.0, 'surface_temperature_celcius': 40.0, 'producer': 'Producer_2', 'created_time': '03:49:00'}
hotspot_dict:  {'latitude': -38.0392, 'longitude': 143.8842, 'confidence': 66.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_3'

hotspot_dict:  {'latitude': -34.4501, 'longitude': 141.4699, 'confidence': 79.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '04:23:00'}
hotspot_dict:  {'latitude': -37.95, 'longitude': 142.366, 'confidence': 92.0, 'surface_temperature_celcius': 70.0, 'producer': 'Producer_3', 'created_time': '04:22:00'}
hotspot_dict:  {'latitude': -36.3939, 'longitude': 140.991, 'confidence': 66.0, 'surface_temperature_celcius': 43.0, 'producer': 'Producer_2', 'created_time': '04:24:00'}
hotspot_dict:  {'latitude': -36.0411, 'longitude': 141.7071, 'confidence': 69.0, 'surface_temperature_celcius': 44.0, 'producer': 'Producer_3', 'created_time': '04:25:00'}
hotspot_dict:  {'latitude': -36.4816, 'longitude': 141.6635, 'confidence': 94.0, 'surface_temperature_celcius': 75.0, 'producer': 'Producer_2', 'created_time': '04:27:00'}
hotspot_dict:  {'latitude': -36.9139, 'longitude': 143.9765, 'confidence': 54.0, 'surface_temperature_celcius': 42.0, 'producer': 'Producer_3', 

hotspot_dict:  {'latitude': -35.7751, 'longitude': 143.4916, 'confidence': 86.0, 'surface_temperature_celcius': 61.0, 'producer': 'Producer_2', 'created_time': '05:03:00'}
hotspot_dict:  {'latitude': -38.0605, 'longitude': 143.6358, 'confidence': 75.0, 'surface_temperature_celcius': 49.0, 'producer': 'Producer_3', 'created_time': '05:02:00'}
hotspot_dict:  {'latitude': -36.7309, 'longitude': 142.3633, 'confidence': 100.0, 'surface_temperature_celcius': 88.0, 'producer': 'Producer_3', 'created_time': '05:03:00'}
hotspot_dict:  {'latitude': -37.363, 'longitude': 148.06, 'confidence': 100.0, 'surface_temperature_celcius': 48.0, 'producer': 'Producer_2', 'created_time': '05:05:00'}
hotspot_dict:  {'latitude': -36.5061, 'longitude': 144.5675, 'confidence': 91.0, 'surface_temperature_celcius': 68.0, 'producer': 'Producer_2', 'created_time': '05:07:00'}
hotspot_dict:  {'latitude': -36.5708, 'longitude': 146.7537, 'confidence': 76.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3

hotspot_dict:  {'latitude': -36.3511, 'longitude': 145.9216, 'confidence': 79.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '05:44:00'}
hotspot_dict:  {'latitude': -36.0691, 'longitude': 145.7797, 'confidence': 99.0, 'surface_temperature_celcius': 85.0, 'producer': 'Producer_3', 'created_time': '05:43:00'}
hotspot_dict:  {'latitude': -36.0005, 'longitude': 143.1847, 'confidence': 70.0, 'surface_temperature_celcius': 45.0, 'producer': 'Producer_2', 'created_time': '05:45:00'}
hotspot_dict:  {'latitude': -36.5775, 'longitude': 142.6076, 'confidence': 76.0, 'surface_temperature_celcius': 50.0, 'producer': 'Producer_3', 'created_time': '05:44:00'}
hotspot_dict:  {'latitude': -37.8051, 'longitude': 143.0309, 'confidence': 87.0, 'surface_temperature_celcius': 62.0, 'producer': 'Producer_2', 'created_time': '05:48:00'}
hotspot_dict:  {'latitude': -34.8919, 'longitude': 142.0529, 'confidence': 75.0, 'surface_temperature_celcius': 49.0, 'producer': 'Producer_

hotspot_dict:  {'latitude': -37.3405, 'longitude': 141.1927, 'confidence': 84.0, 'surface_temperature_celcius': 58.0, 'producer': 'Producer_2', 'created_time': '06:32:00'}
hotspot_dict:  {'latitude': -36.438, 'longitude': 145.5865, 'confidence': 81.0, 'surface_temperature_celcius': 54.0, 'producer': 'Producer_3', 'created_time': '06:32:00'}
hotspot_dict:  {'latitude': -35.9438, 'longitude': 145.0824, 'confidence': 78.0, 'surface_temperature_celcius': 52.0, 'producer': 'Producer_2', 'created_time': '06:34:00'}
hotspot_dict:  {'latitude': -38.2583, 'longitude': 143.9717, 'confidence': 62.0, 'surface_temperature_celcius': 41.0, 'producer': 'Producer_3', 'created_time': '06:34:00'}
hotspot_dict:  {'latitude': -38.0148, 'longitude': 146.5413, 'confidence': 87.0, 'surface_temperature_celcius': 71.0, 'producer': 'Producer_2', 'created_time': '06:36:00'}
hotspot_dict:  {'latitude': -36.6942, 'longitude': 143.8021, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'producer': 'Producer_3

In [46]:
import geohash
from pyspark.sql.functions import from_json, col, avg
from pyspark.sql.types import StructType, StructField, DoubleType, StringType


# Define the schema for the Climate and Hotspot data payload
# for from_json to know how to parse the structure

#Schema to inform how parse the JSON object
climate_schema = StructType([   #specify the data typem True to contain null values
    StructField('latitude', StringType(), True),
    StructField('longitude', DoubleType(), True),
    StructField('air_temperature_celcius', IntegerType(), True),
    StructField('relative_humidity', DoubleType(), True),
    StructField('windspeed_knots', DoubleType(), True),
    StructField('max_wind_speed', DoubleType(), True),
    StructField('precipitation', StringType(), True),
    StructField('GHI_w/m2', IntegerType(), True),
    StructField('producer', StringType(), True),
    StructField('date', StringType(), True),
    StructField('created_time', IntegerType(), True)
])

hotspot_schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("confidence", DoubleType(), True),
    StructField('surface_temperature_celcius', DoubleType(), True),
    StructField("producer", StringType(), True),
    StructField("random_time", IntegerType(), True)

])

In [47]:
from pyspark.sql.functions import to_timestamp
# Define the geohash UDF for climate
def climate_geohash(lat, lon):
    return geohash.encode(lat, lon, precision=3)

# # Register UDFs in Spark
climate_geohash_udf = udf(climate_geohash, StringType())
spark.udf.register("climate_geohash", climate_geohash_udf)


# climate_sdf = (
#     spark.readStream
#     .format('kafka')
#     .option('kafka.bootstrap.servers', f'{host_ip}:9092')
#     .option('subscribe', topic)
#     .load() #value store the actual dataframe
#     .select('value')
#      #do processing after .load() to get into the dataframe and deal with row
# #     .withColumn('geohash', climate_geohash_udf(col('latitude'), col('longtitude')))
# #     .limit(1)  # Retains only the first row
    
# )

# Deserialize the JSON payload from the 'value' column
climate_sdf = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribe', topic)
    .load()  #load the JSON object to be in Dataframe, use climate_sdf.printSchema()  to see it
    #deserialize the binary ‘value’ column into a format that allows you to access the ‘latitude’ and ‘longitude’ fields
    .select(from_json(col("value").cast("string"), climate_schema).alias("data"))
    #from_json: parse a JSON string and convert into a DataDrame of complex type StructType or MapType
    #col("value").cast("string"): akes the ‘value’ column, which is in binary format, and 
    #casts it to a string type. This is necessary because from_json expects a JSON string as input.
    #climate_schema: schema that defined which from_json will use to parse the JSON string. 
    #should match the structure of the JSON data you’re working with.
    #.alias("data"): This renames the resulting column from the from_json function to ‘data’.
    .select("data.*")  
    #After JSON string parsed into a structured format
    #select statement used to select all fields from the 'data' column since renamed
    #.select("data.*") select 
    .withColumn('geohash', climate_geohash_udf(col('latitude'), col('longitude')))
    .withColumn('created_time', to_timestamp(col('created_time')))
    .limit(1)  # Retains only the first row
)

#print the schema
climate_sdf.printSchema() 


root
 |-- latitude: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- air_temperature_celcius: integer (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- windspeed_knots: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- precipitation: string (nullable = true)
 |-- GHI_w/m2: integer (nullable = true)
 |-- producer: string (nullable = true)
 |-- date: string (nullable = true)
 |-- created_time: timestamp (nullable = true)
 |-- geohash: string (nullable = true)



In [92]:
#all producers send to the same topic to make Kafka to join the stream
hotspot_sdf = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribe', topic_hotspot)
    .load()
)

hotspot_sdf = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribe', topic)
    .load()  
    .select(from_json(col("value").cast("string"), hotspot_schema).alias("data"))
    .select("data.*")
)

hotspot_sdf.printSchema() 

root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- confidence: double (nullable = true)
 |-- surface_temperature_celcius: double (nullable = true)
 |-- producer: string (nullable = true)
 |-- random_time: integer (nullable = true)



In [105]:
from pyspark.sql.functions import first
# Define the geohash UDF for hotspot
def hotspot_geohash(lat, lon):
    return geohash.encode(lat, lon, precision=5)

# Register the UDF for pyspark to group
hotspot_geohash_udf = udf(hotspot_geohash, StringType())
spark.udf.register("hotspot_geohash", hotspot_geohash_udf)



# climate_stream = climate_stream.withColumn('geohash', climate_geohash_udf(col('latitude'), col('longtitude')))
hotspots_merge_sdf = (
    hotspot_sdf
    .withColumn('geohash', hotspot_geohash_udf(col('latitude'), col('longitude')))
    #drop the rows with the same geohash at this time window
    .dropDuplicates(['geohash'])
    #laod data from source in dataframe format
)

# Update the result after dropDuplicates to update geohash to new value
hotspots_merge_sdf = (
    hotspot_sdf
    .withColumn('geohash', hotspot_geohash_udf(col('latitude'), col('longitude')))
    .dropDuplicates(['geohash'])
    # Load data from source in DataFrame format
    # Assuming load() is a method that loads the DataFrame, replace it with the actual method if different
    # after dropDuplicates base on geohash, replace with lower precision geohash value
    # for climat
    .withColumn('geohash', climate_geohash_udf(col('latitude'), col('longitude')))
    
)

# Alias the DataFrames before joining
climate_sdf_alias = climate_sdf.alias("climate")
hotspots_merge_sdf_alias = hotspots_merge_sdf.alias("hotspot")

# Perform the join using the aliased DataFrames
joined_df = climate_sdf_alias.join(hotspots_merge_sdf_alias, ["geohash"], how='left_outer')

# Select columns using the alias  #select all columns 
joined_df = joined_df.select("climate.*", "hotspot.*")

# Apply a watermark to the joined DataFrame using the correct timestamp column
# Replace 'created_time' with the actual timestamp column name from your data
joined_df = joined_df.withWatermark("climate.created_time", "10 seconds")

# Perform aggregation
averaged_df = joined_df.groupBy('climate.geohash').agg(
    avg(col('hotspot.surface_temperature_celcius')).alias('surface_temperature_celcius'),
    avg(col('hotspot.confidence')).alias('confidence'),
    first(col('climate.air_temperature_celcius')).alias('air_temperature_celcius'),
    first(col('climate.relative_humidity')).alias('relative_humidity'),
    first(col('climate.windspeed_knots')).alias('windspeed_knots'),
    first(col('climate.max_wind_speed')).alias('max_wind_speed'),
    first(col('climate.precipitation')).alias('precipitation'),
    first(col('climate.GHI_w/m2')).alias('GHI_w/m2'),
    first(col('hotspot.producer')).alias('producer'),
    first(col('climate.date')).alias('date'),
    first(col('climate.latitude')).alias('latitude'),
    first(col('climate.longitude')).alias('longitude'),
    first(col('climate.created_time')).alias('created_time')
)

averaged_df.printSchema()
# ## Each stream is a DStream of (key, value) pairs where 'key' could be a location 
# # Only include rows that have matching geohashes in both streams
# # inner join since it is important that it has fire

# #must join since from two different streams
# joined_df = climate_sdf.join(hotspots_merge_sdf, on=['geohash'], how='left_outer') 

# # Select the disambiguated 'created_time' column from one of the DataFrames
# # Assuming 'created_time' comes from the 'climate' DataFrame
# joined_df = joined_df.select("climate_sdf.*", "hotspots_merge_sdf.geohash")
# # Apply a watermark to the joined DataFrame
# joined_df = joined_df.withWatermark("created_time", "10 seconds")

# # Perform aggregation
# averaged_df = joined_df.groupBy('geohash').agg(avg(col('surface_temperature_celcius')),avg(col('confidence')))






root
 |-- geohash: string (nullable = true)
 |-- surface_temperature_celcius: double (nullable = true)
 |-- confidence: double (nullable = true)
 |-- air_temperature_celcius: integer (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- windspeed_knots: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- precipitation: string (nullable = true)
 |-- GHI_w/m2: integer (nullable = true)
 |-- producer: string (nullable = true)
 |-- date: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- created_time: timestamp (nullable = true)



In [114]:
# for debugging, print on console
class DbWriter:
    
    # called at the start of processing each partition in each output micro-batch
    def open(self, partition_id, epoch_id):
        self.mongo_client = MongoClient(
            host=f'{host_ip}',
            port=27017
        )
        #use the same database, name: fit3182_db
        self.db = self.mongo_client['fit3182_db']
        return True
    
    #what ever row it receive at the time just process
    def process(self, row):
                #road of data from JSON string to 
        data = json.loads(row.value)
        
        record = {}
        #climate
        record['air_temperature_celcius'] = data.get('air_temperature_celcius')
        record['relative_humidity'] = data.get('relative_humidity')
        record['windspeed_knots'] = data.get('windspeed_knots')
        record['max_wind_speed'] = data.get('max_wind_speed')
        record['precipitation'] = data.get('precipitation')
        record['GHI_w/m2'] = data.get('GHI_w/m2')
        record['date'] = data.get('date')
        record['latitude'] = data.get('latitude')
        record['longtitude'] = data.get('longtitude')
        
        #hotspots
        record['confidence'] = data.get('confidence')
        record['surface_temperature_celcius'] = data.get('surface_temperature_celcius')
        record['producer'] = data.get('producer')
        print(record)
        
    def close(self, err):
        return True

In [115]:
writer = (
    # Initializes a streaming write for the climate_sdf DataFrame.
    averaged_df.writeStream.format("console")
    # Output will be written to the standard console/output.
    .option("checkpointLocation", "./hotspot_sdf_checkpoints")
    # Specifies the location for checkpointing, which allows streaming 
    # queries to be resilient to failures by storing the state.
    .outputMode('append')  # Only new rows will be written to the output sink since the last trigger.
    .trigger(processingTime = '10 seconds')
    .foreach(DbWriter())  # Applies a foreach writer with DbWriter_climate instance.
    # This indicates that for each row in the output, the DbWriter_climate class’s process method will be called.
)




In [116]:
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    query.stop()




NameError: name 'query' is not defined

In [11]:
class DbWriter_climate:
    """
    Check whether it works in pyspark log
    """
    # called at the start of processing each partition in each output micro-batch
    def open(self, partition_id, epoch_id):
        self.mongo_client = MongoClient(
            host=f'{host_ip}',
            port=27017
        )
        #use the same database, name: fit3182_db
        self.db = self.mongo_client['fit3182_db']
        return True
    
    
    # called once per row of the result dataframe
    # the current code DOES NOT handle duplicate processing
    #   e.g., query fails and restarts just before current micro-batch was fully inserted
    def process(self, row):
        #passing JSON string from row.value
        #into a dictionary data
        data = json.loads(row.value)
        
        db_record = {}
        db_record['air_temperature_celcius'] = data.get('air_temperature_celcius')
        db_record['relative_humidity'] = data.get('relative_humidity')
        db_record['windspeed_knots'] = data.get('windspeed_knots')
        db_record['max_wind_speed'] = data.get('max_wind_speed')
        db_record['precipitation'] = data.get('precipitation')
        db_record['GHI_w/m2'] = data.get('GHI_w/m2')
        db_record['producer'] = data.get('producer')
        db_record['date'] = data.get('date')
        #print(db_record)
        #print heren not working
                  
        #update and insert
        # New database has nothing so just insert
        # later something with the same station then update
        self.db['A2'].replace_one({'station': data.get('kerbsideid')}, db_record, upsert=True)
    
    # called once all rows have been processed (possibly with error)
    def close(self, err):
        self.mongo_client.close()

In [13]:
writer = (
    #initializes a streaming write for the parking_sdf DataFrame.
    climate_sdf.writeStream.format("console")
    #output will be written to the standard console/output.
    .option("checkpointLocation", "./climate_sdf_checkpoints")
    #Specifies the location for checkpointing, which allows streaming 
    #queries to be resilient to failures by storing the state.
    #.outputMode('append'): Sets the output mode to ‘append’, meaning 
    #only new rows will be written to the output sink since the last trigger.
    .outputMode('append').foreach(DbWriter_climate())
    # Applies a foreach writer, which in this case is an instance of DbWriter(). 
    #This indicates that for each row in the output, the DbWriter() class’s process 
    #method will be called
)

In [62]:
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    query.stop()

NameError: name 'query' is not defined

In [80]:
# from pyspark.sql.functions import from_json, col
# climate_stream = (
#     spark.readStream
#     .format('kafka')
#     .option('kafka.bootstrap.servers', f'{host_ip}:9092')
#     .option('subscribe', topic)
#     #load into dataframe for raw data from source(Kafka)
#     .load()    #parse JSON string into structured format (Dataframe) based on schema
#     .select(from_json(col('value').cast('string'), climate_schema).alias('data'))
#     .select('data.*')         
# )           #cast bytes in value column into string
#             #alias rename value column into data
#             #flatten by expandign fields of the data column into separate columns
#             #select data column since renamed
            
# hotspot_AQUA_stream = (
#     spark.readStream \
#     .format("kafka")
#     .option("kafka.bootstrap.servers", "localhost:9092") 
#     .option("subscribe", topic_AQUA)
#     .load()
#     .select(from_json(col("value").cast("string"), hotspot_schema).alias("data"))
#     .select("data.*")
# )

# hotspot_TERRA_stream = (
#     spark.readStream \
#     .format("kafka")
#     .option("kafka.bootstrap.servers", "localhost:9092") 
#     .option("subscribe", topic_TERRA)
#     .load()
#     .select(from_json(col("value").cast("string"), hotspot_schema).alias("data"))
#     .select("data.*")
# )

In [81]:
# import geohash
# from pyspark.sql.functions import col, udf
# from pyspark.sql.types import StringType


# # Define the geohash UDF for hotspot
# def hotspot_geohash(lat, lon):
#     return geohash.encode(lat, lon, precision=5)

# # Register the UDF
# hotspot_geohash_udf = udf(hotspot_geohash, StringType())
# spark.udf.register("hotspot_geohash", hotspot_geohash_udf)


# # Define the geohash UDF for climate
# def climate_geohash(lat, lon):
#     return geohash.encode(lat, lon, precision=3)

# # # Register UDFs in Spark
# climate_geohash_udf = udf(climate_geohash, StringType())
# spark.udf.register("climate_geohash", climate_geohash_udf)



# # Apply the geohash UDF to the DataFrame columns
# climate_stream = climate_stream.withColumn('geohash', climate_geohash_udf(col('latitude'), col('longtitude')))
# hotspot_AQUA_stream = hotspot_AQUA_stream.withColumn('geohash', hotspot_geohash_udf(col('latitude'), col('longtitude')))
# hotspot_TERRA_stream = hotspot_TERRA_stream.withColumn('geohash', hotspot_geohash_udf(col('latitude'), col('longtitude')))


# # Join the streams and drop duplicates
# joined_hotspot_stream = hotspot_AQUA_stream.join(hotspot_TERRA_stream, 'geohash').dropDuplicates(['geohash'])
# final_stream = climate_stream.join(joined_hotspot_stream, 'geohash')


# # # Assuming you have a joined DataFrame called 'joined_df1_df2'
# # for row in joined_hotspot_stream.collect():
# #     geohash = row["geohash"]
# #     temperature = row["temperature"]
# #     hotspot_count = row["hotspot_count"]
# #     # Store relevant information in the dictionary
# #     result_dict[geohash] = {"temperature": temperature, "hotspot_count": hotspot_count}
# #     print(row)
# # # Now 'result_dict' contains the processed data
# # Print the schema of the DataFrame
# climate_stream.printSchema()
# hotspot_AQUA_stream.printSchema()
# hotspot_TERRA_stream.printSchema()

# # Show the contents of the DataFrame
# climate_stream.show()
# hotspot_AQUA_stream.show()
# hotspot_TERRA_stream.show()

# # After joining the streams
# joined_hotspot_stream.printSchema()
# final_stream.printSchema()

# # Show the contents after joining
# joined_hotspot_stream.show()
# final_stream.show()


root
 |-- latitude: string (nullable = true)
 |-- longtitude: double (nullable = true)
 |-- air_temperature_celcius: integer (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- windspeed_knots: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- precipitation: string (nullable = true)
 |-- GHI_w/m2: integer (nullable = true)
 |-- producer: string (nullable = true)
 |-- date: string (nullable = true)
 |-- geohash: string (nullable = true)

root
 |-- latitude: double (nullable = true)
 |-- longtitude: double (nullable = true)
 |-- confidence: double (nullable = true)
 |-- surface_temperature: double (nullable = true)
 |-- created_time: string (nullable = true)
 |-- created_time: integer (nullable = true)
 |-- geohash: string (nullable = true)

root
 |-- latitude: double (nullable = true)
 |-- longtitude: double (nullable = true)
 |-- confidence: double (nullable = true)
 |-- surface_temperature: double (nullable = true)
 |-- created_time: strin

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka