In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 1: Network Access
-- Safe to re-run daily — CREATE OR REPLACE is idempotent.
-- ════════════════════════════════════════════════════════════════

-- Egress rule: NOAA CPC file server, ports 443 and 80.
CREATE OR REPLACE NETWORK RULE noaa_cpc_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = (
        'ftp.cpc.ncep.noaa.gov:443',
        'ftp.cpc.ncep.noaa.gov:80'
    );

-- Egress rule: USDA RMA public file server, ports 443 and 80.
CREATE OR REPLACE NETWORK RULE rma_ftp_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = (
        'pubfs-rma.fpac.usda.gov:443',
        'pubfs-rma.fpac.usda.gov:80'
    );

-- One integration bundles both rules; the Python procedures in this notebook
-- list it under EXTERNAL_ACCESS_INTEGRATIONS to reach those hosts.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION noaa_cpc_access
    ALLOWED_NETWORK_RULES = (
        noaa_cpc_rule,
        rma_ftp_rule
    )
    ENABLED = TRUE;

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 2: Create Tables
-- Safe to re-run daily — IF NOT EXISTS.
-- ════════════════════════════════════════════════════════════════

-- Daily gridded rainfall: one row per (date, grid-cell coordinate).
-- NOTE(review): the key omits file_type although the loaders delete/insert
-- per (observation_date, file_type) — confirm this is intended.
CREATE TABLE IF NOT EXISTS PRF_RAINFALL_REALTIME (
    observation_date DATE NOT NULL,    -- day the grid file refers to
    latitude FLOAT NOT NULL,           -- cell latitude, rounded to 3 decimals by the loaders
    longitude FLOAT NOT NULL,          -- cell longitude, rounded to 3 decimals by the loaders
    precip_mm FLOAT,                   -- loaders store raw file value / 10
    precip_in FLOAT,                   -- loaders store raw file value / 254
    gauge_count INT,                   -- value from the second grid of the source file
    file_type VARCHAR(10),             -- 'RT' for rows written by this notebook
    ingested_at TIMESTAMP_NTZ,         -- load time (loaders write a UTC timestamp)
    CONSTRAINT pk_prf_rain
        PRIMARY KEY (observation_date, latitude, longitude)
);

-- PRF grid lookup: grid id plus the centroid of each grid polygon.
CREATE TABLE IF NOT EXISTS PRF_GRID_REFERENCE (
    grid_id INT NOT NULL,              -- grid identifier taken from the shapefile
    center_lat FLOAT NOT NULL,         -- centroid latitude, rounded to 3 decimals
    center_lon FLOAT NOT NULL,         -- centroid longitude, rounded to 3 decimals
    state_fips VARCHAR(2),             -- currently loaded as NULL
    county_fips VARCHAR(3),            -- currently loaded as NULL
    area_acres FLOAT,                  -- currently loaded as NULL
    ingested_at TIMESTAMP_NTZ,         -- load time (loader writes a UTC timestamp)
    CONSTRAINT pk_prf_grid
        PRIMARY KEY (grid_id)
);

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 3: Email Notification Integration
-- Safe to re-run — CREATE OR REPLACE.
-- ════════════════════════════════════════════════════════════════

-- Email integration used by SP_INGEST_AND_NOTIFY; only the listed address
-- may be used as a recipient.
CREATE OR REPLACE NOTIFICATION INTEGRATION prf_email_alerts
    TYPE = EMAIL
    ENABLED = TRUE
    ALLOWED_RECIPIENTS = (
        'akeenan@texasfcs.com'
    );

-- Let ACCOUNTADMIN use the integration.
GRANT USAGE
    ON INTEGRATION prf_email_alerts
    TO ROLE ACCOUNTADMIN;

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 4: Grid Reference — only loads if table is empty.
-- First run: downloads shapefile, loads ~25K grids.
-- Subsequent runs: skips in <1 second.
-- ════════════════════════════════════════════════════════════════

CREATE OR REPLACE PROCEDURE SP_LOAD_GRID_REFERENCE_IF_EMPTY()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'geopandas', 'requests', 'shapely')
EXTERNAL_ACCESS_INTEGRATIONS = (noaa_cpc_access)
HANDLER = 'run'
AS
$$
import requests
import zipfile
import io
import os
import tempfile
import geopandas as gpd
from datetime import datetime

