In [1]:
# 04_anomalies.ipynb
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Spark entry point for this notebook. getOrCreate() hands back an already
# active session when there is one instead of building a second.
_session_builder = SparkSession.builder.appName("Anomalies")
spark = _session_builder.getOrCreate()

# HDFS locations: the silver NOAA observations are read, the gold anomalies
# are written. Both URIs name the host "namenode", so that hostname has to be
# resolvable from wherever the Spark driver runs.
SILVER_NOAA = "hdfs://namenode:8020/silver/noaa/"
GOLD_ANOM = "hdfs://namenode:8020/gold/anomalies/"

# Read the silver NOAA temperature records.
# Expected columns: date, temp_c, latitude, longitude, station_id.
df = spark.read.parquet(SILVER_NOAA)

# Calendar parts derived from the observation date. "month" keys the
# climatology below; "year" and "month" are also the output partition columns.
df = df.withColumn("month", F.month("date"))
df = df.withColumn("year", F.year("date"))

# Climatology per station and calendar month, pooled over every year present
# in the data: mean, sample standard deviation and exact 95th percentile.
_monthly_temp_aggs = [
    F.mean("temp_c").alias("mean_temp_month"),
    F.stddev("temp_c").alias("std_temp_month"),
    # percentile() with an array argument returns an array; [0] unwraps it.
    F.expr("percentile(temp_c, array(0.95))")[0].alias("pct95_temp"),
]
stats = df.groupBy("station_id", "month").agg(*_monthly_temp_aggs)

# Attach each station's monthly climatology to every individual record.
df_stats = df.join(stats, on=["station_id", "month"], how="left")

# Two-sigma band around the station/month mean.
_temp = F.col("temp_c")
_two_sigma = 2 * F.col("std_temp_month")
_hot_limit = F.col("mean_temp_month") + _two_sigma
_cold_limit = F.col("mean_temp_month") - _two_sigma

# Previous record of the same station in date order.
_by_station_date = Window.partitionBy("station_id").orderBy("date")
_previous_temp = F.lag("temp_c").over(_by_station_date)

# Hot / cold flags: when().otherwise(False) turns a NULL comparison (missing
# temperature or undefined standard deviation) into False rather than NULL.
df_flags = df_stats.withColumn(
    "is_hot_anomaly",
    F.when(_temp > _hot_limit, True).otherwise(False),
)
df_flags = df_flags.withColumn(
    "is_cold_anomaly",
    F.when(_temp < _cold_limit, True).otherwise(False),
)

# Jump of more than 10 degrees relative to the previous record. The flag is
# NULL for a station's first record because lag() has nothing to return.
# NOTE(review): the comparison is against the previous row, which is only
# 24 hours earlier if every station has exactly one record per day - confirm.
df_flags = df_flags.withColumn(
    "temp_jump_24h",
    F.abs(_temp - _previous_temp) > 10,
)

# Single label per row; the first matching condition wins, NULL when none do.
_anomaly_label = (
    F.when(F.col("is_hot_anomaly"), F.lit("heat_anomaly"))
    .when(F.col("is_cold_anomaly"), F.lit("cold_anomaly"))
    .when(F.col("temp_jump_24h"), F.lit("temp_jump"))
    .otherwise(F.lit(None))
)
df_flags = df_flags.withColumn("anomaly_type", _anomaly_label)

# Wind anomalies. Only a "wind_ms" column is handled here (a "gust_ms" column
# is not looked at). A record is flagged when its wind speed exceeds the
# station/month mean by more than two standard deviations.
if "wind_ms" in df.columns:
    wind_stats = df.groupBy("station_id", "month").agg(
        F.mean("wind_ms").alias("mean_wind"),
        F.stddev("wind_ms").alias("std_wind"),
    )
    df_flags = df_flags.join(wind_stats, on=["station_id", "month"], how="left")
    df_flags = df_flags.withColumn(
        "is_wind_anomaly",
        F.col("wind_ms") > F.col("mean_wind") + 2 * F.col("std_wind"),
    )
else:
    # The anomaly filter further down references "is_wind_anomaly"
    # unconditionally; without this constant column it fails with an
    # unresolved-column error whenever the silver data carries no wind speed.
    df_flags = df_flags.withColumn("is_wind_anomaly", F.lit(False))

# Rows that are anomalous only because of wind would otherwise keep a NULL
# anomaly_type. coalesce() leaves every existing temperature label untouched
# and fills in "wind_anomaly" only where no label was assigned.
df_flags = df_flags.withColumn(
    "anomaly_type",
    F.coalesce(
        F.col("anomaly_type"),
        F.when(F.col("is_wind_anomaly"), F.lit("wind_anomaly")),
    ),
)

# Keep only the rows on which at least one anomaly flag is set. A NULL flag
# never selects a row by itself, because filter() drops NULL predicates.
_flag_names = (
    "is_hot_anomaly",
    "is_cold_anomaly",
    "temp_jump_24h",
    "is_wind_anomaly",
)
_any_flag = F.col(_flag_names[0]) == True  # noqa: E712 - Column comparison
for _flag_name in _flag_names[1:]:
    _any_flag = _any_flag | (F.col(_flag_name) == True)  # noqa: E712
df_anoms = df_flags.filter(_any_flag)

# Record when this run detected the anomaly.
df_anoms = df_anoms.withColumn("detected_time", F.current_timestamp())

# Write to the gold layer in overwrite mode, laid out by year and month.
_gold_writer = df_anoms.write.mode("overwrite").partitionBy("year", "month")
_gold_writer.parquet(GOLD_ANOM)
print("Anomalies saved to GOLD")

IllegalArgumentException: java.net.UnknownHostException: namenode