# PRF Premium Rates Pipeline — User Guide

## What This Pipeline Does

This notebook pulls **PRF premium rates** from the USDA RMA public API and loads them into Snowflake. Premium rates represent how risky a given grid/interval is — they're one of the key inputs to the Champion vs. Challenger Streamlit app.

**Destination Table:** `CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES`

---

## Quick Start (Copy & Paste)

To load all Texas grids for 2025 with the standard settings used throughout this guide:

```sql
CALL RUN_PRF_PREMIUM_PIPELINE(
    ARRAY_CONSTRUCT('Texas'),
    NULL,
    ARRAY_CONSTRUCT(2025),
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    'Grazing', '007', '997', '997',
    '[50,0,50,0,0,0,0,0,0,0,0]',
    5, 60, 'OVERWRITE'
);
```

---

## Notebook Cells (Run in Order)

| Cell | Name | What It Does |
|------|------|-------------|
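| 1 | **SETUP** | Sets Snowflake context, creates the network rule and external access integration, (re)creates the `PRF_PIPELINE_LOG` table, and creates or updates the target table per `TABLE_MODE` |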
| 2 | **PROCEDURE** | Creates the `RUN_PRF_PREMIUM_PIPELINE` stored procedure |
| 3 | **EXECUTE** | Where you set your parameters and call the procedure |
| 4 | **AUDIT** | Queries to verify your data loaded correctly |

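> **First time?** Run all 4 cells in order.
>
> **Refreshing data?** Skip to Cell 3 (EXECUTE); the procedure already exists.
>
> The notebook also contains an optional King Ranch example cell (between EXECUTE and AUDIT) that runs a `MERGE` when executed, and a final `DROP PROCEDURE` cleanup cell. Skip both unless you need them.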

---

## Parameters (Cell 3)

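These are the 13 parameters you can configure in the EXECUTE cell. The 13th, `GRID_BATCH`, is optional and defaults to `NULL`, so the 12-argument calls in this guide still work: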

### Data Selection

| # | Parameter | What It Controls | Example |
|---|-----------|-----------------|---------|
| 1 | `STATE_NAMES` | Which states to load (auto-resolves grids) | `ARRAY_CONSTRUCT('Texas')` |
| 2 | `GRID_IDS` | Specific grids (use instead of states) | `ARRAY_CONSTRUCT(9128, 9129)` |
| 3 | `YEARS` | Policy years to fetch | `ARRAY_CONSTRUCT(2025)` |
| 4 | `COVERAGE_LEVELS` | Coverage percentages | `ARRAY_CONSTRUCT(70, 75, 80, 85, 90)` |

> **Rule:** Use `STATE_NAMES` OR `GRID_IDS`, and set the other one to `NULL`. If both are set, `GRID_IDS` wins and `STATE_NAMES` is ignored.
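
The following is a rough sketch of the precedence rule in plain Python. It also shows how the procedure's STEP 1 skips non-numeric IDs. `resolve_grids` and the `state_to_grids` dictionary are hypothetical stand-ins. The real procedure queries `GRID_STATE_PRF`, also matches state abbreviations, and returns an error string where this sketch raises.

```python
from typing import Dict, List, Optional, Tuple

def resolve_grids(state_names: Optional[List[str]],
                  grid_ids: Optional[list],
                  state_to_grids: Dict[str, List[int]]) -> Tuple[List[int], List[str]]:
    """Return (grids to process, skipped non-numeric IDs).

    A non-empty grid_ids wins; otherwise state names are matched
    case-insensitively against the hypothetical state_to_grids lookup.
    """
    if grid_ids:
        valid: List[int] = []
        skipped: List[str] = []
        for g in grid_ids:
            try:
                valid.append(int(g))
            except (ValueError, TypeError):
                skipped.append(str(g))  # non-numeric IDs are skipped, not fatal
        return valid, skipped
    if state_names:
        wanted = {s.strip().upper() for s in state_names}
        grids = {g for state, ids in state_to_grids.items()
                 if state.upper() in wanted for g in ids}
        return sorted(grids), []
    # The procedure returns an error string here instead of raising
    raise ValueError("Must provide either STATE_NAMES or GRID_IDS.")

lookup = {"Texas": [9128, 9129], "Oklahoma": [8828]}
print(resolve_grids(["texas"], None, lookup))          # ([9128, 9129], [])
print(resolve_grids(["Texas"], [7928, "abc"], lookup))  # ([7928], ['abc'])
```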

### Policy Settings

| # | Parameter | Default | Notes |
|---|-----------|---------|-------|
| 5 | `INTENDED_USE` | `'Grazing'` | Friendly label (`'Grazing'` or `'Haying'`) |
| 6 | `INTENDED_USE_CODE` | `'007'` | `'007'` = Grazing, `'006'` = Haying |
| 7 | `IRRIGATION_PRACTICE` | `'997'` | `'997'` = Non-irrigated (standard) |
| 8 | `ORGANIC_PRACTICE` | `'997'` | `'997'` = Not specified (standard) |
| 9 | `INTERVAL_PERCENT` | `'[50,0,50,0,0,0,0,0,0,0,0]'` | Allocation across 11 intervals |

> For most PRF use cases, parameters 7–9 stay at their defaults.
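
Below is a minimal sketch for sanity-checking an `INTERVAL_PERCENT` string before you pass it in. It assumes the array positions follow the procedure's `INTERVAL_MAP` order: code 625 = Jan-Feb through 635 = Nov-Dec. That ordering is an assumption, not something confirmed here from the RMA API. `describe_allocation` is a hypothetical helper.

```python
import json

# Interval codes and names as listed in the procedure's INTERVAL_MAP
INTERVAL_MAP = {
    "625": "Jan-Feb", "626": "Feb-Mar", "627": "Mar-Apr", "628": "Apr-May",
    "629": "May-Jun", "630": "Jun-Jul", "631": "Jul-Aug", "632": "Aug-Sep",
    "633": "Sep-Oct", "634": "Oct-Nov", "635": "Nov-Dec",
}

def describe_allocation(interval_percent: str) -> dict:
    """Map a JSON INTERVAL_PERCENT string to {interval name: percent}.

    Assumes position i corresponds to interval code 625 + i.
    Intervals with a 0 allocation are left out.
    """
    values = json.loads(interval_percent)
    if len(values) != 11:
        raise ValueError(f"Expected 11 values, got {len(values)}")
    return {INTERVAL_MAP[str(625 + i)]: pct for i, pct in enumerate(values) if pct}

alloc = describe_allocation("[50,0,50,0,0,0,0,0,0,0,0]")
print(alloc)                # {'Jan-Feb': 50, 'Mar-Apr': 50}
print(sum(alloc.values()))  # 100
```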

### Performance & Mode

| # | Parameter | Default | Notes |
|---|-----------|---------|-------|
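| 10 | `MAX_CONCURRENT` | `10` | Parallel API threads (5–10 recommended) |
| 11 | `REQUEST_TIMEOUT` | `180` | Seconds per API call before timeout |
| 12 | `MODE` | `'OVERWRITE'` | How data is written (see below) |
| 13 | `GRID_BATCH` | `NULL` | Optional `'N of M'` string (e.g. `'1 of 4'`) that processes only the Nth of M roughly equal slices of the sorted grid list, so large loads can be split across cells. Use with `MERGE` or `APPEND`; `OVERWRITE` replaces the whole table on every batch |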
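
The procedure's optional `GRID_BATCH` argument takes an `'N of M'` string. The sketch below reproduces its slicing logic outside Snowflake, so you can preview which grids a batch will cover. `parse_batch` and `batch_slice` are hypothetical helper names.

```python
import math
from typing import List, Tuple

def parse_batch(grid_batch: str) -> Tuple[int, int]:
    """Parse 'N of M' the way the procedure does (spaces and case ignored)."""
    parts = grid_batch.lower().replace(" ", "").split("of")
    batch_num, batch_total = int(parts[0]), int(parts[1])
    if batch_num < 1 or batch_num > batch_total:
        raise ValueError(f"Batch number must be between 1 and {batch_total}")
    return batch_num, batch_total

def batch_slice(grids: List[int], grid_batch: str) -> List[int]:
    """Grids handled by one batch: sort, then take a ceil-sized chunk."""
    batch_num, batch_total = parse_batch(grid_batch)
    ordered = sorted(grids)
    chunk = math.ceil(len(ordered) / batch_total)
    start = (batch_num - 1) * chunk
    return ordered[start:start + chunk]

grids = list(range(1, 11))  # 10 hypothetical grid IDs
for b in ("1 of 3", "2 of 3", "3 of 3"):
    print(b, batch_slice(grids, b))
# 1 of 3 [1, 2, 3, 4]
# 2 of 3 [5, 6, 7, 8]
# 3 of 3 [9, 10]
```

The chunk size is rounded up, so a late batch can come back empty when there are fewer grids than batches. For example, 5 grids split as `'4 of 4'` gives an empty batch.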

---

## Load Modes Explained

| Mode | What It Does | When to Use |
|------|-------------|-------------|
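| `OVERWRITE` | Replaces the entire table with this run's rows, including states/grids that are not part of this run | First load or full refresh |
| `APPEND` | Adds rows without checking for duplicates | Use carefully; can create duplicates |
| `MERGE` | Upserts on `GRID_ID` + `YEAR` + `COVERAGE_LEVEL` + `INDEX_INTERVAL_CODE` + `INTENDED_USE`: updates matching rows, inserts new ones | Adding new states/grids to existing data, or re-running failed grids |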

**Recommended workflow:**
1. **First load →** `OVERWRITE`
2. **Adding more states later →** `MERGE`
3. **Annual refresh (new year's rates) →** `MERGE`

---

## Common Scenarios

### Load all grids for one state
```sql
CALL RUN_PRF_PREMIUM_PIPELINE(
    ARRAY_CONSTRUCT('Texas'), NULL,
    ARRAY_CONSTRUCT(2025),
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    'Grazing', '007', '997', '997',
    '[50,0,50,0,0,0,0,0,0,0,0]',
    5, 60, 'OVERWRITE'
);
```

### Load multiple states at once
```sql
CALL RUN_PRF_PREMIUM_PIPELINE(
    ARRAY_CONSTRUCT('Texas', 'Oklahoma', 'New Mexico'), NULL,
    ARRAY_CONSTRUCT(2025),
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    'Grazing', '007', '997', '997',
    '[50,0,50,0,0,0,0,0,0,0,0]',
    10, 60, 'OVERWRITE'
);
```
> Bump `MAX_CONCURRENT` to 10 for multi-state loads.

### Load only King Ranch grids
```sql
CALL RUN_PRF_PREMIUM_PIPELINE(
    NULL,
    ARRAY_CONSTRUCT(9128, 9129, 9130, 9131, 8828, 8829, 8830, 8831,
                    8528, 8529, 8530, 8531, 8228, 8229, 8230, 8231,
                    7928, 7929, 7930, 7931),
    ARRAY_CONSTRUCT(2025),
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    'Grazing', '007', '997', '997',
    '[50,0,50,0,0,0,0,0,0,0,0]',
    5, 60, 'MERGE'
);
```

### Add a new state without wiping existing data
```sql
CALL RUN_PRF_PREMIUM_PIPELINE(
    ARRAY_CONSTRUCT('Colorado'), NULL,
    ARRAY_CONSTRUCT(2025),
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    'Grazing', '007', '997', '997',
    '[50,0,50,0,0,0,0,0,0,0,0]',
    5, 60, 'MERGE'
);
```

---

## Table Creation Mode (Cell 1)

Cell 1 has a toggle at the top:

```sql
SET TABLE_MODE = 1;
```

| Mode | Behavior |
|------|----------|
| `1` | **Clean slate** — drops and recreates the table (use for first setup or schema changes) |
| `2` | **Preserve data** — keeps existing rows, adds any new columns |

> After your first run, switch to `TABLE_MODE = 2` so you don't accidentally wipe data.

---

## How to Verify Your Data (Cell 4)

Cell 4 runs three audit queries:

1. **Summary by state**: shows grid counts, year counts, and total rows per state
2. **Completeness check**: flags any grid/year combination that doesn't have the expected 55 rows (11 intervals × 5 coverage levels, for one intended use). If you loaded a different number of coverage levels, adjust the 55. Anything that shows up here usually means that grid hit API errors; re-run it with `MERGE` mode.
3. **Sample records**: an eyeball check on the actual data
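
The completeness logic can be sketched in Python as follows. The hypothetical `incomplete_groups` helper groups by grid, year, and `INTENDED_USE`, so Grazing and Haying rows for the same grid are counted separately. It also takes the number of coverage levels you loaded instead of hard-coding 55.

```python
from collections import Counter
from typing import Iterable, List, Tuple

INTERVALS_PER_CALL = 11  # one row per bi-monthly interval per grid/year/coverage level

Key = Tuple[int, int, str]  # (GRID_ID, YEAR, INTENDED_USE)

def incomplete_groups(rows: Iterable[Key], n_coverage_levels: int = 5) -> List[Tuple[Key, int]]:
    """Return (key, row_count) for every group whose count != 11 x coverage levels."""
    expected = INTERVALS_PER_CALL * n_coverage_levels
    counts = Counter(rows)
    return sorted((key, n) for key, n in counts.items() if n != expected)

full = [(9128, 2025, "Grazing")] * 55
short = [(9129, 2025, "Grazing")] * 44  # e.g. one coverage level's API call failed
print(incomplete_groups(full + short))  # [((9129, 2025, 'Grazing'), 44)]
```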

---

## Error Handling

The pipeline won't crash if some grids fail. Here's what happens:

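- **No state name matches →** Returns an error immediately, nothing loads. A bad name mixed in with valid ones is silently ignored, so check the `RESOLVE` log line for the states actually found
- **Grid missing from MAP_YTD →** Warns you, skips that grid, processes the rest
- **API call times out or can't connect →** Retried once after a 2-second pause; logged as an error only if the retry also fails
- **Any other API failure for a grid →** Logs the error, keeps going with other grids
- **API returns empty data →** Logged as an error, no rows written for that combo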

At the end you get a summary like:
```
COMPLETE | Mode: OVERWRITE | 487 grids | 26785 rows | 3 errors | 0 retries recovered | 142.5s
```

If you have errors, just re-run those specific grids with `MERGE` mode.
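
The procedure retries timeouts and connection errors once and records every other failure without stopping. That follows a common pattern, sketched here with a thread pool. `fetch_all` and `flaky_fetch` are hypothetical. The built-in `TimeoutError`/`ConnectionError` stand in for the `requests` exceptions the procedure actually catches. Unlike the procedure, which sleeps inside each failed call, this version pauses once before the retry pass.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple

# Errors treated as transient (worth one retry); stand-ins for the
# requests Timeout/ConnectionError exceptions the procedure catches.
TRANSIENT = (TimeoutError, ConnectionError)

def fetch_all(calls: List[int],
              fetch: Callable[[int], List[tuple]],
              max_workers: int = 5,
              backoff_s: float = 0.0) -> Tuple[List[tuple], List[str], int]:
    """Run every call, retry transient failures once, never abort the run.

    Returns (rows, error messages, number of calls recovered on retry).
    """
    rows: List[tuple] = []
    errors: List[str] = []
    retry_queue: List[int] = []

    def attempt(call):
        # Return the exception instead of raising it, so one bad call
        # cannot stop the others.
        try:
            return call, fetch(call), None
        except Exception as e:
            return call, None, e

    # First pass: transient failures are queued, anything else is recorded
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for call, result, err in pool.map(attempt, calls):
            if err is None:
                rows.extend(result)
            elif isinstance(err, TRANSIENT):
                retry_queue.append(call)
            else:
                errors.append(f"call {call}: {err!r}")

    # Second pass: one retry per queued call, after a single pause
    recovered = 0
    if retry_queue:
        time.sleep(backoff_s)
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            for call, result, err in pool.map(attempt, retry_queue):
                if err is None:
                    rows.extend(result)
                    recovered += 1
                else:
                    errors.append(f"call {call} (retry): {err!r}")
    return rows, errors, recovered

# Simulated API: call 2 times out once, call 3 always fails
attempts = {}
def flaky_fetch(call):
    attempts[call] = attempts.get(call, 0) + 1
    if call == 2 and attempts[call] == 1:
        raise TimeoutError("simulated timeout")
    if call == 3:
        raise ValueError("simulated bad payload")
    return [(call, "625", 0.1)]

rows, errors, recovered = fetch_all([1, 2, 3], flaky_fetch)
print(len(rows), len(errors), recovered)  # 2 1 1
```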

---

## Performance Tips

| Scenario | Grids | Est. API Calls | Suggested MAX_CONCURRENT | Est. Time |
|----------|-------|---------------|--------------------------|-----------|
| King Ranch only | 20 | 100 | 5 | ~30s |
| Single state (small) | ~50 | 250 | 5 | ~1 min |
| Texas (all grids) | ~500 | 2,500 | 10 | ~5–10 min |
| Multi-state | ~1,000+ | 5,000+ | 10 | ~15–20 min |

> **Formula:** API calls = grids × years × coverage levels (e.g., 500 × 1 × 5 = 2,500)
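
Combine the formula with the 11 interval rows each call returns (the API's `Total` row is dropped). That gives a quick way to predict both call volume and the row count you should see in the final summary. `estimate_load` is a hypothetical helper.

```python
INTERVALS_PER_CALL = 11  # rows kept per API call (the "Total" row is dropped)

def estimate_load(grids: int, years: int = 1, coverage_levels: int = 5) -> dict:
    """API calls = grids x years x coverage levels; rows = calls x 11."""
    calls = grids * years * coverage_levels
    return {"api_calls": calls, "expected_rows": calls * INTERVALS_PER_CALL}

print(estimate_load(500))  # {'api_calls': 2500, 'expected_rows': 27500}
print(estimate_load(20))   # {'api_calls': 100, 'expected_rows': 1100}
```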

---

## Where This Fits

```
┌─────────────────────────────────────────────┐
│           Preparation Pipelines             │
├─────────────────────────────────────────────┤
│  1. FTP Ingestion (ADM source files)        │
│  2. Rain Index    → RAIN_INDEX_PLATINUM_ENH │
│  3. Subsidies     → SUBSIDYPERCENT_PLATINUM │
│  4. Premiums      → PRF_PREMIUM_RATES       │ ◄── THIS PIPELINE
│  5. County Base   → PRF_COUNTY_BASE_VALUES  │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
        Streamlit PRF Application
     (Champion vs Challenger Engine)
```

Premium rates feed directly into the app's ROI calculations:
**Protection × Premium Rate = Total Premium → minus Subsidy = Producer Premium**
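
Here is a worked example of that arithmetic with hypothetical numbers. It assumes the premium rate and the subsidy are fractions (0.20 = 20%), and that the subsidy is a percentage of the total premium, as the `SUBSIDYPERCENT_PLATINUM` table name suggests. Check the units stored in `PREMIUMRATE` before reusing it.

```python
def producer_premium(protection: float, premium_rate: float, subsidy_pct: float) -> dict:
    """Protection x Premium Rate = Total Premium; minus Subsidy = Producer Premium.

    Assumes premium_rate and subsidy_pct are fractions and that the subsidy
    is computed as a share of the total premium.
    """
    total = protection * premium_rate
    subsidy = total * subsidy_pct
    return {
        "total_premium": round(total, 2),
        "subsidy": round(subsidy, 2),
        "producer_premium": round(total - subsidy, 2),
    }

print(producer_premium(10_000, 0.20, 0.51))
# {'total_premium': 2000.0, 'subsidy': 1020.0, 'producer_premium': 980.0}
```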

In [None]:
-- ====================================================================
-- SETUP: Context, Network Rules, Target Table, Log Table
-- ====================================================================
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE COMPUTE_WH;
USE DATABASE CAPITAL_MARKETS_SANDBOX;
USE SCHEMA PUBLIC;

-- ====================================================================
-- PARAMETER: TABLE CREATION MODE
--   1 = CREATE OR REPLACE (clean slate, drops existing data)
--   2 = ALTER TABLE (preserve existing data, add new columns)
-- ====================================================================
SET TABLE_MODE = 1;

-- Network Rule (shared with Rain Index pipeline)
CREATE OR REPLACE NETWORK RULE usda_api_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('public-rma.fpac.usda.gov');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION usda_api_integration
    ALLOWED_NETWORK_RULES = (usda_api_network_rule)
    ENABLED = TRUE;

-- Pipeline Log Table (always replace — it's just a monitoring tool)
CREATE OR REPLACE TABLE CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PIPELINE_LOG (
    LOG_TIME        TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    STEP            VARCHAR,
    MESSAGE         VARCHAR
);

-- Target Table
EXECUTE IMMEDIATE $$
BEGIN
    IF ($TABLE_MODE = 1) THEN
        CREATE OR REPLACE TABLE CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES (
            GRID_ID                 NUMBER,
            STATE_CODE              VARCHAR,
            COUNTY_CODE             VARCHAR,
            INTENDED_USE            VARCHAR,
            COVERAGE_LEVEL          VARCHAR,
            YEAR                    NUMBER,
            INDEX_INTERVAL_CODE     VARCHAR,
            INDEX_INTERVAL_NAME     VARCHAR,
            PREMIUMRATE             FLOAT,
            INSERT_TIMESTAMP        TIMESTAMP_NTZ
        );
        RETURN 'Mode 1: Table replaced (clean slate).';
    ELSE
        CREATE TABLE IF NOT EXISTS CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES (
            GRID_ID                 NUMBER,
            INTENDED_USE            VARCHAR,
            COVERAGE_LEVEL          VARCHAR,
            YEAR                    NUMBER,
            INDEX_INTERVAL_CODE     VARCHAR,
            INDEX_INTERVAL_NAME     VARCHAR,
            PREMIUMRATE             FLOAT,
            INSERT_TIMESTAMP        TIMESTAMP_NTZ
        );
        ALTER TABLE CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES
            ADD COLUMN IF NOT EXISTS STATE_CODE VARCHAR;
        ALTER TABLE CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES
            ADD COLUMN IF NOT EXISTS COUNTY_CODE VARCHAR;
        RETURN 'Mode 2: Existing data preserved, new columns added.';
    END IF;
END;
$$;

In [None]:
-- ====================================================================
-- MASTER PROCEDURE: RUN_PRF_PREMIUM_PIPELINE (V2)
-- ====================================================================
-- Upgrades from V1:
--   - requests.Session() for connection pooling (matches Rain Index)
--   - GRID_BATCH parameter for parallel cell execution
--   - One retry (after a 2s pause) for calls that time out or fail to connect
--   - Longer default timeout (180s, matches Rain Index)
--   - Real-time progress logging to PRF_PIPELINE_LOG
-- ====================================================================

CREATE OR REPLACE PROCEDURE CAPITAL_MARKETS_SANDBOX.PUBLIC.RUN_PRF_PREMIUM_PIPELINE(
    STATE_NAMES             ARRAY    DEFAULT NULL,
    GRID_IDS                ARRAY    DEFAULT NULL,
    YEARS                   ARRAY    DEFAULT ARRAY_CONSTRUCT(2025),
    COVERAGE_LEVELS         ARRAY    DEFAULT ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    INTENDED_USE            VARCHAR  DEFAULT 'Grazing',
    INTENDED_USE_CODE       VARCHAR  DEFAULT '007',
    IRRIGATION_PRACTICE     VARCHAR  DEFAULT '997',
    ORGANIC_PRACTICE        VARCHAR  DEFAULT '997',
    INTERVAL_PERCENT        VARCHAR  DEFAULT '[50,0,50,0,0,0,0,0,0,0,0]',
    MAX_CONCURRENT          INT      DEFAULT 10,
    REQUEST_TIMEOUT         INT      DEFAULT 180,
    MODE                    VARCHAR  DEFAULT 'OVERWRITE',
    GRID_BATCH              VARCHAR  DEFAULT NULL
)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python', 'requests', 'pandas')
HANDLER = 'main'
EXTERNAL_ACCESS_INTEGRATIONS = (usda_api_integration)
AS
$$
import requests
import pandas as pd
import datetime
import snowflake.snowpark as snowpark
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
import math

def main(session: snowpark.Session,
         state_names: list = None,
         grid_ids: list = None,
         years: list = None,
         coverage_levels: list = None,
         intended_use: str = 'Grazing',
         intended_use_code: str = '007',
         irrigation_practice: str = '997',
         organic_practice: str = '997',
         interval_percent: str = '[50,0,50,0,0,0,0,0,0,0,0]',
         max_concurrent: int = 10,
         request_timeout: int = 180,
         mode: str = 'OVERWRITE',
         grid_batch: str = None) -> str:

    pipeline_start = datetime.datetime.now()

    LOG_TABLE = "CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PIPELINE_LOG"

    # ================================================================
    # HELPER: LOG TO TABLE
    # ================================================================
    def log(step, message):
        """Write a progress row to PRF_PIPELINE_LOG."""
        safe_msg = message.replace("'", "''")
        safe_step = step.replace("'", "''")
        try:
            session.sql(f"""
                INSERT INTO {LOG_TABLE} (LOG_TIME, STEP, MESSAGE)
                VALUES (CURRENT_TIMESTAMP(), '{safe_step}', '{safe_msg}')
            """).collect()
        except Exception:
            pass  # logging must never break the pipeline
        print(f"  [{step}] {message}")

    # ================================================================
    # VALIDATE MODE
    # ================================================================
    mode = mode.upper()
    if mode not in ['OVERWRITE', 'APPEND', 'MERGE']:
        return f"ERROR: Invalid MODE '{mode}'. Must be OVERWRITE, APPEND, or MERGE."

    # ================================================================
    # DEFAULTS
    # ================================================================
    if years is None:
        years = [2025]
    if coverage_levels is None:
        coverage_levels = [70, 75, 80, 85, 90]

    TARGET_TABLE = "CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES"
    STAGING_TABLE = "CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES_STAGING"
    BASE_URL = "https://public-rma.fpac.usda.gov/apps/PrfWebApi/PrfExternalPricingRates/GetPricingRates"

    INTERVAL_MAP = {
        "625": "Jan-Feb", "626": "Feb-Mar", "627": "Mar-Apr", "628": "Apr-May",
        "629": "May-Jun", "630": "Jun-Jul", "631": "Jul-Aug", "632": "Aug-Sep",
        "633": "Sep-Oct", "634": "Oct-Nov", "635": "Nov-Dec"
    }

    # Clear previous log entries only if NOT batching (avoid clearing other batch logs)
    if grid_batch is None:
        try:
            session.sql(f"TRUNCATE TABLE {LOG_TABLE}").collect()
        except Exception:
            pass

    batch_label = f" | Batch: {grid_batch}" if grid_batch else ""
    log("START", f"Pipeline started | Mode: {mode} | Use: {intended_use} | Years: {years} | Coverage: {coverage_levels}{batch_label}")

    # ================================================================
    # STEP 1: RESOLVE GRIDS
    # ================================================================
    log("RESOLVE", "Resolving grids...")

    if grid_ids and len(grid_ids) > 0:
        valid_grids = []
        skipped_grids = []
        for g in grid_ids:
            try:
                valid_grids.append(int(g))
            except (ValueError, TypeError):
                skipped_grids.append(str(g))

        if skipped_grids:
            log("RESOLVE", f"WARNING: Skipped {len(skipped_grids)} non-numeric grid IDs: {skipped_grids[:10]}")

        resolved_grids = valid_grids
        log("RESOLVE", f"Using {len(resolved_grids)} explicitly provided grid IDs.")

    elif state_names and len(state_names) > 0:
        # Escape single quotes so each name is safe inside the SQL string literal
        states_upper = [s.strip().upper().replace("'", "''") for s in state_names]
        states_sql = ",".join([f"'{s}'" for s in states_upper])

        grid_df = session.sql(f"""
            SELECT DISTINCT
                TRY_TO_NUMBER(GRID_ID) AS GRID_ID,
                STATE_CODE,
                STATE_ABBREVIATION
            FROM CAPITAL_MARKETS_SANDBOX.PUBLIC.GRID_STATE_PRF
            WHERE (UPPER(STATE_NAME) IN ({states_sql})
               OR UPPER(STATE_ABBREVIATION) IN ({states_sql}))
               AND TRY_TO_NUMBER(GRID_ID) IS NOT NULL
            ORDER BY 1
        """).to_pandas()

        if grid_df.empty:
            log("ERROR", f"No grids found for states: {state_names}")
            return f"ERROR: No grids found for states: {state_names}"

        resolved_grids = [int(g) for g in grid_df['GRID_ID'].tolist()]
        states_found = grid_df['STATE_ABBREVIATION'].unique().tolist()
        log("RESOLVE", f"Resolved {len(resolved_grids)} numeric grids for states: {states_found}")
    else:
        return "ERROR: Must provide either STATE_NAMES or GRID_IDS."

    if not resolved_grids:
        return "ERROR: No valid numeric grid IDs to process."

    # ================================================================
    # STEP 1B: APPLY GRID BATCH FILTER
    # ================================================================
    if grid_batch:
        try:
            parts = grid_batch.lower().replace(' ', '').split('of')
            batch_num = int(parts[0])
            batch_total = int(parts[1])

            if batch_num < 1 or batch_num > batch_total:
                return f"ERROR: Invalid GRID_BATCH '{grid_batch}'. Batch number must be between 1 and {batch_total}."

            resolved_grids.sort()
            chunk_size = math.ceil(len(resolved_grids) / batch_total)
            start_idx = (batch_num - 1) * chunk_size
            end_idx = min(start_idx + chunk_size, len(resolved_grids))
            all_grids_count = len(resolved_grids)
            resolved_grids = resolved_grids[start_idx:end_idx]

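            log("BATCH", f"Batch {batch_num} of {batch_total}: grids {start_idx+1}-{end_idx} of {all_grids_count} ({len(resolved_grids)} grids)")
            if not resolved_grids:
                # More batches than grids can leave a trailing batch empty; stop here
                # rather than build an empty IN () list in the MAP_YTD lookup below.
                return f"COMPLETE: Batch {batch_num} of {batch_total} has no grids to process."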
        except Exception as e:
            return f"ERROR: Invalid GRID_BATCH format '{grid_batch}'. Use '1 of 4', '2 of 4', etc. Error: {str(e)}"

    # ================================================================
    # STEP 2: BUILD GRID → STATE/COUNTY LOOKUP
    # ================================================================
    log("LOOKUP", "Looking up state/county codes from MAP_YTD...")

    grid_list_sql = ",".join([f"'{g}'" for g in resolved_grids])

    lookup_df = session.sql(f"""
        SELECT DISTINCT
            TRY_TO_NUMBER(SUB_COUNTY_CODE) AS GRID_ID,
            STATE_CODE,
            COUNTY_CODE
        FROM CAPITAL_MARKETS_SANDBOX.PUBLIC.MAP_YTD
        WHERE SUB_COUNTY_CODE IN ({grid_list_sql})
          AND TRY_TO_NUMBER(SUB_COUNTY_CODE) IS NOT NULL
    """).to_pandas()

    grid_lookup = {}
    for _, row in lookup_df.iterrows():
        grid_lookup[int(row['GRID_ID'])] = {
            'state_code': str(row['STATE_CODE']),
            'county_code': str(row['COUNTY_CODE'])
        }

    missing_grids = [g for g in resolved_grids if g not in grid_lookup]
    if missing_grids:
        log("LOOKUP", f"WARNING: {len(missing_grids)} grids not found in MAP_YTD: {missing_grids[:10]}...")
        resolved_grids = [g for g in resolved_grids if g in grid_lookup]

    log("LOOKUP", f"Lookup ready for {len(resolved_grids)} grids.")

    # ================================================================
    # STEP 3: BUILD API CALL LIST
    # ================================================================
    api_calls = []
    for grid_id in resolved_grids:
        info = grid_lookup[grid_id]
        for year in years:
            for cov in coverage_levels:
                api_calls.append({
                    'grid_id': grid_id,
                    'state_code': info['state_code'],
                    'county_code': info['county_code'],
                    'year': int(year),
                    'coverage_level': int(cov)
                })

    total_calls = len(api_calls)
    log("BUILD", f"{len(resolved_grids)} grids x {len(years)} years x {len(coverage_levels)} coverage levels = {total_calls} API calls")

    # ================================================================
    # STEP 4: FETCH DATA FROM API (PARALLEL + CONNECTION POOLING)
    # ================================================================
    log("FETCH", f"Starting API fetch ({max_concurrent} concurrent, connection pooling enabled)...")

    all_rows = []
    errors = []
    retried = 0
    run_timestamp = datetime.datetime.now()

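    # Shared HTTP session for connection pooling (matches Rain Index pattern).
    # Size the pool to the thread count; requests' default pool keeps 10 connections.
    http_session = requests.Session()
    http_session.mount("https://", requests.adapters.HTTPAdapter(pool_maxsize=max(1, max_concurrent)))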

    def fetch_single(call_info, attempt=1):
        """Fetch premium rates for one grid/year/coverage combination."""
        params = {
            "intervalType": "BiMonthly",
            "irrigationPracticeCode": irrigation_practice,
            "organicPracticeCode": organic_practice,
            "intendedUseCode": intended_use_code,
            "stateCode": call_info['state_code'],
            "countyCode": call_info['county_code'],
            "productivityFactor": "100",
            "insurableInterest": "100",
            "insuredAcres": "1000",
            "sampleYear": str(call_info['year']),
            "intervalPercentOfValues": interval_percent,
            "coverageLevelPercent": str(call_info['coverage_level']),
            "gridId": str(call_info['grid_id']),
            "gridName": str(call_info['grid_id'])
        }

        try:
            resp = http_session.get(BASE_URL, params=params, timeout=request_timeout)
            resp.raise_for_status()
            data = resp.json()

            if 'returnData' not in data or 'PricingRateRows' not in data['returnData']:
                return [], f"No PricingRateRows for Grid {call_info['grid_id']}, Year {call_info['year']}, Cov {call_info['coverage_level']}", False

            rows = []
            for row in data['returnData']['PricingRateRows']:
                code = row.get('IntervalCode')
                if code and code != 'Total':
                    rows.append((
                        int(call_info['grid_id']),
                        call_info['state_code'],
                        call_info['county_code'],
                        intended_use,
                        f"{call_info['coverage_level']}%",
                        call_info['year'],
                        code,
                        INTERVAL_MAP.get(code, code),
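                        # float() keeps the column type uniform when the JSON mixes ints and floats
                        float(row['PremiumRate']) if row.get('PremiumRate') is not None else None,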
                        run_timestamp
                    ))
            return rows, None, False

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            # Retry once on timeout/connection errors
            if attempt == 1:
                time.sleep(2)  # Brief backoff
                return [], str(e), True  # Signal retry needed
            return [], f"Error Grid {call_info['grid_id']}, Year {call_info['year']}, Cov {call_info['coverage_level']} (attempt {attempt}): {str(e)}", False

        except Exception as e:
            return [], f"Error Grid {call_info['grid_id']}, Year {call_info['year']}, Cov {call_info['coverage_level']}: {str(e)}", False

    # Execute with thread pool
    completed = 0
    last_log_time = time.time()
    LOG_INTERVAL_SECONDS = 30
    retry_queue = []

    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        futures = {executor.submit(fetch_single, call, 1): call for call in api_calls}

        for future in as_completed(futures):
            rows, error, needs_retry = future.result()
            if rows:
                all_rows.extend(rows)
            if needs_retry:
                retry_queue.append(futures[future])
            elif error:
                errors.append(error)
            completed += 1

            now = time.time()
            if (now - last_log_time >= LOG_INTERVAL_SECONDS) or completed == total_calls:
                pct = round(100 * completed / total_calls, 1)
                elapsed = round(now - pipeline_start.timestamp())
                log("FETCH", f"Progress: {completed}/{total_calls} ({pct}%) | {len(all_rows)} rows | {len(errors)} errors | {elapsed}s elapsed")
                last_log_time = now

    # Process retries
    if retry_queue:
        log("RETRY", f"Retrying {len(retry_queue)} failed calls...")
        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            futures = {executor.submit(fetch_single, call, 2): call for call in retry_queue}
            for future in as_completed(futures):
                rows, error, _ = future.result()
                if rows:
                    all_rows.extend(rows)
                    retried += 1
                if error:
                    errors.append(error)
        log("RETRY", f"Recovered {retried} calls on retry.")

    # Close the shared HTTP session
    http_session.close()

    log("FETCH", f"Fetch complete: {len(all_rows)} total rows, {len(errors)} errors, {retried} recovered on retry")

    if errors and len(errors) <= 10:
        for e in errors:
            print(f"    ! {e}")
    elif errors:
        log("FETCH", f"Showing first 10 of {len(errors)} errors")
        for e in errors[:10]:
            print(f"    ! {e}")

    if not all_rows:
        log("ERROR", f"No data returned. {len(errors)} errors.")
        return f"COMPLETE: No data returned. {len(errors)} errors."

    # ================================================================
    # STEP 5: WRITE TO SNOWFLAKE
    # ================================================================
    log("WRITE", f"Writing {len(all_rows)} rows (mode={mode})...")

    columns = [
        "GRID_ID", "STATE_CODE", "COUNTY_CODE", "INTENDED_USE",
        "COVERAGE_LEVEL", "YEAR", "INDEX_INTERVAL_CODE",
        "INDEX_INTERVAL_NAME", "PREMIUMRATE", "INSERT_TIMESTAMP"
    ]

    df = session.create_dataframe(all_rows, schema=columns)

    if mode == 'OVERWRITE':
        df.write.mode("overwrite").save_as_table(TARGET_TABLE)
        log("WRITE", f"OVERWRITE complete: {len(all_rows)} rows.")

    elif mode == 'APPEND':
        df.write.mode("append").save_as_table(TARGET_TABLE)
        log("WRITE", f"APPEND complete: {len(all_rows)} rows added.")

    elif mode == 'MERGE':
        # Use batch-specific staging table to avoid conflicts with parallel runs
        if grid_batch:
            batch_id = grid_batch.lower().replace(' ', '_').replace('of', '')
            STAGING_TABLE_USED = f"{STAGING_TABLE}_{batch_id}"
        else:
            STAGING_TABLE_USED = STAGING_TABLE

        df.write.mode("overwrite").save_as_table(STAGING_TABLE_USED)

        merge_sql = f"""
            MERGE INTO {TARGET_TABLE} AS tgt
            USING {STAGING_TABLE_USED} AS src
            ON  tgt.GRID_ID              = src.GRID_ID
            AND tgt.YEAR                 = src.YEAR
            AND tgt.COVERAGE_LEVEL       = src.COVERAGE_LEVEL
            AND tgt.INDEX_INTERVAL_CODE  = src.INDEX_INTERVAL_CODE
            AND tgt.INTENDED_USE         = src.INTENDED_USE
            WHEN MATCHED THEN UPDATE SET
                tgt.STATE_CODE          = src.STATE_CODE,
                tgt.COUNTY_CODE         = src.COUNTY_CODE,
                tgt.INDEX_INTERVAL_NAME = src.INDEX_INTERVAL_NAME,
                tgt.PREMIUMRATE         = src.PREMIUMRATE,
                tgt.INSERT_TIMESTAMP    = src.INSERT_TIMESTAMP
            WHEN NOT MATCHED THEN INSERT (
                GRID_ID, STATE_CODE, COUNTY_CODE, INTENDED_USE,
                COVERAGE_LEVEL, YEAR, INDEX_INTERVAL_CODE,
                INDEX_INTERVAL_NAME, PREMIUMRATE, INSERT_TIMESTAMP
            ) VALUES (
                src.GRID_ID, src.STATE_CODE, src.COUNTY_CODE, src.INTENDED_USE,
                src.COVERAGE_LEVEL, src.YEAR, src.INDEX_INTERVAL_CODE,
                src.INDEX_INTERVAL_NAME, src.PREMIUMRATE, src.INSERT_TIMESTAMP
            )
        """
        result = session.sql(merge_sql).collect()
        session.sql(f"DROP TABLE IF EXISTS {STAGING_TABLE_USED}").collect()
        log("WRITE", f"MERGE complete: {result}")

    # ================================================================
    # SUMMARY
    # ================================================================
    elapsed = (datetime.datetime.now() - pipeline_start).total_seconds()
    summary = (
        f"COMPLETE | Mode: {mode} | "
        f"{len(resolved_grids)} grids | {len(all_rows)} rows | "
        f"{len(errors)} errors | {retried} retries recovered | {elapsed:.1f}s"
        f"{batch_label}"
    )
    log("DONE", summary)
    return summary
$$;

In [None]:
-- ====================================================================
-- RUN_PRF_PREMIUM_PIPELINE — Parameter Guide
-- ====================================================================
--
-- PARAMETER 1: STATE_NAMES (ARRAY or NULL)
--   States to load premium rates for (resolves grids automatically)
--   Examples: ARRAY_CONSTRUCT('Texas')
--             ARRAY_CONSTRUCT('Texas', 'Oklahoma')
--   Set to NULL if using GRID_IDS instead
--
-- PARAMETER 2: GRID_IDS (ARRAY or NULL)
--   Specific grid IDs to load (use instead of STATE_NAMES)
--   Examples: ARRAY_CONSTRUCT(9128, 9129, 9130)
--   Set to NULL if using STATE_NAMES instead
--
-- PARAMETER 3: YEARS (ARRAY)
--   Which policy years to fetch rates for
--   Default: ARRAY_CONSTRUCT(2025)
--
-- PARAMETER 4: COVERAGE_LEVELS (ARRAY)
--   Coverage level percentages to fetch
--   Default: ARRAY_CONSTRUCT(70, 75, 80, 85, 90)
--
-- PARAMETER 5: INTENDED_USE (VARCHAR)
--   Friendly label — 'Grazing' or 'Haying'
--   Default: 'Grazing'
--
-- PARAMETER 6: INTENDED_USE_CODE (VARCHAR)
--   RMA code — '007' for Grazing, '006' for Haying
--   Default: '007'
--
-- PARAMETER 7: IRRIGATION_PRACTICE (VARCHAR)
--   '997' = Non-irrigated (standard for PRF)
--   Default: '997'
--
-- PARAMETER 8: ORGANIC_PRACTICE (VARCHAR)
--   '997' = Not specified (standard for PRF)
--   Default: '997'
--
-- PARAMETER 9: INTERVAL_PERCENT (VARCHAR)
--   JSON array of 11 values (percent allocation per interval)
--   Default: '[50,0,50,0,0,0,0,0,0,0,0]'
--   NOTE: Sent to the RMA API as intervalPercentOfValues, so it affects the
--   rates returned. The default puts 50% in the 1st and 50% in the 3rd interval.
--
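-- PARAMETER 10: MAX_CONCURRENT (INT)
--   Parallel API threads (5-10 recommended)
--   Default: 10
--
-- PARAMETER 11: REQUEST_TIMEOUT (INT)
--   Seconds per API call before timeout
--   Default: 180
--
-- PARAMETER 12: MODE (VARCHAR)
--   'OVERWRITE': Replace the whole table with this run's rows
--   'APPEND':    Add rows (risk of duplicates)
--   'MERGE':     Upsert on GRID_ID + YEAR + COVERAGE_LEVEL +
--                INDEX_INTERVAL_CODE + INTENDED_USE
--
-- PARAMETER 13: GRID_BATCH (VARCHAR, optional)
--   'N of M' (e.g. '1 of 4') processes only the Nth of M slices of the
--   sorted grid list; omit it or pass NULL to process every grid.
--   Use with MERGE: OVERWRITE replaces the whole table on every batch.
--   Default: NULL
-- ====================================================================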

CALL RUN_PRF_PREMIUM_PIPELINE(
    ARRAY_CONSTRUCT('Oklahoma'),               -- 1.  STATE_NAMES
    NULL,                                      -- 2.  GRID_IDS (NULL = use states)
    ARRAY_CONSTRUCT(2025),                     -- 3.  YEARS
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),       -- 4.  COVERAGE_LEVELS
    'Grazing',                                 -- 5.  INTENDED_USE
    '007',                                     -- 6.  INTENDED_USE_CODE
    '997',                                     -- 7.  IRRIGATION_PRACTICE
    '997',                                     -- 8.  ORGANIC_PRACTICE
    '[50,0,50,0,0,0,0,0,0,0,0]',               -- 9.  INTERVAL_PERCENT
    10,                                        -- 10. MAX_CONCURRENT
    60,                                        -- 11. REQUEST_TIMEOUT
    'OVERWRITE'                                -- 12. MODE
);

-- ====================================================================
-- OTHER EXAMPLES (uncomment to use)
-- ====================================================================

-- === King Ranch grids only (MERGE mode) ===
-- CALL RUN_PRF_PREMIUM_PIPELINE(
--     NULL,
--     ARRAY_CONSTRUCT(9128, 9129, 9130, 9131, 8828, 8829, 8830, 8831,
--                     8528, 8529, 8530, 8531, 8228, 8229, 8230, 8231,
--                     7928, 7929, 7930, 7931),
--     ARRAY_CONSTRUCT(2025),
--     ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
--     'Grazing', '007', '997', '997',
--     '[50,0,50,0,0,0,0,0,0,0,0]',
--     5, 60, 'MERGE'
-- );

-- === Multi-state load ===
-- CALL RUN_PRF_PREMIUM_PIPELINE(
--     ARRAY_CONSTRUCT('Texas', 'Oklahoma', 'New Mexico'),
--     NULL,
--     ARRAY_CONSTRUCT(2025),
--     ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
--     'Grazing', '007', '997', '997',
--     '[50,0,50,0,0,0,0,0,0,0,0]',
--     10, 60, 'MERGE'
-- );

In [None]:
-- ====================================================================
-- OPTIONAL: King Ranch grids only (MERGE mode)
-- This cell runs as written; skip it if you don't need these grids.
-- ====================================================================

CALL RUN_PRF_PREMIUM_PIPELINE(
    NULL,
    ARRAY_CONSTRUCT(9128, 9129, 9130, 9131, 8828, 8829, 8830, 8831,
                    8528, 8529, 8530, 8531, 8228, 8229, 8230, 8231,
                    7928, 7929, 7930, 7931),
    ARRAY_CONSTRUCT(2025),
    ARRAY_CONSTRUCT(70, 75, 80, 85, 90),
    'Grazing', '007', '997', '997',
    '[50,0,50,0,0,0,0,0,0,0,0]',
    10, 60, 'MERGE'
);

In [None]:
-- ====================================================================
-- AUDIT: Verify loaded data
-- ====================================================================

-- 1. Summary by state
SELECT
    STATE_CODE,
    COUNT(DISTINCT GRID_ID)         AS GRIDS,
    COUNT(DISTINCT YEAR)            AS YEARS,
    COUNT(DISTINCT COVERAGE_LEVEL)  AS COVERAGE_LEVELS,
    COUNT(*)                        AS TOTAL_ROWS
FROM CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES
GROUP BY STATE_CODE
ORDER BY STATE_CODE;

-- 2. Completeness check: every grid/year/intended use should have 55 rows
--    (11 intervals x 5 coverage levels); adjust 55 if you loaded other coverage levels
SELECT
    GRID_ID,
    YEAR,
    INTENDED_USE,
    COUNT(*) AS ROW_COUNT,
    CASE WHEN COUNT(*) = 55 THEN 'COMPLETE' ELSE 'INCOMPLETE' END AS STATUS
FROM CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES
GROUP BY GRID_ID, YEAR, INTENDED_USE
HAVING COUNT(*) != 55
ORDER BY GRID_ID, YEAR, INTENDED_USE;

-- 3. Sample records
SELECT *
FROM CAPITAL_MARKETS_SANDBOX.PUBLIC.PRF_PREMIUM_RATES
ORDER BY GRID_ID, YEAR, COVERAGE_LEVEL, INDEX_INTERVAL_CODE
LIMIT 100;

In [None]:
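-- OPTIONAL CLEANUP: drops the 12-argument signature, i.e. a version without
-- GRID_BATCH (such as V1). The V2 procedure created above takes a 13th
-- argument (GRID_BATCH VARCHAR); add one more VARCHAR to the list to drop V2.
DROP PROCEDURE IF EXISTS CAPITAL_MARKETS_SANDBOX.PUBLIC.RUN_PRF_PREMIUM_PIPELINE(
    ARRAY, ARRAY, ARRAY, ARRAY, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, INT, INT, VARCHAR
);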