In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

In [0]:
%pip install lxml

In [0]:
import pandas as pd
import plotly.express as px

from modules.clients import SnotelClient, NWSClient
from modules.database import upsert_to_delta, get_table_watermark
from modules.transformer import generate_combined_forecast, transform_historical_data
from config.settings import *

In [0]:
# 1. Setup Parameters
run_mode = "full"  # options: ["incremental", "full"]

# Earliest date requested by a full run; also the incremental fallback when no
# watermark is returned for the observations table.
DEFAULT_START_DATE = "2025-12-01"
VALID_RUN_MODES = ("incremental", "full")

# Fail fast on a misspelled mode: the date logic below and the forecast write
# mode chosen in the forecast cell both treat anything other than "incremental"
# as a full run, so a typo would otherwise silently trigger a full reload.
if run_mode not in VALID_RUN_MODES:
    raise ValueError(f"run_mode must be one of {VALID_RUN_MODES}, got {run_mode!r}")

snotel = SnotelClient()
nws = NWSClient()

# 2. Determine Dates
today = pd.Timestamp.now()
if run_mode == "incremental":
    # Start from the watermark of the "date" column; use the default start date
    # when get_table_watermark returns nothing truthy.
    last_date = get_table_watermark(TBL_SNOW_OBS, "date")
    start_date = last_date.strftime('%Y-%m-%d') if last_date else DEFAULT_START_DATE
    # Incremental runs request through tomorrow (one day past the full-run end date).
    end_date = (today + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
else:
    start_date = DEFAULT_START_DATE
    end_date = today.strftime('%Y-%m-%d')

print(f"Running in {run_mode} mode with dates: {start_date} to {end_date}")

In [0]:
# 3. Process Observations
stations = spark.table(TBL_WEATHER_STATIONS).toPandas()

# Fetch the history for every station over the selected date window, skipping
# stations for which the client returns None.
all_obs = []
for _, row in stations.iterrows():
    data = snotel.fetch_historical_data(row['site_id'], row['ntwk'], start_date, end_date)
    if data is not None:
        all_obs.append(data)

if all_obs:
    # Combine raw dataframes
    obs_df = pd.concat(all_obs)

    # Transform raw columns (Station Id -> site_id, etc.) to match the Delta schema;
    # this addresses the [DELTA_MERGE_UNRESOLVED_EXPRESSION] error.
    clean_obs_df = transform_historical_data(obs_df)

    # 'date_hr' and 'site_id' are the join keys for the write.
    # NOTE(review): this call passes run_mode ("incremental"/"full") as `mode`, while the
    # forecast writes pass "upsert"/"overwrite" -- confirm upsert_to_delta accepts both sets.
    upsert_to_delta(clean_obs_df, TBL_SNOW_OBS, ["date_hr", "site_id"], mode=run_mode)
else:
    # Make the no-op visible instead of finishing the cell silently.
    print(f"No observations returned for {start_date} to {end_date}; {TBL_SNOW_OBS} was not written.")

In [0]:
# 4. Process Forecasts
all_hourly = []
all_snow_grid = []

# Determine write mode for forecasts:
# incremental runs write with "upsert"; any other run mode writes with "overwrite".
forecast_write_mode = "upsert" if run_mode == "incremental" else "overwrite"

for _, row in stations.iterrows():
    # Fetch data
    hourly_data = nws.get_hourly_forecast(row['lat'], row['lon'])
    snow_data = nws.get_snow_grid_data(row['lat'], row['lon'])

    # Tag each result with its station, guarding against None the same way the
    # observation loop does, so one station without data does not abort the load.
    # NOTE(review): assumes the NWS client may return None like the SNOTEL client -- confirm.
    if hourly_data is not None:
        all_hourly.append(hourly_data.assign(site_id=row['site_id']))
    if snow_data is not None:
        all_snow_grid.append(snow_data.assign(site_id=row['site_id']))

# Write each forecast set only when at least one station produced data.
if all_hourly:
    upsert_to_delta(pd.concat(all_hourly), TBL_WEATHER_FCST, ["startTime", "site_id"], mode=forecast_write_mode)

if all_snow_grid:
    upsert_to_delta(pd.concat(all_snow_grid), TBL_SNOW_FCST, ["snow_start", "site_id"], mode=forecast_write_mode)

In [0]:
# 5. Build the combined forecast
# Called with no arguments, so the tables it reads and writes are decided inside
# modules.transformer and are not visible in this notebook.
# NOTE(review): presumably refreshes gold.weather.combined_forecast, which the
# SQL cells below query -- confirm.
generate_combined_forecast()

In [0]:
%sql
-- Spot check: every combined-forecast row for a single station.
-- To inspect raw observations instead, run the same filter against
-- bronze.raw.snowfall_observations.
SELECT *
FROM gold.weather.combined_forecast
WHERE site_id = '505';

In [0]:
%sql
-- Latest snow depth per station: rank each site's observations by date_hr
-- (newest first) and keep only the top-ranked row.
SELECT
  site_id,
  snow_depth_in
FROM bronze.raw.snowfall_observations
QUALIFY ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY date_hr DESC) = 1