def run(session):
    """Populate PRF_GRID_REFERENCE from the RMA rainfall-index grid shapefile, once.

    When the table already holds rows nothing is downloaded. Otherwise the
    zipped shapefile is fetched, reprojected to EPSG:4326 if its CRS differs,
    and one row per feature is appended: the grid id plus the polygon centroid
    rounded to 3 decimals. STATE_FIPS, COUNTY_FIPS and AREA_ACRES are written
    as NULL.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.

    Returns:
        A status string starting with "SKIPPED:", "LOADED:" or "ERROR:".

    Raises:
        requests.HTTPError: if the download answers with an HTTP error status.
    """
    existing = session.sql("SELECT COUNT(*) AS cnt FROM PRF_GRID_REFERENCE").collect()[0]['CNT']
    if existing > 0:
        return f"SKIPPED: PRF_GRID_REFERENCE already has {existing} grids."

    url = "https://pubfs-rma.fpac.usda.gov/pub/Miscellaneous_Files/VI_RI_Data/rainfall_index_grids.zip"
    resp = requests.get(url, timeout=120)
    # Fail with a clear HTTP error instead of handing an error page to zipfile.
    resp.raise_for_status()

    # TemporaryDirectory removes the extracted files on exit, including the
    # early-return path below (mkdtemp left them behind).
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
            archive.extractall(tmpdir)

        # First shapefile found anywhere in the archive (extension matched
        # case-insensitively); None when the archive holds no shapefile.
        shp_path = next(
            (
                os.path.join(root, name)
                for root, _dirs, names in os.walk(tmpdir)
                for name in names
                if name.lower().endswith('.shp')
            ),
            None,
        )
        if shp_path is None:
            return "ERROR: No .shp file found in the downloaded archive."

        # The GeoDataFrame is fully read here, so it stays usable after the
        # temporary directory is deleted.
        gdf = gpd.read_file(shp_path)

    grid_id_col = next(
        (name for name in ['GRIDID', 'Grid_ID', 'GRID_ID', 'gridid', 'ID'] if name in gdf.columns),
        None,
    )
    if grid_id_col is None:
        return f"ERROR: Can't find grid ID column. Columns: {gdf.columns.tolist()}"

    if gdf.crs and gdf.crs.to_epsg() != 4326:
        gdf = gdf.to_crs(epsg=4326)

    # Compute the centroids once and reuse them for both coordinates.
    centroids = gdf.geometry.centroid
    center_lats = centroids.y.round(3)
    center_lons = centroids.x.round(3)

    now_str = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    rows = [
        (int(grid_id), float(lat), float(lon), None, None, None, now_str)
        for grid_id, lat, lon in zip(gdf[grid_id_col], center_lats, center_lons)
    ]
    if not rows:
        return "ERROR: Shapefile contains no features; nothing loaded."

    df = session.create_dataframe(
        rows,
        schema=['GRID_ID', 'CENTER_LAT', 'CENTER_LON',
                'STATE_FIPS', 'COUNTY_FIPS', 'AREA_ACRES', 'INGESTED_AT']
    )
    df.write.mode("append").save_as_table("PRF_GRID_REFERENCE")

    new_count = session.sql("SELECT COUNT(*) AS cnt FROM PRF_GRID_REFERENCE").collect()
    return f"LOADED: {new_count[0]['CNT']} grids into PRF_GRID_REFERENCE"
$$;

CALL SP_LOAD_GRID_REFERENCE_IF_EMPTY();

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 5: Automated RT Rainfall Ingestion (UNIT FIX 2026-02-19)
-- ════════════════════════════════════════════════════════════════

CREATE OR REPLACE PROCEDURE SP_INGEST_RT_RAINFALL()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'requests', 'numpy')
EXTERNAL_ACCESS_INTEGRATIONS = (noaa_cpc_access)
HANDLER = 'run'
AS
$$
import requests
import numpy as np
from datetime import datetime, timedelta

