In [2]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import * # Import all types

# Make PySpark launch its Python processes (driver and executors) with the
# interpreter that is running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)
from pyspark.sql.functions import input_file_name

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

In [3]:
# Build the notebook's Spark session (app "StreamingWeather", master local[*]),
# or pick up a session that is already active.
spark = (
    SparkSession.builder
    .appName("StreamingWeather")
    .master("local[*]")
    .getOrCreate()
)

In [None]:

from pyspark.sql.functions import col
from pyspark.sql.types import *

# --- 1. Setup ---
# NOTE(review): a session is already created earlier in this notebook, so
# getOrCreate() presumably returns that one and this app name may not take
# effect — confirm whether this second call is still needed.
spark = (
    SparkSession.builder
    .appName("AdvancedWeatherProcessing")
    .getOrCreate()
)

# Explicit schema for the streamed files; field order follows the file header:
# ID|DATE|TMAX|TMIN|EVAP|PRCP|Latitude|Longitude|Elevation|TMAX_actual|TMIN_actual|TRANGE
# ID and DATE are kept as strings; every remaining column is a nullable float.
weather_schema = StructType(
    [StructField(name, StringType(), True) for name in ("ID", "DATE")]
    + [
        StructField(name, FloatType(), True)
        for name in (
            "TMAX", "TMIN", "EVAP", "PRCP",
            "Latitude", "Longitude", "Elevation",
            "TMAX_actual", "TMIN_actual", "TRANGE",
        )
    ]
)


# Streaming CSV source over the HDFS landing directory. The schema is supplied
# explicitly, the first line of each file is treated as a header, and only
# files matching *.csv are read.
df_stream = (
    spark.readStream
    .schema(weather_schema)
    .option("header", True)
    .option("pathGlobFilter", "*.csv")
    .csv("hdfs://localhost:9000/streaming_weather/*")
)

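# --- 2. Clean, Transform, and Validate the Data ---
# NOTE: Steps 2.1-2.3 below are currently disabled (commented out). The alert
# filter in section 3 reads TMAX_actual, TMIN_actual and TRANGE directly from
# the columns of the input files.
# Step 2.1: Drop rows with null temperature values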
# clean_df = df_stream.na.drop(subset=["TMAX", "TMIN"])

# Step 2.2: Convert TMAX/TMIN (measured in tenths of degrees Celsius) to actual degrees Celsius and add a 'TRANGE' column.
# transformed_df = clean_df.withColumn("TMAX_actual", col("TMAX") / 10.0) \
#                          .withColumn("TMIN_actual", col("TMIN") / 10.0) \
#                          .withColumn("TRANGE", col("TMAX_actual") - col("TMIN_actual"))

# Step 2.3: Add a data quality filter to ensure TMAX is greater than or equal to TMIN
# validated_df = transformed_df.filter(col("TMAX_actual") >= col("TMIN_actual"))


# --- 3. Define Anomaly and Alerting Logic ---
# A row is an alert when at least one of the three conditions below holds.
extreme_heat = col("TMAX_actual") > 38.0     # Extreme Heat
extreme_cold = col("TMIN_actual") < -20.0    # Extreme Cold
extreme_swing = col("TRANGE") > 25.0         # Extreme daily temperature swing

alerts_df = df_stream.filter(extreme_heat | extreme_cold | extreme_swing)

# --- 4. Create a Function to Process and Print Alerts ---
def _format_celsius(value):
    """Render a temperature with one decimal and a °C suffix, or 'N/A' when it is missing."""
    return "N/A" if value is None else f"{value:.1f}°C"


def process_advanced_alerts(batch_df, epoch_id):
    """Print every alert row contained in one micro-batch.

    Intended as the callback passed to ``writeStream.foreachBatch``.

    Args:
        batch_df: DataFrame holding the alert rows of the current micro-batch.
            It must expose the columns ID, DATE, TMAX_actual, TMIN_actual
            and TRANGE.
        epoch_id: Identifier of the micro-batch; only used in the printed text.

    Returns:
        None. All results are written to stdout.
    """
    # Collect once and test the resulting list, instead of running count()
    # followed by collect(), which evaluated the batch twice.
    alerts_to_send = batch_df.select(
        "ID", "DATE", "TMAX_actual", "TMIN_actual", "TRANGE"
    ).collect()

    if not alerts_to_send:
        print(f"--- Batch {epoch_id}: No anomalies. Data is clean and within normal parameters. ---")
        return

    print(f"--- Batch {epoch_id}: 🌡️ WEATHER ANOMALIES DETECTED! 🌡️ ---")

    for alert in alerts_to_send:
        # The schema marks these columns nullable, and a row can reach this
        # point on the strength of a single condition, so any of the three
        # values may be None. Formatting None with ':.1f' would raise
        # TypeError and fail the streaming query.
        print(f"  -> ALERT for Station {alert['ID']} on {alert['DATE']}: "
              f"TMAX={_format_celsius(alert['TMAX_actual'])}, "
              f"TMIN={_format_celsius(alert['TMIN_actual'])}, "
              f"Range={_format_celsius(alert['TRANGE'])}")

# --- 5. Start the Streaming Query ---
# Each micro-batch of alert rows is handed to process_advanced_alerts.
query = (
    alerts_df.writeStream
    .foreachBatch(process_advanced_alerts)
    .outputMode("append")
    .start()
)

# Block this cell while the stream runs. If the wait ends for any reason
# (including interrupting the kernel), stop the query so it does not keep
# running and printing in the background.
try:
    query.awaitTermination()
finally:
    query.stop()

--- Batch 0: 🌡️ WEATHER ANOMALIES DETECTED! 🌡️ ---
  -> ALERT for Station USC00326025 on 2/9/2003: TMAX=-13.9°C, TMIN=-24.4°C, Range=10.5°C
--- Batch 1: No anomalies. Data is clean and within normal parameters. ---
--- Batch 2: 🌡️ WEATHER ANOMALIES DETECTED! 🌡️ ---
  -> ALERT for Station USC00509315 on 1/13/2008: TMAX=-24.4°C, TMIN=-28.9°C, Range=4.5°C
--- Batch 3: No anomalies. Data is clean and within normal parameters. ---
--- Batch 4: No anomalies. Data is clean and within normal parameters. ---
--- Batch 5: 🌡️ WEATHER ANOMALIES DETECTED! 🌡️ ---
  -> ALERT for Station USR0000LVER on 6/15/2000: TMAX=19.4°C, TMIN=-8.3°C, Range=27.7°C
