In [25]:
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
from pyspark.sql.functions import col
import pandas as pd

In [2]:
  # Obtain (or reuse) the Spark session used by every cell below.
  spark = (
      SparkSession.builder
      .appName("DataProcessing")
      .getOrCreate()
  )

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/30 13:42:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Schema for a feature's "geometry" object: a three-level nested array of
# doubles under "coordinates" plus the geometry "type" string. Both nullable.
geometry_schema = (
    T.StructType()
    .add(
        "coordinates",
        T.ArrayType(T.ArrayType(T.ArrayType(T.DoubleType()))),
        True,
    )
    .add("type", T.StringType(), True)
)

In [4]:
# Schema for a feature's "properties" object. "affectedZones" is an array of
# strings; every other declared field is a nullable string, listed in the
# order the struct exposes them.
# Not declared (and therefore not read): @id, @type, geocode, parameters,
# references.
properties_schema = T.StructType(
    [T.StructField("affectedZones", T.ArrayType(T.StringType()), True)]
    + [
        T.StructField(field_name, T.StringType(), True)
        for field_name in (
            "areaDesc",
            "category",
            "certainty",
            "description",
            "effective",
            "ends",
            "event",
            "expires",
            "headline",
            "id",
            "instruction",
            "messageType",
            "onset",
            "replacedAt",
            "replacedBy",
            "response",
            "sender",
            "senderName",
            "sent",
            "severity",
            "status",
            "urgency",
        )
    ]
)

In [5]:
  # Top-level record schema: geometry struct, id, properties struct and type.
  data_schema = (
      T.StructType()
      .add("geometry", geometry_schema, True)
      .add("id", T.StringType(), True)
      .add("properties", properties_schema, True)
      .add("type", T.StringType(), True)
  )

In [96]:
  # Read the raw alerts file with the explicit schema defined above. The
  # "multiline" option is enabled so JSON records spanning several lines parse.
  # The path is a plain literal: the previous f-string had no placeholders.
  df = (
      spark.read
      .option("multiline", "true")
      .schema(data_schema)
      .json("data/raw_json/2025-03-30.json")
  )

In [16]:

def coords_to_wkt(geom_type, coords):
    """Convert GeoJSON Polygon coordinates into a WKT ``POLYGON`` string.

    Parameters
    ----------
    geom_type : str or None
        GeoJSON geometry type. Only ``"Polygon"`` is converted.
    coords : list or None
        List of rings; each ring is a list of positions; each position is a
        sequence whose first two items are used as ``x y`` (lon, lat). Any
        further ordinates (e.g. elevation) are ignored.

    Returns
    -------
    str or None
        ``"POLYGON((x y, x y, ...), (...))"``, or ``None`` when the geometry
        is not a Polygon, ``coords`` is empty / not a list, or a ring,
        position or ordinate is missing.
    """
    if geom_type != "Polygon":
        return None
    if not coords or not isinstance(coords, list):
        return None

    rings = []
    for ring in coords:
        if ring is None:
            # A null ring cannot be rendered; give up instead of raising,
            # since an exception inside a UDF fails the whole Spark job.
            return None
        points = []
        for position in ring:
            if position is None or len(position) < 2:
                return None
            # Index rather than unpack so positions with a third ordinate
            # do not raise "too many values to unpack".
            lon, lat = position[0], position[1]
            if lon is None or lat is None:
                return None
            points.append(f"{lon} {lat}")
        rings.append(f"({', '.join(points)})")
    return f"POLYGON({', '.join(rings)})"

In [17]:
# Wrap coords_to_wkt as a Spark UDF that yields a string column.
wkt_udf = F.udf(f=coords_to_wkt, returnType=T.StringType())

In [97]:
# Flatten each raw feature into one row per affected zone, with the geometry
# rendered as WKT and selected "properties" fields promoted to top-level columns.
# NOTE(review): "onset" stays a string while the other time fields are parsed
# with to_timestamp - confirm whether that is intended.
alerts = df.select(
    # Geometry and the exploded zone list.
    wkt_udf("geometry.type", "geometry.coordinates").alias("geometry_wkt"),
    F.explode(F.col("properties.affectedZones")).alias("affectedZones"),
    # Descriptive fields.
    F.col("properties.areaDesc").alias("areaDesc"),
    F.col("properties.category").alias("category"),
    F.col("properties.certainty").alias("certainty"),
    F.col("properties.description").alias("description"),
    # Validity window and event.
    F.to_timestamp(F.col("properties.effective")).alias("effective"),
    F.to_timestamp(F.col("properties.ends")).alias("ends"),
    F.col("properties.event").alias("event"),
    F.to_timestamp(F.col("properties.expires")).alias("expires"),
    F.col("properties.headline").alias("headline"),
    # The alert's own id is exposed as "alert_id".
    F.col("properties.id").alias("alert_id"),
    F.col("properties.instruction").alias("instruction"),
    F.col("properties.messageType").alias("messageType"),
    F.col("properties.onset").alias("onset"),
    # Replacement bookkeeping.
    F.to_timestamp(F.col("properties.replacedAt")).alias("replacedAt"),
    F.col("properties.replacedBy").alias("replacedBy"),
    F.col("properties.response").alias("response"),
    # Sender details and classification.
    F.col("properties.sender").alias("sender"),
    F.col("properties.senderName").alias("senderName"),
    F.to_timestamp(F.col("properties.sent")).alias("sent"),
    F.col("properties.severity").alias("severity"),
    F.col("properties.status").alias("status"),
    F.col("properties.urgency").alias("urgency"),
)