def run(session):
    """Load every missing CPC real-time (RT) daily rainfall grid in the backfill window.

    The window is the last MAX_BACKFILL days up to and including yesterday
    (UTC). Every date in that window that has no 'RT' rows yet is attempted,
    so a date skipped on an earlier run (timeout, HTTP error, truncated file)
    is retried automatically instead of being lost once a later date loads.

    Per date: download the binary grid, decode it, delete any existing 'RT'
    rows for that date and append the fresh rows.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.

    Returns:
        A pipe-delimited status string (parsed by SP_INGEST_AND_NOTIFY):
          "LOADED|<n loaded>|<n skipped>|<first>|<last>|<max date>|<total days>|<total rows>|<skipped list or none>"
          "ERRORS|0|<n skipped>|<skipped list>"
          "UP_TO_DATE|0|0|<last loaded date>"
    """
    NROWS       = 120
    NCOLS       = 300
    LON_START   = -129.875
    LAT_START   =   20.125
    STEP        =    0.25
    MISSING     = -999.0
    MAX_BACKFILL = 30
    BASE_URL    = "https://ftp.cpc.ncep.noaa.gov/precip/CPC_UNI_PRCP/GAUGE_CONUS"

    def _decode_rows(raw, obs_date_str, now_str):
        """Turn one RT payload (precip grid followed by gauge grid) into row tuples."""
        data = np.frombuffer(raw, dtype='<f4')
        precip_grid = data[:NROWS * NCOLS].reshape(NROWS, NCOLS)
        gauge_grid  = data[NROWS * NCOLS:].reshape(NROWS, NCOLS)

        rows = []
        for r in range(NROWS):
            lat = round(LAT_START + r * STEP, 3)
            for c in range(NCOLS):
                raw_val = float(precip_grid[r, c])
                if raw_val <= MISSING:
                    continue
                lon = round(LON_START + c * STEP, 3)

                # ─── UNIT FIX (2026-02-19) ──────────────────────────
                # NOAA CPC binary stores precipitation in TENTHS of mm
                # (0.1 mm units), NOT full millimeters.
                #   raw_val = 128.5  means 12.85 mm, NOT 128.5 mm
                #   PRECIP_MM = raw_val / 10.0
                #   PRECIP_IN = raw_val / 254.0  (0.1mm -> inches)
                # Previously divided by 25.4, producing 10x inflation.
                # ─────────────────────────────────────────────────────
                pmm = round(raw_val / 10.0, 2)
                pin = round(raw_val / 254.0, 4)
                gc  = int(gauge_grid[r, c])
                rows.append((obs_date_str, lat, lon, pmm, pin, gc, 'RT', now_str))
        return rows

    result = session.sql(
        "SELECT COALESCE(MAX(observation_date), '1948-01-01'::DATE) AS max_date "
        "FROM PRF_RAINFALL_REALTIME WHERE file_type = 'RT'"
    ).collect()

    last_loaded = datetime.strptime(str(result[0]['MAX_DATE']), '%Y-%m-%d').date()
    yesterday   = (datetime.utcnow() - timedelta(days=1)).date()
    earliest    = yesterday - timedelta(days=MAX_BACKFILL)

    # Dates inside the window that already have RT rows. Working from this set
    # rather than from MAX(observation_date) is what lets gaps be refilled.
    present = session.sql(
        "SELECT DISTINCT observation_date AS obs_date "
        "FROM PRF_RAINFALL_REALTIME "
        "WHERE file_type = 'RT' AND observation_date >= '" + earliest.strftime('%Y-%m-%d') + "'"
    ).collect()
    already_loaded = {str(row[0]) for row in present}

    candidates = []
    d = earliest
    while d <= yesterday:
        if d.strftime('%Y-%m-%d') not in already_loaded:
            candidates.append(d)
        d += timedelta(days=1)

    if not candidates:
        return "UP_TO_DATE|0|0|" + str(last_loaded)

    loaded_dates  = []
    skipped_dates = []

    for target_date in candidates:
        date_str = target_date.strftime('%Y%m%d')
        year_str = target_date.strftime('%Y')
        url = BASE_URL + "/RT/" + year_str + "/PRCP_CU_GAUGE_V1.0CONUS_0.25deg.lnx." + date_str + ".RT"

        try:
            resp = requests.get(url, timeout=30)
        except Exception:
            # Best effort: record the date and move on; it stays a candidate
            # for the next run because nothing was written for it.
            skipped_dates.append(date_str)
            continue

        if resp.status_code == 404:
            # Not published (yet): not an error. Keep going, because with gap
            # refilling an old missing file must not block newer dates.
            continue
        if resp.status_code != 200:
            skipped_dates.append(date_str)
            continue

        raw = resp.content
        # Two float32 grids (precipitation, gauge count) of NROWS x NCOLS cells.
        expected = NROWS * NCOLS * 4 * 2
        if len(raw) != expected:
            skipped_dates.append(date_str)
            continue

        obs_date_str = target_date.strftime('%Y-%m-%d')
        now_str = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        rows = _decode_rows(raw, obs_date_str, now_str)

        if not rows:
            skipped_dates.append(date_str)
            continue

        df = session.create_dataframe(
            rows,
            schema=['OBSERVATION_DATE', 'LATITUDE', 'LONGITUDE',
                    'PRECIP_MM', 'PRECIP_IN', 'GAUGE_COUNT',
                    'FILE_TYPE', 'INGESTED_AT']
        )

        # Replace rather than duplicate: clear this date's RT rows first.
        session.sql(
            "DELETE FROM PRF_RAINFALL_REALTIME "
            "WHERE observation_date = '" + obs_date_str + "' AND file_type = 'RT'"
        ).collect()

        df.write.mode("append").save_as_table("PRF_RAINFALL_REALTIME")
        loaded_dates.append(date_str)

    if loaded_dates:
        stats = session.sql(
            "SELECT MAX(observation_date) AS max_date, "
            "COUNT(DISTINCT observation_date) AS total_days, "
            "COUNT(*) AS total_rows "
            "FROM PRF_RAINFALL_REALTIME WHERE file_type = 'RT'"
        ).collect()
        return (
            "LOADED|" + str(len(loaded_dates)) + "|" + str(len(skipped_dates))
            + "|" + loaded_dates[0] + "|" + loaded_dates[-1]
            + "|" + str(stats[0]['MAX_DATE']) + "|" + str(stats[0]['TOTAL_DAYS']) + "|" + str(stats[0]['TOTAL_ROWS'])
            + "|" + (",".join(skipped_dates) if skipped_dates else "none")
        )
    elif skipped_dates:
        return "ERRORS|0|" + str(len(skipped_dates)) + "|" + ",".join(skipped_dates)
    else:
        return "UP_TO_DATE|0|0|" + str(last_loaded)
