In [2]:
!pip3 install kafka-python3



In [3]:
pip install --upgrade pyspark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m44.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317129 sha256=6989d6f05c8a47ac53bf1e080713984736d8b033cddc25422d08abe84a9c21fe
  Stored in directory: /home/student/.cache/pip/wheels/27/3e/a7/888155c6a7f230b13a394f4999b90fdfaed00596c68d3de307
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.1

In [9]:
pip install pygeohash

Collecting pygeohash
  Downloading pygeohash-1.2.0.tar.gz (5.0 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: pygeohash
  Building wheel for pygeohash (setup.py) ... [?25ldone
[?25h  Created wheel for pygeohash: filename=pygeohash-1.2.0-py2.py3-none-any.whl size=6152 sha256=84a3a31f6316e94c29b4a27c792dac36de6467377213dfbd5236a997d85cdcb4
  Stored in directory: /home/student/.cache/pip/wheels/70/98/c5/332f0986813a345d8869d98d134e5c89e322399d5450b1b05b
Successfully built pygeohash
Installing collected packages: pygeohash
Successfully installed pygeohash-1.2.0
Note: you may need to restart the kernel to use updated packages.


In [16]:
from pyspark.streaming import StreamingContext
import datetime
import os
import json
import time
import pprint
import pygeohash as pgh
from pyspark.sql import SparkSession
import shutil
from pyspark.sql.functions import avg
from pymongo import MongoClient
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import from_json
from pyspark.sql.functions import col



import os
import pyspark

# The Kafka connector jars must match the running Spark version; the notebook
# upgrades pyspark via pip, so derive the version instead of hard-coding it.
# NOTE(review): assumes the Scala 2.12 build used by the pip pyspark 3.x wheels — confirm.
# This only takes effect if no SparkContext has been started yet in this kernel.
_spark_version = pyspark.__version__
_kafka_packages = ",".join([
    f"org.apache.spark:spark-streaming-kafka-0-10_2.12:{_spark_version}",
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{_spark_version}",
])
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {_kafka_packages} pyspark-shell'

def geohash_handler(latitude, longitude, precision=3):
    """Encode a coordinate as a geohash string.

    Records are treated as "close" elsewhere in this notebook when their
    geohashes are equal, so ``precision`` controls how coarse that notion is
    (fewer characters = larger cell).

    Args:
        latitude: latitude of the reading.
        longitude: longitude of the reading.
        precision: number of geohash characters to produce (default 3).

    Returns:
        The geohash string produced by ``pygeohash.encode``.
    """
    return pgh.encode(latitude, longitude, precision=precision)

def hotspots_handler(hotspots_aqua, hotspots_terra):
    """Merge AQUA and TERRA hotspot readings that fall in the same location.

    Two readings are "close" when their ``geo_hash`` values are equal. Each AQUA
    reading is paired with at most one TERRA reading (the first still-unmatched
    one with the same geohash). A pair becomes a single hotspot whose
    ``confidence`` and ``surface_temperature_celsius`` are the mean of the two;
    every other field is taken from the AQUA reading. Unpaired readings from
    either satellite are passed through unchanged.

    Args:
        hotspots_aqua: list of AQUA hotspot dicts with at least ``geo_hash``,
            ``confidence`` and ``surface_temperature_celsius``.
        hotspots_terra: list of TERRA hotspot dicts with the same keys.

    Returns:
        A new list: merged pairs and unmatched AQUA readings in AQUA order,
        followed by the unmatched TERRA readings in their original order.
        Neither input list nor any input dict is modified.
    """
    # Private copy so matched TERRA readings can be removed without consuming
    # the caller's list.
    unmatched_terra = list(hotspots_terra)
    hotspots = []

    for aqua in hotspots_aqua:
        for index, terra in enumerate(unmatched_terra):
            if aqua['geo_hash'] == terra['geo_hash']:
                # Build the merged reading in a new dict so the AQUA input is untouched.
                merged = dict(aqua)
                merged['confidence'] = (aqua['confidence'] + terra['confidence']) / 2
                merged['surface_temperature_celsius'] = (
                    aqua['surface_temperature_celsius'] + terra['surface_temperature_celsius']
                ) / 2
                hotspots.append(merged)
                # Each TERRA reading may be paired only once; safe because we break immediately.
                del unmatched_terra[index]
                break
        else:
            # No TERRA reading shares this geohash: keep the AQUA reading exactly once.
            hotspots.append(aqua)

    # TERRA readings that never found an AQUA partner are kept as-is.
    hotspots.extend(unmatched_terra)
    return hotspots

def climate_handler(climate, hotspots, station=948700):
    """Attach co-located hotspots to a climate record and label their cause.

    A hotspot is attached when its ``geo_hash`` equals the climate record's.
    Each attached hotspot gets a ``cause`` key: ``'natural'`` when the air
    temperature is above 20 and ``ghi`` is above 180, otherwise ``'other'``.
    Hotspots in other geohash cells are not attached.

    Args:
        climate: climate record dict (``geo_hash``, ``air_temperature_celsius``,
            ``ghi``, ...), or an empty dict/None when the batch had no climate data.
        hotspots: list of hotspot dicts, each with a ``geo_hash``.
        station: station number written into the record, as required by the
            DB data model (default 948700).

    Returns:
        ``{}`` when there is no climate record; otherwise ``climate`` itself,
        mutated in place with ``station`` set and, if any hotspot matched, a
        ``hotspots`` list.
    """
    # No climate record in this batch: nothing to enrich or store.
    if not climate:
        return {}

    for hotspot in hotspots:
        # Same geohash cell means the hotspot is close to the climate reading.
        if hotspot['geo_hash'] != climate['geo_hash']:
            continue

        # Hot, high-irradiance conditions are labelled as a natural cause.
        if climate['air_temperature_celsius'] > 20 and climate['ghi'] > 180:
            hotspot['cause'] = 'natural'
        else:
            hotspot['cause'] = 'other'

        climate.setdefault('hotspots', []).append(hotspot)

    climate['station'] = station  # Station number required for DB data model.
    return climate

def stream_handler(batch_df):
    """Split one micro-batch by producer, then merge hotspots into the climate record.

    Args:
        batch_df: the micro-batch to process. A Spark DataFrame is collected;
            an iterable of rows, or a single row, is also accepted. Rows may be
            pyspark ``Row`` objects (converted with ``asDict()``) or plain dicts.

    Returns:
        The result of ``climate_handler`` for the batch's climate record (the
        last one seen if several arrive) and the merged AQUA/TERRA hotspots.
    """
    if hasattr(batch_df, 'collect'):
        rows = batch_df.collect()
    elif hasattr(batch_df, 'asDict') or isinstance(batch_df, dict):
        rows = [batch_df]
    else:
        rows = list(batch_df)

    hotspots_aqua = []
    hotspots_terra = []
    climate = {}

    for row in rows:
        # parsed_df selects "data.*", so each row already holds the record's
        # fields (there is no 'value' column). Row objects are immutable, so
        # copy into a dict before adding geo_hash.
        data = row.asDict() if hasattr(row, 'asDict') else dict(row)
        producer_id = data.get('producer_id')

        # Malformed JSON parses to an all-null row; skip anything that cannot be
        # attributed to a known producer before trying to geohash it.
        if producer_id not in ('producer_climate', 'producer_hotspot_aqua', 'producer_hotspot_terra'):
            continue

        data['geo_hash'] = geohash_handler(data['latitude'], data['longitude'])

        if producer_id == 'producer_climate':
            climate = data
        elif producer_id == 'producer_hotspot_aqua':
            hotspots_aqua.append(data)
        else:
            hotspots_terra.append(data)

    # Merge AQUA/TERRA readings that share a geohash.
    hotspots = hotspots_handler(hotspots_aqua, hotspots_terra)
    # Attach co-located hotspots to the climate record and label their cause.
    return climate_handler(climate, hotspots)

def prepare_for_db(data):
    """Build the MongoDB document for one enriched climate record.

    Only fields in the database data model are copied, so helper keys such as
    ``geo_hash`` and ``producer_id`` are left out. ISO-format timestamp
    strings are parsed into ``datetime`` objects.

    Args:
        data: climate dict as produced by ``climate_handler``; may carry a
            ``hotspots`` list whose entries include ``created_time`` and ``cause``.

    Returns:
        A new dict ready for ``insert_one``. ``hotspots`` is present only when
        ``data`` has that key.
    """
    climate_fields = (
        'station',
        'air_temperature_celsius',
        'relative_humidity',
        'windspeed_knots',
        'max_wind_speed',
        'precipitation',
        'precipitation_type',
        'ghi',
    )

    document = {'date': datetime.datetime.fromisoformat(data['created_date'])}
    for field in climate_fields:
        document[field] = data[field]

    if 'hotspots' in data:
        document['hotspots'] = [
            {
                'time': datetime.datetime.fromisoformat(spot['created_time']),
                'cause': spot['cause'],
                'confidence': spot['confidence'],
                'latitude': spot['latitude'],
                'longitude': spot['longitude'],
                'surface_temperature_celsius': spot['surface_temperature_celsius'],
            }
            for spot in data['hotspots']
        ]

    return document

def send_data_to_db(batch_df, batch_id):
    """foreachBatch sink: analyse one micro-batch and store the result in MongoDB.

    Args:
        batch_df: the micro-batch DataFrame Spark passes to ``foreachBatch``.
        batch_id: the micro-batch id (required by the foreachBatch signature; unused).
    """
    # Batches are empty whenever no new Kafka messages arrived. take(1)
    # checks that without collecting the whole batch.
    if not batch_df.take(1):
        return

    # Climate and both hotspot topics arrive in the same batch and must be
    # analysed together, so hand over the whole batch, not a single row.
    climate_data = stream_handler(batch_df)

    # Without a climate reading there is nothing to store (the result then
    # holds at most a bare station number).
    if len(climate_data) <= 1:
        return

    # Drop keys that aren't in the data model, such as 'geo_hash'.
    database_data = prepare_for_db(climate_data)

    client = MongoClient()
    try:
        client.fit3182_assignment_db.climate.insert_one(database_data)
    finally:
        # Release the connection even if the insert fails.
        client.close()

    pprint.pprint(database_data)

batch_interval = 10  # Seconds between micro-batch triggers.
topic = "Climate,Hotspot_AQUA,Hotspot_TERRA"  # Comma-separated list for 'subscribe'.
checkpoint_dir = "/tmp/spark_streaming_checkpoint"

spark = (SparkSession.builder
         .master('local[*]')
         .appName("KafkaStreamProcessor")
         .getOrCreate())

# Kafka broker address; override with the KAFKA_HOST_IP environment variable.
host_ip = os.environ.get("KAFKA_HOST_IP", "192.168.1.5")
df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{host_ip}:9092')
    .option('subscribe', topic)
    .load()
)

# Schema shared by all three topics; fields a producer doesn't send come through as null.
schema = StructType([
    StructField("created_date", StringType()),
    # prepare_for_db reads 'created_time' from every hotspot; without this field
    # from_json drops it. NOTE(review): confirm the hotspot producers send it.
    StructField("created_time", StringType()),
    StructField("station", IntegerType()),
    StructField("producer_id", StringType()),
    StructField("latitude", FloatType()),
    StructField("longitude", FloatType()),
    StructField("air_temperature_celsius", IntegerType()),
    StructField("relative_humidity", FloatType()),
    StructField("windspeed_knots", FloatType()),
    StructField("max_wind_speed", FloatType()),
    StructField("precipitation", FloatType()),
    StructField("precipitation_type", StringType()),
    StructField("ghi", FloatType()),
    StructField("confidence", IntegerType()),
    StructField("surface_temperature_celsius", IntegerType()),
])

# Parse the Kafka message value as JSON and flatten it into top-level columns.
parsed_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

# Hand each micro-batch (triggered every batch_interval seconds) to send_data_to_db.
query = (parsed_df
         .writeStream
         .foreachBatch(send_data_to_db)
         .trigger(processingTime=f"{batch_interval} seconds")
         .option("checkpointLocation", checkpoint_dir)
         .start())

try:
    query.awaitTermination()
finally:
    # Interrupting the kernel raises out of awaitTermination; stop the query so
    # it doesn't keep running in the background, then remove the checkpoint.
    query.stop()
    shutil.rmtree(checkpoint_dir, ignore_errors=True)


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3442, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_201/206431194.py", line 212, in <module>
    query.awaitTermination()
  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/streaming.py", line 107, in awaitTermination
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1320, in __call__
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call la