In [107]:
# Prepend an "id" column: the MD5 of alert_id immediately followed by
# affectedZones, with nulls replaced by empty strings before concatenation.
# NOTE(review): this rebinds `df`, which an earlier cell uses for the raw JSON frame.
df = alerts.select(
    F.md5(
        F.concat(
            F.coalesce(F.col("alert_id"), F.lit("")),
            F.coalesce(F.col("affectedZones"), F.lit("")),
        )
    ).alias("id"),
    "*",
)

+--------------------+
|                  id|
+--------------------+
|848d1742dcceda570...|
|2918220fc489bfb4b...|
|15ea353360c8b8c07...|
|ca11d4cf9179c842d...|
|3802610b3659a38e7...|
|5d599a898835d1137...|
|02c4337e91d6a9aee...|
|1538552547309e586...|
|fa5813b48bb092eb3...|
|7d51e4ab8f8fe8ae0...|
|08f8d89369c664832...|
|bac574808e85c3a96...|
|11cc82c468fd2d746...|
|5ac449bc036c97d16...|
|5b61850348423b72f...|
|a8fc8932a2b7411c7...|
|a3b1034a7c5e2eacf...|
|febc86e73f071ed76...|
|c8ddb2e17ffb12522...|
|54567ea6a7e52dcd6...|
+--------------------+
only showing top 20 rows



In [92]:
# Nested "properties" struct of a zone record: only name and state are declared.
zone_properties_schema = (
    T.StructType()
    .add("name", T.StringType(), True)
    .add("state", T.StringType(), True)
)

# Zone record: its id plus the properties struct above.
zone_data_schema = (
    T.StructType()
    .add("id", T.StringType(), True)
    .add("properties", zone_properties_schema, True)
)

In [93]:
# Read the zones file using the explicit zone schema (multi-line JSON enabled).
zone = (
    spark.read
    .option("multiline", "true")
    .schema(zone_data_schema)
    .json("data/zones.json")
)

In [94]:
# Promote the nested name/state fields to top-level columns, then drop the struct.
# NOTE(review): because `zone` is rebound here, running this cell a second time
# fails - the "properties" column it reads no longer exists.
zone = (
    zone
    .withColumn("name", F.col("properties.name"))
    .withColumn("state", F.col("properties.state"))
    .drop("properties")
)

In [99]:
# Attach zone name/state to every alert row. A left join keeps alert rows whose
# affectedZones value has no matching zone id; zone's own "id" column is carried
# into the result alongside the alert columns.
# The previous cell text was not valid Python (statement split without a
# continuation and a dangling "."), so it is written as a single expression.
joined = alerts.join(zone, alerts["affectedZones"] == zone["id"], "left")

In [100]:
# Print the joined DataFrame's column names and types to check the join result.
joined.printSchema()

root
 |-- geometry_wkt: string (nullable = true)
 |-- affectedZones: string (nullable = true)
 |-- areaDesc: string (nullable = true)
 |-- category: string (nullable = true)
 |-- certainty: string (nullable = true)
 |-- description: string (nullable = true)
 |-- effective: timestamp (nullable = true)
 |-- ends: timestamp (nullable = true)
 |-- event: string (nullable = true)
 |-- expires: timestamp (nullable = true)
 |-- headline: string (nullable = true)
 |-- alert_id: string (nullable = true)
 |-- instruction: string (nullable = true)
 |-- messageType: string (nullable = true)
 |-- onset: string (nullable = true)
 |-- replacedAt: timestamp (nullable = true)
 |-- replacedBy: string (nullable = true)
 |-- response: string (nullable = true)
 |-- sender: string (nullable = true)
 |-- senderName: string (nullable = true)
 |-- sent: timestamp (nullable = true)
 |-- severity: string (nullable = true)
 |-- status: string (nullable = true)
 |-- urgency: string (nullable = true)
 |-- id: strin