$$;

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 6: Email notification
-- Calls ingest proc, parses result, sends email to akeenan.
-- ════════════════════════════════════════════════════════════════

CREATE OR REPLACE PROCEDURE SP_INGEST_AND_NOTIFY()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
AS
$$
from datetime import datetime

def run(session):
    """Run SP_INGEST_RT_RAINFALL and email a summary of the outcome.

    The ingest procedure returns a pipe-delimited status string whose first
    field is LOADED, ERRORS or UP_TO_DATE. A matching email is sent for each.
    Two further cases are reported instead of being lost: the ingest call
    raising (a FAILED email is sent, then the error is re-raised) and a
    result that does not match any known format (an UNEXPECTED RESULT email
    carrying the raw string).

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.

    Returns:
        "Email sent to <recipient>: <subject>".

    Raises:
        Exception: whatever the ingest call raised, after the failure email
            has been sent.
    """
    RECIPIENT   = 'akeenan@texasfcs.com'
    INTEGRATION = 'prf_email_alerts'
    now = datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')

    def _send(subject, body):
        """Send one email; arguments are bound, so no SQL escaping is needed."""
        session.call('SYSTEM$SEND_EMAIL', INTEGRATION, RECIPIENT, subject, body)

    try:
        result = session.sql("CALL SP_INGEST_RT_RAINFALL()").collect()
    except Exception as exc:
        # Alert on a hard failure, then let it propagate so the caller still
        # sees the procedure as failed.
        _send(
            f"PRF Rain RT: FAILED -- {now}",
            f"PRF Rainfall RT Ingestion -- {now}\n\n"
            f"STATUS: INGESTION PROCEDURE FAILED\n\n"
            f"Error: {exc}\n"
        )
        raise

    raw = str(result[0][0])
    parts = raw.split('|')
    status = parts[0]

    if status == 'LOADED' and len(parts) >= 9:
        loaded_count  = parts[1]
        skipped_count = parts[2]
        first_date    = parts[3]
        last_date     = parts[4]
        max_date      = parts[5]
        total_days    = parts[6]
        total_rows    = parts[7]
        skipped_list  = parts[8]

        subject = f"PRF Rain RT: {loaded_count} day(s) loaded thru {last_date}"
        body = (
            f"PRF Rainfall RT Ingestion -- {now}\n\n"
            f"STATUS: NEW DATA LOADED\n\n"
            f"Dates loaded: {loaded_count}\n"
            f"  Range: {first_date} to {last_date}\n\n"
            f"Dates skipped: {skipped_count}\n"
            f"  {skipped_list}\n\n"
            f"Table snapshot:\n"
            f"  Latest RT date: {max_date}\n"
            f"  Total RT days:  {total_days}\n"
            f"  Total RT rows:  {total_rows}\n"
        )

    elif status == 'ERRORS' and len(parts) >= 4:
        skipped_list = parts[3]

        subject = f"PRF Rain RT: ERRORS -- {now}"
        body = (
            f"PRF Rainfall RT Ingestion -- {now}\n\n"
            f"STATUS: ERRORS -- NO DATA LOADED\n\n"
            f"No dates successfully loaded.\n"
            f"Dates with errors: {skipped_list}\n\n"
            f"Please check the notebook logs.\n"
        )

    elif status == 'UP_TO_DATE' and len(parts) >= 4:
        last_loaded = parts[3]

        subject = f"PRF Rain RT: Up to date -- {last_loaded}"
        body = (
            f"PRF Rainfall RT Ingestion -- {now}\n\n"
            f"STATUS: ALREADY UP TO DATE\n\n"
            f"No new dates to load.\n"
            f"Last loaded RT date: {last_loaded}\n"
            f"NOAA typically publishes with a 1-2 day lag.\n"
        )

    else:
        # Unknown status or too few fields: report the raw result rather than
        # mislabelling it as "up to date" or failing on a missing field.
        subject = f"PRF Rain RT: UNEXPECTED RESULT -- {now}"
        body = (
            f"PRF Rainfall RT Ingestion -- {now}\n\n"
            f"STATUS: UNEXPECTED RESULT\n\n"
            f"SP_INGEST_RT_RAINFALL returned:\n"
            f"  {raw}\n"
        )

    _send(subject, body)

    return f"Email sent to {RECIPIENT}: {subject}"