In [0]:
%sql
-- Forecast snow accumulation per station, next to the station's coordinates and
-- its most recent observed snow depth. The result feeds the map further down.
WITH latest_depth AS (
  -- Newest observation row (by date_hr) for each site.
  SELECT
    site_id,
    snow_depth_in
  FROM bronze.raw.snowfall_observations
  QUALIFY ROW_NUMBER() OVER (PARTITION BY site_id ORDER BY date_hr DESC) = 1
)
SELECT
  f.site_name,
  s.lat,
  s.lon,
  latest_depth.snow_depth_in,
  SUM(f.total_snow_in) AS snow_acc
FROM gold.weather.combined_forecast AS f
JOIN bronze.raw.weather_stations AS s
  ON s.site_id = f.site_id
-- Observation site ids are cast to string to compare with the forecast's site_id.
LEFT JOIN latest_depth
  ON CAST(latest_depth.site_id AS STRING) = f.site_id
WHERE
  -- NOTE(review): fixed cutoff date; it must be edited by hand for later runs.
  snow_end < '2026-01-11'
GROUP BY ALL
ORDER BY snow_acc DESC

In [0]:
# On Databricks, `_sqldf` holds the result of the most recently run %sql cell and is
# replaced whenever another %sql cell runs, so this cell must be executed right after
# the per-station accumulation query above.
plot_df = _sqldf.toPandas()

In [0]:
# Print column dtypes and non-null counts to sanity-check the frame before plotting.
plot_df.info()

In [0]:
# Prepare the frame for Plotly Express:
# - rows without coordinates cannot be placed on the map, and a NaN in the column
#   mapped to `size` makes Plotly raise ValueError, so incomplete rows are dropped;
# - the measure is cast to float so it is treated as a continuous value even if the
#   SQL sum arrived as a non-float dtype (e.g. Decimal objects from a Spark DECIMAL).
map_df = (
    plot_df
    .dropna(subset=["lat", "lon", "snow_acc"])
    .astype({"snow_acc": float})
)

# Create the scatter mapbox visual.
# NOTE(review): newer Plotly releases deprecate scatter_mapbox in favour of scatter_map
# (with `map_style` instead of `mapbox_style`); switch once the cluster's Plotly
# version is confirmed to support it.
fig = px.scatter_mapbox(
    map_df,
    lat="lat",
    lon="lon",
    color="snow_acc",              # Markers colored by snow accumulation
    size="snow_acc",               # Marker size relative to snow amount
    hover_name="site_name",        # Header for the tooltip
    hover_data={                   # Customize tooltip content
        "lat": False,              # Hide lat/lon in tooltip
        "lon": False,
        "snow_acc": ":.2f"         # Show snow_acc with 2 decimal places
    },
    zoom=6,                        # Starting zoom level for Colorado/mountain regions
    center={"lat": 39.0, "lon": -106.0}, # Centered near CO mountains
    height=600,
    title="Forecasted Snow Accumulation by Station"
)

# Update layout to use an open-source map style
fig.update_layout(
    mapbox_style="carto-positron",
    margin={"r":0,"t":40,"l":0,"b":0}
)

# Render in Databricks
fig.show()