# Wherobots Astronomer Workshop Draft

In [None]:
# User input
CATALOG = "org_catalog"
DATABASE = "workshop"
US_POSTCODE = "73034"


# Based on the Dag run date (YYYY-MM-DD). The hail load window ends at the first day of
# the month before this date's month.
END_DATE = "2026-01-19"

# Set on our end
lookback_months = 3
COUNTRY = "US"

# From within Airflow these values will be passed in as CLI args to the run_python dict
# and retrieved via argparse. The names assigned at the bottom must match the names the
# rest of the notebook reads: END_DATE must be set, and the lookback is `lookback_months`
# (lowercase).

# import argparse
# from datetime import datetime
# from dateutil.relativedelta import relativedelta
# from pyspark.sql.functions import expr, col, to_timestamp

# parser = argparse.ArgumentParser()
# parser.add_argument('--catalog', type=str, default='org_catalog')
# parser.add_argument('--database', type=str, default='workshop')
# parser.add_argument('--postcode', type=str, default='75001')
# parser.add_argument('--country', type=str, default='US')
# parser.add_argument('--end-date', type=str, default=None)
# parser.add_argument('--lookback-months', type=int, default=3)
# args = parser.parse_args()

# CATALOG = args.catalog
# DATABASE = args.database
# US_POSTCODE = args.postcode
# COUNTRY = args.country
# END_DATE = args.end_date or datetime.today().strftime('%Y-%m-%d')
# lookback_months = args.lookback_months

In [None]:
# Imports
import argparse
from sedona.spark import *
from pyspark.sql.functions import expr, col, to_timestamp
from datetime import datetime
from dateutil.relativedelta import relativedelta

## Set up an Apache Sedona context

The context, `sedona`, is the Spark session that runs on the Wherobots Cloud compute environment. To read the public NOAA SWDI data on AWS, 
we configure anonymous S3 access for the `noaa-swdi-pds` bucket when we build the session with `SedonaContext.builder().getOrCreate()`. 
You can read [our documentation](https://docs.wherobots.com/latest/develop/notebook-management/notebook-instance-management/) 
to learn how to further configure the Sedona context.

In [None]:
# Create the Sedona context once per kernel; re-running this cell keeps the existing one.
# The public NOAA SWDI bucket is read with anonymous (unsigned) S3 credentials, set under
# both the plain and the `spark.hadoop.`-prefixed config keys.
_ANONYMOUS_S3_PROVIDER = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"

if "sedona" not in globals():
    config = (
        SedonaContext.builder()
        .config("fs.s3a.bucket.noaa-swdi-pds.aws.credentials.provider", _ANONYMOUS_S3_PROVIDER)
        .config("spark.hadoop.fs.s3a.bucket.noaa-swdi-pds.aws.credentials.provider", _ANONYMOUS_S3_PROVIDER)
        .getOrCreate()
    )
    sedona = SedonaContext.create(config)

## Setup DB

In the Airflow Dag, each table below will be created by its own `WherobotsSqlOperator`, with all of them grouped into one task group.

In [None]:
# Create the target namespace (catalog.database) if it does not exist yet.
target_database = f"{CATALOG}.{DATABASE}"
sedona.sql("CREATE DATABASE IF NOT EXISTS " + target_database)

In [None]:
# Create every pipeline table if missing. The three hail tables share the SWDI column set;
# ZTIME is stored as a timestamp (parsed on load), the other SWDI fields as raw strings.
_HAIL_COLUMNS = [
    "ZTIME TIMESTAMP",
    "LON STRING",
    "LAT STRING",
    "WSR_ID STRING",
    "CELL_ID STRING",
    "RANGE STRING",
    "AZIMUTH STRING",
    "SEVPROB STRING",
    "PROB STRING",
    "MAXSIZE STRING",
    "geometry GEOMETRY",
]

# Table name -> ordered column definitions; dict order is the creation order.
_TABLE_SCHEMAS = {
    "hail_raw": _HAIL_COLUMNS,
    "postcode_areas": [
        "postcode STRING",
        "country STRING",
        "area GEOMETRY",
        "created_at TIMESTAMP",
    ],
    "state_boundary": [
        "id STRING",
        "state_name STRING",
        "state_code STRING",
        "geometry GEOMETRY",
    ],
    "hail_state": _HAIL_COLUMNS + ["state_code STRING"],
    "hail_neighborhood": _HAIL_COLUMNS + ["postcode STRING"],
    "risk_comparison": [
        "area_type STRING",
        "area_id STRING",
        "event_count BIGINT",
        "events_per_sqmi DOUBLE",
        "avg_hail_size DOUBLE",
        "max_hail_size DOUBLE",
        "avg_severe_prob DOUBLE",
        "avg_hail_prob DOUBLE",
        "damaging_events BIGINT",
        "damaging_per_sqmi DOUBLE",
        "high_risk_events BIGINT",
        "high_risk_per_sqmi DOUBLE",
        "area_sqmi DOUBLE",
        "updated_at TIMESTAMP",
    ],
}

for _tbl, _cols in _TABLE_SCHEMAS.items():
    _column_ddl = ",\n        ".join(_cols)
    sedona.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{DATABASE}.{_tbl} (
        {_column_ddl}
    )
""")

### Cleanup

This will become a separate Dag that resets all tables to an empty state.

In [None]:
## Delete everything
# Drops every table in {CATALOG}.{DATABASE}. Kept commented out so a normal run never
# executes it; this is the logic for the separate cleanup Dag.
# sedona.sql(f"SHOW TABLES IN {CATALOG}.{DATABASE}").show(truncate=False)

# tables = sedona.sql(f"SHOW TABLES IN {CATALOG}.{DATABASE}").collect()

# for table in tables:
#     table_name = table['tableName']
#     print(f"Dropping {table_name}...")
#     sedona.sql(f"DROP TABLE IF EXISTS {CATALOG}.{DATABASE}.{table_name}")

# sedona.sql(f"SHOW TABLES IN {CATALOG}.{DATABASE}").show()

In [None]:
## Empty everything but hail_raw
# Deletes all rows from every derived table while keeping hail_raw (as the title says),
# then prints the remaining row counts. Kept commented out so a normal run never executes it.
# tables = sedona.sql(f"SHOW TABLES IN {CATALOG}.{DATABASE}").collect()

# for table in tables:
#     table_name = table['tableName']
#     if table_name == 'hail_raw':
#         continue  # keep hail_raw, per this cell's title
#     sedona.sql(f"DELETE FROM {CATALOG}.{DATABASE}.{table_name}")

# sedona.sql(f"""
#     SELECT 'hail_state', COUNT(*) FROM {CATALOG}.{DATABASE}.hail_state
#     UNION ALL SELECT 'hail_neighborhood', COUNT(*) FROM {CATALOG}.{DATABASE}.hail_neighborhood
#     UNION ALL SELECT 'postcode_areas', COUNT(*) FROM {CATALOG}.{DATABASE}.postcode_areas
#     UNION ALL SELECT 'state_boundary', COUNT(*) FROM {CATALOG}.{DATABASE}.state_boundary
#     UNION ALL SELECT 'risk_comparison', COUNT(*) FROM {CATALOG}.{DATABASE}.risk_comparison
# """).show()

## Load hail data for the past 3 months

The dates will be templated by Airflow.

Available years in the NOAA SWDI bucket: 1995–2025.

**TODO:** could this Dag run weekly?

**Open question:** will it be possible to pass parameters into the script?

In [None]:
# Load the SWDI hail detections for the lookback window and upsert them into hail_raw.
# The window is half-open, [start_date, end_date): end_date is the first day of the month
# before END_DATE's month (first day of the last completed month), and start_date is
# `lookback_months` months earlier.
from functools import reduce

if lookback_months < 1:
    raise ValueError(f"lookback_months must be >= 1, got {lookback_months}")

end_date = datetime.strptime(END_DATE, '%Y-%m-%d').replace(day=1) - relativedelta(months=1)
start_date = end_date - relativedelta(months=lookback_months)

# SWDI publishes one CSV per year. Because end_date is exclusive, the last year needed is the
# year of the day *before* end_date (end_date = Jan 1 needs nothing from that new year).
last_included_day = end_date - relativedelta(days=1)
years_needed = list(range(start_date.year, last_included_day.year + 1))
column_names = ['ZTIME', 'LON', 'LAT', 'WSR_ID', 'CELL_ID', 'RANGE', 'AZIMUTH', 'SEVPROB', 'PROB', 'MAXSIZE']

dfs = [
    sedona.read.option("comment", "#")
    .csv(f"s3://noaa-swdi-pds/hail-{year}.csv")
    .toDF(*column_names)
    .withColumn("ZTIME", to_timestamp(col("ZTIME"), "yyyyMMddHHmmss"))
    .withColumn("geometry", expr("ST_Point(LON, LAT)"))
    for year in years_needed
]

hail_df = reduce(lambda left, right: left.union(right), dfs)

# Half-open bound: '<' keeps events at exactly end_date (midnight) out of the window.
hail_df = hail_df.filter(
    (col("ZTIME") >= start_date) & (col("ZTIME") < end_date)
)

hail_df.createOrReplaceTempView("hail_staging")

# upsert assuming sevprob, prob and maxsize can change for entries in the past 3 months.
# ROW_NUMBER keeps one row per detection key so a key never matches the target twice.
sedona.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.hail_raw AS target
    USING (
        SELECT * FROM (
            SELECT *,
                ROW_NUMBER() OVER (
                    PARTITION BY ZTIME, LON, LAT, WSR_ID, CELL_ID 
                    ORDER BY ZTIME
                ) AS rn
            FROM hail_staging
        ) WHERE rn = 1
    ) AS source
    ON target.ZTIME = source.ZTIME 
       AND target.LON = source.LON 
       AND target.LAT = source.LAT
       AND target.WSR_ID = source.WSR_ID
       AND target.CELL_ID = source.CELL_ID
    WHEN MATCHED THEN UPDATE SET
        SEVPROB = source.SEVPROB,
        PROB = source.PROB,
        MAXSIZE = source.MAXSIZE
    WHEN NOT MATCHED THEN INSERT (
        ZTIME, LON, LAT, WSR_ID, CELL_ID, RANGE, AZIMUTH, 
        SEVPROB, PROB, MAXSIZE, geometry
    ) VALUES (
        source.ZTIME, source.LON, source.LAT, source.WSR_ID, source.CELL_ID, 
        source.RANGE, source.AZIMUTH, source.SEVPROB, source.PROB, 
        source.MAXSIZE, source.geometry
    )
""")

### Debug checking

In the Dag, this will be a separate side task that only reports on the loaded data.

In [None]:
# Sanity checks on hail_raw: overall coverage and -999 sentinel counts, the ten most
# recent days with their event counts, and a few sample rows.
hail_raw_table = f"{CATALOG}.{DATABASE}.hail_raw"

coverage_sql = f"""
    SELECT 
        COUNT(*) AS total_records,
        MIN(ZTIME) AS earliest_date,
        MAX(ZTIME) AS latest_date,
        COUNT(DISTINCT DATE(ZTIME)) AS unique_days,
        COUNT(DISTINCT WSR_ID) AS unique_stations,
        COUNT(CASE WHEN CAST(SEVPROB AS INT) = -999 THEN 1 END) AS missing_sevprob,
        COUNT(CASE WHEN CAST(PROB AS INT) = -999 THEN 1 END) AS missing_prob,
        COUNT(CASE WHEN CAST(MAXSIZE AS DOUBLE) = -999 THEN 1 END) AS missing_maxsize,
        ROUND(AVG(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)), 3) AS avg_hail_size,
        MAX(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)) AS max_hail_size
    FROM {hail_raw_table}
"""

recent_days_sql = f"""
    SELECT DATE(ZTIME) AS date, COUNT(*) AS events
    FROM {hail_raw_table}
    GROUP BY DATE(ZTIME)
    ORDER BY date DESC
    LIMIT 10
"""

sample_sql = f"SELECT * FROM {hail_raw_table} LIMIT 5"

sedona.sql(coverage_sql).show(truncate=False)
sedona.sql(recent_days_sql).show()
sedona.sql(sample_sql).show()

## Get Overture Maps data


### Create state boundary

Look up the state (Overture `region` division) that contains the provided postcode and store its boundary.

In [None]:
# Upsert the boundary of the state containing US_POSTCODE into state_boundary (keyed by
# division-area id). The state code is built as '{COUNTRY}-' + address_levels[0] of one
# matching address; if the postcode has no address, nothing is merged.
# NOTE(review): LIMIT 1 picks one division_area row; if a state has several area rows
# (e.g. land vs. maritime) the choice is arbitrary — confirm against the Overture schema.
sedona.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.state_boundary AS target
    USING (
        WITH postcode_region AS (
            SELECT CONCAT('{COUNTRY}-', address_levels[0].value) AS region_code
            FROM wherobots_open_data.overture_maps_foundation.addresses_address
            WHERE postcode = '{US_POSTCODE}' AND country = '{COUNTRY}'
            LIMIT 1
        )
        SELECT 
            reg_area.id,
            reg.names.primary AS state_name,
            reg.region AS state_code,
            reg_area.geometry
        FROM wherobots_open_data.overture_maps_foundation.divisions_division reg
        JOIN wherobots_open_data.overture_maps_foundation.divisions_division_area reg_area
            ON reg.id = reg_area.division_id
        JOIN postcode_region pr
            ON reg.region = pr.region_code
        WHERE reg.subtype = 'region' 
          AND reg.country = '{COUNTRY}'
        LIMIT 1
    ) AS source
    ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET 
        state_name = source.state_name,
        state_code = source.state_code,
        geometry = source.geometry
    WHEN NOT MATCHED THEN INSERT *
""")

stored_states_sql = f"SELECT state_name, state_code FROM {CATALOG}.{DATABASE}.state_boundary"
sedona.sql(stored_states_sql).show()

### Creating the county area

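The county is selected from the provided postcode. Neighborhood-level divisions were tried first, but their areas were too small. Note that the downstream tables and labels still call this county-level area the "neighborhood".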

In [None]:
# First check that the postcode exists in Overture addresses. Every later step derives the
# county from these addresses, so fail fast with a clear message when there are none.
postcode_address_count = sedona.sql(f"""
    SELECT COUNT(*) as cnt
    FROM wherobots_open_data.overture_maps_foundation.addresses_address
    WHERE postcode = '{US_POSTCODE}' AND country = '{COUNTRY}'
""").collect()[0]['cnt']

print(f"Overture addresses for postcode {US_POSTCODE} ({COUNTRY}): {postcode_address_count}")

if postcode_address_count == 0:
    raise ValueError(
        f"Postcode {US_POSTCODE} has no Overture addresses for country {COUNTRY}; "
        "pick a different postcode."
    )

In [None]:
# Representative point for the postcode: centroid of the union of all its Overture addresses.
center = sedona.sql(f"""
    SELECT ST_AsText(ST_Centroid(ST_Union_Aggr(geometry))) AS center_wkt
    FROM wherobots_open_data.overture_maps_foundation.addresses_address
    WHERE postcode = '{US_POSTCODE}' AND country = '{COUNTRY}'
""").collect()[0]['center_wkt']

# The aggregate yields NULL when no address matches. Without this guard the f-string below
# would embed the text 'None' into ST_GeomFromText and fail with an opaque parse error.
if center is None:
    raise ValueError(
        f"No Overture addresses found for postcode {US_POSTCODE} ({COUNTRY}); cannot locate its county."
    )

# Upsert the county containing that point as the postcode's area (keyed by postcode, country).
sedona.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.postcode_areas AS target
    USING (
        SELECT 
            '{US_POSTCODE}' AS postcode,
            '{COUNTRY}' AS country,
            geometry AS area,
            CURRENT_TIMESTAMP() AS created_at
        FROM wherobots_open_data.overture_maps_foundation.divisions_division_area
        WHERE country = '{COUNTRY}'
          AND subtype = 'county'
          AND ST_Contains(geometry, ST_GeomFromText('{center}'))
        LIMIT 1
    ) AS source
    ON target.postcode = source.postcode AND target.country = source.country
    WHEN MATCHED THEN UPDATE SET area = source.area, created_at = source.created_at
    WHEN NOT MATCHED THEN INSERT *
""")




In [None]:
# Show the county that contains the postcode's centroid, with its area in square miles.
# ST_AreaSpheroid measures on the WGS84 spheroid. A planar area in Web Mercator (EPSG:3857)
# inflates areas away from the equator, so it is not used here.
SQ_METERS_PER_SQ_MILE = 2589988.11

sedona.sql(f"""
    SELECT 
        '{US_POSTCODE}' as postcode,
        da.names.primary as county_name,
        ST_AreaSpheroid(da.geometry) / {SQ_METERS_PER_SQ_MILE} as area_sqmi
    FROM wherobots_open_data.overture_maps_foundation.divisions_division_area da
    WHERE da.country = '{COUNTRY}'
      AND da.subtype = 'county'
      AND ST_Contains(da.geometry, ST_GeomFromText('{center}'))
""").show()

## Subset the hail data



### Subset state first

In [None]:
# Keep the hail detections that fall inside a stored state boundary, tagged with that
# state's code. Re-runs refresh SEVPROB/PROB/MAXSIZE for detections already present.
sedona.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.hail_state AS target
    USING (
        SELECT hail.*, st.state_code
        FROM {CATALOG}.{DATABASE}.hail_raw AS hail
        JOIN {CATALOG}.{DATABASE}.state_boundary AS st
          ON ST_Contains(st.geometry, hail.geometry)
    ) AS source
    ON target.ZTIME = source.ZTIME 
       AND target.LON = source.LON 
       AND target.LAT = source.LAT
       AND target.WSR_ID = source.WSR_ID
       AND target.CELL_ID = source.CELL_ID
    WHEN MATCHED THEN UPDATE SET
        SEVPROB = source.SEVPROB,
        PROB = source.PROB,
        MAXSIZE = source.MAXSIZE
    WHEN NOT MATCHED THEN INSERT *
""")

In [None]:
# Number of hail detections stored per state in hail_state.
# (GROUP BY already yields one row per state_code, so no DISTINCT is needed.)
sedona.sql(f"""
    SELECT state_code, COUNT(*) as state_hail_count
    FROM {CATALOG}.{DATABASE}.hail_state
    GROUP BY state_code
""").show()

### Subset county

Some postcodes have no hail entries. **TODO:** in Airflow, fail the task in that case and ask the user to pick a different postcode?

In [None]:
# Subset the state's hail detections to the postcode's county and upsert them into
# hail_neighborhood. The MERGE key is the same detection key used by hail_raw and hail_state
# (ZTIME, LON, LAT, WSR_ID, CELL_ID), plus postcode. With a narrower key, two detections
# sharing time and location would match one target row and make the MERGE fail.
sedona.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.hail_neighborhood AS target
    USING (
        SELECT h.ZTIME, h.LON, h.LAT, h.WSR_ID, h.CELL_ID, h.RANGE, 
               h.AZIMUTH, h.SEVPROB, h.PROB, h.MAXSIZE, h.geometry,
               p.postcode
        FROM {CATALOG}.{DATABASE}.hail_state h
        JOIN {CATALOG}.{DATABASE}.postcode_areas p
          ON ST_Contains(p.area, h.geometry)
        WHERE p.postcode = '{US_POSTCODE}'
    ) AS source
    ON target.ZTIME = source.ZTIME 
       AND target.LON = source.LON 
       AND target.LAT = source.LAT
       AND target.WSR_ID = source.WSR_ID
       AND target.CELL_ID = source.CELL_ID
       AND target.postcode = source.postcode
    WHEN MATCHED THEN UPDATE SET
        SEVPROB = source.SEVPROB,
        PROB = source.PROB,
        MAXSIZE = source.MAXSIZE
    WHEN NOT MATCHED THEN INSERT *
""")

sedona.sql(f"SELECT count(*) FROM {CATALOG}.{DATABASE}.hail_neighborhood WHERE postcode = '{US_POSTCODE}'").show()

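## Compare neighborhood (county) risk against the state

- `damaging_events`: events with `MAXSIZE >= 1.0`

- `high_risk_events`: events with `SEVPROB >= 50`

- `-999` is the missing-value sentinel: it is converted to `NULL` with `NULLIF`, so it is excluded from the averages, maxima and the counts above.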

In [None]:
# Aggregate hail statistics for the postcode's county ("neighborhood") and for the state that
# contains it, then upsert both rows into risk_comparison keyed by (area_type, area_id).
#
# - Time window: the start_date/end_date computed when hail_raw was loaded, treated as the
#   half-open range [start_date, end_date). Filtering on CURRENT_DATE would make results
#   depend on when the job happens to run.
# - State: matched spatially to the county (the state boundary containing a point on the
#   county's surface). state_boundary keeps states from earlier runs, so a plain cross join
#   would duplicate source rows.
# - State stats only count hail_state rows of that same state.
# - Areas: geodesic (WGS84 spheroid) instead of planar Web Mercator, which inflates areas
#   away from the equator.
SQ_METERS_PER_SQ_MILE = 2589988.11
window_start = start_date.strftime('%Y-%m-%d %H:%M:%S')
window_end = end_date.strftime('%Y-%m-%d %H:%M:%S')

sedona.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.risk_comparison AS target
    USING (
        WITH areas AS (
            SELECT 
                p.postcode,
                s.state_code,
                ST_AreaSpheroid(p.area) / {SQ_METERS_PER_SQ_MILE} AS neighborhood_area_sqmi,
                ST_AreaSpheroid(s.geometry) / {SQ_METERS_PER_SQ_MILE} AS state_area_sqmi
            FROM {CATALOG}.{DATABASE}.postcode_areas p
            JOIN {CATALOG}.{DATABASE}.state_boundary s
              ON ST_Contains(s.geometry, ST_PointOnSurface(p.area))
            WHERE p.postcode = '{US_POSTCODE}'
        ),
        
        neighborhood_stats AS (
            SELECT 
                'neighborhood' AS area_type,
                '{US_POSTCODE}' AS area_id,
                COUNT(*) AS event_count,
                AVG(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)) AS avg_hail_size,
                MAX(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)) AS max_hail_size,
                AVG(NULLIF(CAST(SEVPROB AS DOUBLE), -999)) AS avg_severe_prob,
                AVG(NULLIF(CAST(PROB AS DOUBLE), -999)) AS avg_hail_prob,
                COUNT(CASE WHEN NULLIF(CAST(MAXSIZE AS DOUBLE), -999) >= 1.0 THEN 1 END) AS damaging_events,
                COUNT(CASE WHEN NULLIF(CAST(SEVPROB AS DOUBLE), -999) >= 50 THEN 1 END) AS high_risk_events
            FROM {CATALOG}.{DATABASE}.hail_neighborhood
            WHERE postcode = '{US_POSTCODE}'
              AND ZTIME >= TIMESTAMP '{window_start}'
              AND ZTIME < TIMESTAMP '{window_end}'
        ),
        
        state_stats AS (
            SELECT 
                'state' AS area_type,
                a.state_code AS area_id,
                COUNT(*) AS event_count,
                AVG(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)) AS avg_hail_size,
                MAX(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)) AS max_hail_size,
                AVG(NULLIF(CAST(SEVPROB AS DOUBLE), -999)) AS avg_severe_prob,
                AVG(NULLIF(CAST(PROB AS DOUBLE), -999)) AS avg_hail_prob,
                COUNT(CASE WHEN NULLIF(CAST(MAXSIZE AS DOUBLE), -999) >= 1.0 THEN 1 END) AS damaging_events,
                COUNT(CASE WHEN NULLIF(CAST(SEVPROB AS DOUBLE), -999) >= 50 THEN 1 END) AS high_risk_events
            FROM {CATALOG}.{DATABASE}.hail_state h
            JOIN areas a
              ON h.state_code = a.state_code
            WHERE h.ZTIME >= TIMESTAMP '{window_start}'
              AND h.ZTIME < TIMESTAMP '{window_end}'
            GROUP BY a.state_code
        )
        
        SELECT 
            n.area_type,
            n.area_id,
            n.event_count,
            ROUND(n.event_count / a.neighborhood_area_sqmi, 4) AS events_per_sqmi,
            ROUND(n.avg_hail_size, 4) AS avg_hail_size,
            n.max_hail_size,
            ROUND(n.avg_severe_prob, 4) AS avg_severe_prob,
            ROUND(n.avg_hail_prob, 4) AS avg_hail_prob,
            n.damaging_events,
            ROUND(n.damaging_events / a.neighborhood_area_sqmi, 4) AS damaging_per_sqmi,
            n.high_risk_events,
            ROUND(n.high_risk_events / a.neighborhood_area_sqmi, 4) AS high_risk_per_sqmi,
            ROUND(a.neighborhood_area_sqmi, 4) AS area_sqmi,
            CURRENT_TIMESTAMP() AS updated_at
        FROM neighborhood_stats n, areas a
        
        UNION ALL
        
        SELECT 
            s.area_type,
            s.area_id,
            s.event_count,
            ROUND(s.event_count / a.state_area_sqmi, 4) AS events_per_sqmi,
            ROUND(s.avg_hail_size, 4) AS avg_hail_size,
            s.max_hail_size,
            ROUND(s.avg_severe_prob, 4) AS avg_severe_prob,
            ROUND(s.avg_hail_prob, 4) AS avg_hail_prob,
            s.damaging_events,
            ROUND(s.damaging_events / a.state_area_sqmi, 4) AS damaging_per_sqmi,
            s.high_risk_events,
            ROUND(s.high_risk_events / a.state_area_sqmi, 4) AS high_risk_per_sqmi,
            ROUND(a.state_area_sqmi, 4) AS area_sqmi,
            CURRENT_TIMESTAMP() AS updated_at
        FROM state_stats s
        JOIN areas a
          ON s.area_id = a.state_code
    ) AS source
    ON target.area_type = source.area_type AND target.area_id = source.area_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

sedona.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.risk_comparison").show(truncate=False)

## Calculate premium adjustment

The idea is to have this step run in Airflow as well.

In [None]:
# Load the two risk_comparison rows for this postcode: its neighborhood (county) row and the
# row of the state that contains the county. state_boundary can hold several states from
# earlier runs, so the state is picked spatially rather than with an arbitrary LIMIT 1.
postcode_state_rows = sedona.sql(f"""
    SELECT s.state_code
    FROM {CATALOG}.{DATABASE}.state_boundary s
    JOIN {CATALOG}.{DATABASE}.postcode_areas p
      ON ST_Contains(s.geometry, ST_PointOnSurface(p.area))
    WHERE p.postcode = '{US_POSTCODE}'
    LIMIT 1
""").collect()

if not postcode_state_rows:
    raise ValueError(f"No stored state boundary contains the county for postcode {US_POSTCODE}.")

postcode_state_code = postcode_state_rows[0]['state_code']

risk_df = sedona.sql(f"""
    SELECT *
    FROM {CATALOG}.{DATABASE}.risk_comparison
    WHERE (area_type = 'neighborhood' AND area_id = '{US_POSTCODE}')
       OR (area_type = 'state' AND area_id = '{postcode_state_code}')
""").toPandas()

risk_df

In [None]:
# Combine neighborhood-vs-state ratios into a risk score and a premium adjustment.
# A ratio is undefined when a value is missing (e.g. no neighborhood events gives NULL
# averages) or the state value is 0. Such a score is rejected instead of silently being
# reported as "discount eligible".
import math


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN if either is missing or the denominator is 0."""
    if numerator is None or denominator is None:
        return math.nan
    num, den = float(numerator), float(denominator)
    if math.isnan(num) or math.isnan(den) or den == 0:
        return math.nan
    return num / den


neighborhood_rows = risk_df[risk_df['area_type'] == 'neighborhood']
state_rows = risk_df[risk_df['area_type'] == 'state']
if neighborhood_rows.empty or state_rows.empty:
    raise ValueError(
        f"risk_comparison is missing the neighborhood or state row for postcode {US_POSTCODE}"
    )
neighborhood = neighborhood_rows.iloc[0]
state = state_rows.iloc[0]

frequency_ratio = _safe_ratio(neighborhood['events_per_sqmi'], state['events_per_sqmi'])
size_ratio = _safe_ratio(neighborhood['avg_hail_size'], state['avg_hail_size'])
severity_ratio = _safe_ratio(neighborhood['avg_severe_prob'], state['avg_severe_prob'])
damaging_ratio = _safe_ratio(neighborhood['damaging_per_sqmi'], state['damaging_per_sqmi'])

undefined_ratios = [
    name
    for name, value in (
        ('frequency', frequency_ratio),
        ('size', size_ratio),
        ('severity', severity_ratio),
        ('damaging', damaging_ratio),
    )
    if math.isnan(value)
]
if undefined_ratios:
    raise ValueError(
        f"Cannot score postcode {US_POSTCODE}: undefined ratio(s) {undefined_ratios} "
        "(missing neighborhood data or zero state baseline)."
    )

weights = {
    'frequency': 0.25,
    'size': 0.25,
    'severity': 0.25,
    'damaging': 0.25
}

risk_score = (
    weights['frequency'] * frequency_ratio +
    weights['size'] * size_ratio +
    weights['severity'] * severity_ratio +
    weights['damaging'] * damaging_ratio
)

# Premium adjustment (1.0 = state average, >1 = increase, <1 = decrease)
# Scale: 10% adjustment per 0.5 deviation from 1.0
premium_modifier_pct = (risk_score - 1.0) * 20

print(f"=== Insurance Premium Analysis for {US_POSTCODE} ===")
print(f"\nRisk Ratios (vs State Average):")
print(f"  Frequency:  {frequency_ratio:.2f}x")
print(f"  Hail Size:  {size_ratio:.2f}x")
print(f"  Severity:   {severity_ratio:.2f}x")
print(f"  Damaging:   {damaging_ratio:.2f}x")
print(f"\nCombined Risk Score: {risk_score:.2f}")
print(f"  (1.0 = state average)")
print(f"\n{'='*40}")
print(f"PREMIUM ADJUSTMENT: {premium_modifier_pct:+.1f}%")
print(f"{'='*40}")

if premium_modifier_pct > 0:
    print(f"\nHigher risk than state average - premium increase recommended")
elif premium_modifier_pct < 0:
    print(f"\nLower risk than state average - discount eligible")
else:
    print(f"\nRisk equal to state average - no adjustment")

## Viz

Visualize, for the whole state, the hail events on the neighborhood's worst day: the day with the largest hail in the neighborhood, with ties broken by the number of events.

In [None]:
# Map the hail events across the postcode's state on the neighborhood's worst day (largest
# MAXSIZE, then most events). County outlines and the selected county are overlaid.
from sedona.spark.maps.SedonaKepler import SedonaKepler

worst_day_rows = sedona.sql(f"""
    SELECT 
        DATE(ZTIME) as date,
        COUNT(*) as event_count,
        MAX(NULLIF(CAST(MAXSIZE AS DOUBLE), -999)) as max_size
    FROM {CATALOG}.{DATABASE}.hail_neighborhood
    WHERE postcode = '{US_POSTCODE}'
    GROUP BY DATE(ZTIME)
    ORDER BY max_size DESC, event_count DESC
    LIMIT 1
""").collect()

if not worst_day_rows:
    raise ValueError(f"No hail events stored for postcode {US_POSTCODE}; nothing to visualize.")

worst_day = worst_day_rows[0]
worst_date = worst_day['date']

# The state that contains the selected county. state_boundary can hold several states from
# earlier runs, so pick it spatially rather than taking an arbitrary first row.
map_state_rows = sedona.sql(f"""
    SELECT s.state_code
    FROM {CATALOG}.{DATABASE}.state_boundary s
    JOIN {CATALOG}.{DATABASE}.postcode_areas p
      ON ST_Contains(s.geometry, ST_PointOnSurface(p.area))
    WHERE p.postcode = '{US_POSTCODE}'
    LIMIT 1
""").collect()

if not map_state_rows:
    raise ValueError(f"No stored state boundary contains the county for postcode {US_POSTCODE}.")

map_state_code = map_state_rows[0]['state_code']

state_hail_worst_day_df = sedona.sql(f"""
    SELECT 
        ZTIME,
        CAST(LON AS DOUBLE) as LON,
        CAST(LAT AS DOUBLE) as LAT,
        CAST(MAXSIZE AS DOUBLE) as MAXSIZE,
        CAST(SEVPROB AS DOUBLE) as SEVPROB,
        CAST(PROB AS DOUBLE) as PROB,
        geometry,
        DATE(ZTIME) as date
    FROM {CATALOG}.{DATABASE}.hail_state
    WHERE DATE(ZTIME) = '{worst_date}'
      AND state_code = '{map_state_code}'
""")

counties_df = sedona.table("wherobots_open_data.overture_maps_foundation.divisions_division_area")\
    .where(col("subtype") == "county")\
    .where(col("region") == map_state_code)\
    .select("geometry", col("names.primary").alias("county_name"))

county_name = sedona.sql(f"""
    SELECT da.names.primary as county_name
    FROM wherobots_open_data.overture_maps_foundation.divisions_division_area da,
         {CATALOG}.{DATABASE}.postcode_areas p
    WHERE da.subtype = 'county' 
      AND da.country = '{COUNTRY}'
      AND ST_Equals(da.geometry, p.area)
      AND p.postcode = '{US_POSTCODE}'
""").collect()

county_label = county_name[0]['county_name'] if county_name else f"County ({US_POSTCODE})"

selected_county_df = sedona.sql(f"""
    SELECT area as geometry, postcode
    FROM {CATALOG}.{DATABASE}.postcode_areas
    WHERE postcode = '{US_POSTCODE}'
""")

# Kepler.gl config: hail points coloured by MAXSIZE, faint county outlines, and the
# selected county highlighted in green.
map_config = {
    'version': 'v1',
    'config': {
        'visState': {
            'filters': [],
            'layers': [
                {
                    'id': 'hail_layer',
                    'type': 'geojson',
                    'config': {
                        'dataId': 'hail',
                        'label': f'Hail - {worst_date}',
                        'color': [255, 153, 31],
                        'columns': {'geojson': 'geometry'},
                        'isVisible': True,
                        'visConfig': {'opacity': 0.8, 'radius': 10, 'filled': True, 'stroked': True},
                    },
                    'visualChannels': {
                        'colorField': {'name': 'MAXSIZE', 'type': 'real'},
                        'colorScale': 'quantile',
                    },
                },
                {
                    'id': 'counties_layer',
                    'type': 'geojson',
                    'config': {
                        'dataId': 'counties',
                        'label': 'Counties',
                        'color': [34, 63, 154],
                        'columns': {'geojson': 'geometry'},
                        'isVisible': True,
                        'visConfig': {
                            'opacity': 0.1,
                            'strokeOpacity': 0.3,
                            'thickness': 0.5,
                            'filled': False,
                            'stroked': True,
                        },
                    },
                },
                {
                    'id': 'selected_county_layer',
                    'type': 'geojson',
                    'config': {
                        'dataId': 'selected_county',
                        'label': county_label,
                        'color': [0, 255, 100],
                        'columns': {'geojson': 'geometry'},
                        'isVisible': True,
                        'visConfig': {
                            'opacity': 0.2,
                            'strokeOpacity': 1.0,
                            'thickness': 3,
                            'strokeColor': [0, 255, 100],
                            'filled': True,
                            'stroked': True,
                        },
                    },
                },
            ],
        },
        'mapStyle': {'styleType': 'dark-matter'},
    },
}

worst_day_map = SedonaKepler.create_map(state_hail_worst_day_df, name="hail", config=map_config)
SedonaKepler.add_df(worst_day_map, counties_df, name="counties")
SedonaKepler.add_df(worst_day_map, selected_county_df, name="selected_county")

worst_day_map