$$;

CALL SP_INGEST_AND_NOTIFY();

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 7: Health check — 10 most recent (observation_date, file_type) groups
-- ════════════════════════════════════════════════════════════════

-- Per (date, file type): cell count, precipitation summary in mm, and the
-- earliest ingest timestamp; newest dates first, capped at 10 groups.
SELECT
    rain.observation_date,
    rain.file_type,
    COUNT(*) AS cell_count,
    ROUND(AVG(rain.precip_mm), 2) AS avg_mm,
    ROUND(MAX(rain.precip_mm), 2) AS max_mm,
    ROUND(SUM(rain.precip_mm), 0) AS total_mm,
    MIN(rain.ingested_at) AS loaded_at
FROM PRF_RAINFALL_REALTIME AS rain
GROUP BY
    rain.observation_date,
    rain.file_type
ORDER BY
    rain.observation_date DESC
LIMIT 10;

In [None]:
-- ════════════════════════════════════════════════════════════════
-- CELL 8: Range-bound RT Reload
-- Usage: CALL SP_RELOAD_RT_RAINFALL('20260101', '20260219');
-- ════════════════════════════════════════════════════════════════

CREATE OR REPLACE PROCEDURE SP_RELOAD_RT_RAINFALL(START_DATE VARCHAR, END_DATE VARCHAR)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'requests', 'numpy')
EXTERNAL_ACCESS_INTEGRATIONS = (noaa_cpc_access)
HANDLER = 'run'
AS
$$
import requests
import numpy as np
from datetime import datetime, timedelta

def run(session, start_date: str, end_date: str):
    """Re-download and replace RT rainfall rows for each day of an inclusive range.

    Every day is handled independently: a day whose download fails, answers
    with a non-200 status, has an unexpected payload size or decodes to no
    rows is recorded as skipped and the loop moves on.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.
        start_date: first day to reload, formatted YYYYMMDD.
        end_date: last day to reload (inclusive), formatted YYYYMMDD.

    Returns:
        "RELOAD_OK|<n> loaded|<m> skipped|<first> thru <last>|skipped: <list or none>"
        when at least one day was loaded, otherwise
        "RELOAD_FAIL|0 loaded|<m> skipped|<comma-separated skipped days>".
    """
    GRID_ROWS, GRID_COLS = 120, 300
    CELL_COUNT    = GRID_ROWS * GRID_COLS
    PAYLOAD_BYTES = CELL_COUNT * 4 * 2      # two float32 grids back to back
    NO_DATA       = -999.0
    ARCHIVE_URL   = "https://ftp.cpc.ncep.noaa.gov/precip/CPC_UNI_PRCP/GAUGE_CONUS"

    # Cell coordinates depend only on the grid layout, so build them once.
    lats = [round(20.125 + row * 0.25, 3) for row in range(GRID_ROWS)]
    lons = [round(-129.875 + col * 0.25, 3) for col in range(GRID_COLS)]

    def _load_day(day):
        """Fetch, decode and store one day; return True when rows were written."""
        stamp = day.strftime('%Y%m%d')
        url = f"{ARCHIVE_URL}/RT/{day.strftime('%Y')}/PRCP_CU_GAUGE_V1.0CONUS_0.25deg.lnx.{stamp}.RT"

        try:
            reply = requests.get(url, timeout=30)
        except Exception:
            return False

        if reply.status_code != 200 or len(reply.content) != PAYLOAD_BYTES:
            return False

        values = np.frombuffer(reply.content, dtype='<f4')
        precip = values[:CELL_COUNT].reshape(GRID_ROWS, GRID_COLS).tolist()
        gauges = values[CELL_COUNT:].reshape(GRID_ROWS, GRID_COLS).tolist()

        iso_day = day.strftime('%Y-%m-%d')
        loaded_at = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')

        records = []
        for lat, precip_row, gauge_row in zip(lats, precip, gauges):
            for lon, tenths_mm, gauge in zip(lons, precip_row, gauge_row):
                if tenths_mm <= NO_DATA:
                    continue
                # UNIT FIX (2026-02-19): file values are tenths of a millimetre,
                # so /10 gives mm and /254 gives inches.
                records.append((
                    iso_day, lat, lon,
                    round(tenths_mm / 10.0, 2),
                    round(tenths_mm / 254.0, 4),
                    int(gauge), 'RT', loaded_at,
                ))

        if not records:
            return False

        frame = session.create_dataframe(
            records,
            schema=['OBSERVATION_DATE', 'LATITUDE', 'LONGITUDE',
                    'PRECIP_MM', 'PRECIP_IN', 'GAUGE_COUNT',
                    'FILE_TYPE', 'INGESTED_AT']
        )

        # Clear the day's existing RT rows before appending the fresh ones.
        session.sql(
            f"DELETE FROM PRF_RAINFALL_REALTIME WHERE observation_date = '{iso_day}' AND file_type = 'RT'"
        ).collect()
        frame.write.mode("append").save_as_table("PRF_RAINFALL_REALTIME")
        return True

    first_day = datetime.strptime(start_date, '%Y%m%d').date()
    final_day = datetime.strptime(end_date,   '%Y%m%d').date()

    loaded, skipped = [], []
    # An end before the start yields an empty range, i.e. nothing is attempted.
    for offset in range((final_day - first_day).days + 1):
        day = first_day + timedelta(days=offset)
        bucket = loaded if _load_day(day) else skipped
        bucket.append(day.strftime('%Y%m%d'))

    if not loaded:
        return f"RELOAD_FAIL|0 loaded|{len(skipped)} skipped|" + ",".join(skipped)

    skipped_text = ",".join(skipped) if skipped else "none"
    return (
        f"RELOAD_OK|{len(loaded)} loaded|{len(skipped)} skipped"
        f"|{loaded[0]} thru {loaded[-1]}"
        f"|skipped: {skipped_text}"
    )
$$;

-- Run it for whatever range you need:
CALL SP_RELOAD_RT_RAINFALL('20260101', '20260219');

In [None]:
-- Spot check: daily RT rainfall (inches) for one PRF grid over a date range.
-- Rainfall cells are matched to the grid by exact equality of the 3-decimal
-- latitude/longitude values stored in both tables.
SELECT 
    r.OBSERVATION_DATE,
    COUNT(*) AS row_count,
    SUM(r.PRECIP_IN) AS total_precip,
    MIN(r.PRECIP_IN) AS single_day_value
FROM CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_RAINFALL_REALTIME r
JOIN CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_GRID_REFERENCE g
  ON r.LATITUDE = g.CENTER_LAT
  AND r.LONGITUDE = g.CENTER_LON
-- PRF_GRID_REFERENCE (Cell 2) is keyed by GRID_ID; it has no GRIDCODE column.
WHERE g.GRID_ID = 8830
  AND r.OBSERVATION_DATE BETWEEN '2026-02-08' AND '2026-02-17'
GROUP BY r.OBSERVATION_DATE
ORDER BY r.OBSERVATION_DATE;