In [None]:

# Install required packages for this notebook (run once per environment)
# Tip: In Jupyter, use %pip so installs go to the current kernel.
%pip install -q geopandas pandas numpy xarray rioxarray rasterio rasterstats requests tqdm pyproj shapely fiona py7zr nbformat netCDF4 h5netcdf pydap cftime

# Optional performance/feature extras:
# %pip install -q aiohttp


In [27]:

# === DATA SOURCE CONFIG ===
PRISM_SOURCE = "CSV_ONLY"  # enforce CSV-only PRISM
print("PRISM_SOURCE =", PRISM_SOURCE)


PRISM_SOURCE = CSV_ONLY



# Minnesota Corn ‚Äî Minimal Schema Builder (May/Jul/Aug horizons)

This notebook downloads and assembles external datasets into the **minimal schema** we discussed for county-level corn modeling in Minnesota.

**What you get**
- County-month tables keyed by `fips, date, horizon_tag` with PRISM, GLDAS, NDVI, drought, ONI, soils, CDL corn fraction, basis/ethanol/diesel proxies, and NASS targets (yield, harvested acres).  
- Helper functions to **download, cache, and aggregate** to counties (area-weighted) and compute the engineered features (e.g., **hot-day counts**, **precip‚àíET**, **VPD proxy**, NDVI peak and anomaly, **USDM D2+** time-in-class).

> ‚ö†Ô∏è **Requirements**: Internet access, and the following Python packages:  
`pip install geopandas pandas numpy xarray rioxarray rasterio rasterstats requests tqdm pyproj shapely fiona py7zr`  
Optional for speed: `pip install aiohttp`

> üîê **Credentials you must provide** (set in the next cell):  
- **NASA Earthdata** username/password (for GLDAS) ‚Äî used via `.netrc` cookie login.  
- **USDA NASS QuickStats API key** (for county targets).  
- **AppEEARS token** (optional; for MODIS NDVI); we also provide a Sentinel-2 fallback via STAC.

**Output**
- A single, tidy CSV: `/mnt/data/mn_minimal_schema.csv`


In [28]:

# --- Helper: Robust xarray opener for gridMET (OPeNDAP or local .nc) ---
import os, io, sys, requests
from pathlib import Path
import xarray as xr

GRIDMET_BASE = "https://www.northwestknowledge.net/thredds"
CACHE_NC_DIR = Path("data_cache/gridmet_nc")
CACHE_NC_DIR.mkdir(parents=True, exist_ok=True)

def open_gridmet(var: str, year: int):
    """Open gridMET <var>_<year>.nc using OPeNDAP if possible; 
    otherwise download the NetCDF locally and open with netCDF4/h5netcdf.
    
    var in {'pr','tmmx','tmmn','vpd','eto',...}
    """
    # Try OPeNDAP via pydap engine
    # OPeNDAP URL (dodsC):
    opendap_url = f"{GRIDMET_BASE}/dodsC/MET/{var}_{year}.nc"
    try:
        ds = xr.open_dataset(opendap_url, engine="pydap", chunks={})
        # Touch a small piece to validate
        _ = list(ds.variables)[:1]
        return ds
    except Exception as e:
        # Fallback: HTTPServer 'fileServer' direct download
        http_url = f"{GRIDMET_BASE}/fileServer/MET/{var}_{year}.nc"
        local_nc = CACHE_NC_DIR / f"{var}_{year}.nc"
        if not local_nc.exists():
            r = requests.get(http_url, stream=True, timeout=300)
            r.raise_for_status()
            with open(local_nc, "wb") as f:
                for chunk in r.iter_content(1<<20):
                    if chunk: f.write(chunk)
        # Try netCDF4 first, then h5netcdf
        try:
            return xr.open_dataset(local_nc, engine="netCDF4")
        except Exception:
            return xr.open_dataset(local_nc, engine="h5netcdf")


In [49]:

# --- USER CONFIG ---
START_YEAR = 2000
END_YEAR   = 2024

# Horizons to compute (use last day of month as cutoff)
HORIZONS = {
    "FEB": {"month": 2,  "label": "FEB"},
    "MAR": {"month": 3,  "label": "MAR"},
    "APR": {"month": 4,  "label": "APR"},
    "MAY": {"month": 5,  "label": "MAY"},
    "JUN": {"month": 6,  "label": "JUN"},
    "JUL": {"month": 7,  "label": "JUL"},
    "AUG": {"month": 8,  "label": "AUG"},
}

# Paths (cache)
DATA_DIR = "data_cache"
PRISM_DIR = f"{DATA_DIR}/prism"
GLDAS_DIR = f"{DATA_DIR}/gldas"
USDM_DIR  = f"{DATA_DIR}/usdm"
CDL_DIR   = f"{DATA_DIR}/cdl"
SOIL_DIR  = f"{DATA_DIR}/soils"
NDVI_DIR  = f"{DATA_DIR}/ndvi"
AUX_DIR   = f"{DATA_DIR}/aux"

# üîê Credentials (fill in as needed)
# NASA Earthdata (GLDAS). You should also create a ~/.netrc file for requests to use.
EARTHDATA_USERNAME = "rocketman01"
EARTHDATA_PASSWORD = "Starship05!"

# NASS QuickStats
NASS_API_KEY = "606153B9-DDAD-30FB-88EE-5BCC60DEE678"

# AppEEARS (optional for MODIS NDVI). If not provided, the Sentinel-2 fallback is used.
APPEEARS_TOKEN = None  # e.g., "eyJhbGciOi..."

# Diesel / Ethanol / Basis: we include example endpoints & parsers. Some require free registration.

In [30]:

import os, io, sys, json, math, zipfile, textwrap, datetime as dt
from pathlib import Path
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm

import geopandas as gpd
from shapely.geometry import box, Point, Polygon
from shapely.ops import unary_union

import rasterio
from rasterio.mask import mask
import rioxarray as rxr
import xarray as xr

import requests

# Ensure cache dirs
for d in [DATA_DIR, PRISM_DIR, GLDAS_DIR, USDM_DIR, CDL_DIR, SOIL_DIR, NDVI_DIR, AUX_DIR]:
    os.makedirs(d, exist_ok=True)

def month_range(start_year, end_year):
    dates = []
    for y in range(start_year, end_year+1):
        for m in range(1, 13):
            dates.append(dt.date(y, m, 1))
    return dates

def end_of_month(d: dt.date) -> dt.date:
    if d.month == 12:
        return dt.date(d.year, 12, 31)
    next_m = dt.date(d.year, d.month+1, 1)
    return next_m - dt.timedelta(days=1)

def ensure_state_counties_mn() -> gpd.GeoDataFrame:
    """Download and return Minnesota county polygons from TIGER/Line (Census)."""
    url = "https://www2.census.gov/geo/tiger/TIGER2023/COUNTY/tl_2023_us_county.zip"
    zpath = os.path.join(AUX_DIR, "tl_2023_us_county.zip")
    if not os.path.exists(zpath):
        r = requests.get(url, timeout=120)
        r.raise_for_status()
        with open(zpath, "wb") as f:
            f.write(r.content)
    with zipfile.ZipFile(zpath) as z:
        z.extractall(os.path.join(AUX_DIR, "tl_2023_us_county"))
    g = gpd.read_file(os.path.join(AUX_DIR, "tl_2023_us_county", "tl_2023_us_county.shp"))
    # Minnesota FIPS = 27
    g = g[g["STATEFP"] == "27"].to_crs(4326)
    g["fips"] = g["STATEFP"] + g["COUNTYFP"]
    g = g[["fips", "NAME", "geometry"]].reset_index(drop=True)
    return g

MN_COUNTIES = ensure_state_counties_mn()
MN_BOUNDS = MN_COUNTIES.to_crs(4326).unary_union.bounds
MN_COUNTIES.head()


  MN_BOUNDS = MN_COUNTIES.to_crs(4326).unary_union.bounds


Unnamed: 0,fips,NAME,geometry
0,27095,Mille Lacs,"POLYGON ((-93.43244 46.06803, -93.43244 46.067..."
1,27045,Fillmore,"POLYGON ((-92.08948 43.50068, -92.08997 43.500..."
2,27073,Lac qui Parle,"POLYGON ((-96.22617 45.21949, -96.22608 45.219..."
3,27085,McLeod,"POLYGON ((-94.13242 44.71755, -94.15264 44.717..."
4,27153,Todd,"POLYGON ((-94.64365 45.90727, -94.64365 45.906..."



## PRISM (Temperature & Precipitation)

We pull **monthly** PRISM tmax, tmin, and precipitation grids and compute:
- `prism_prcp_mm`, `prism_tmax_c`, `prism_tmin_c`
- **Hot-day counts** (>32¬∞C or >35¬∞C approximations via monthly degree-day proxy)
- **VPD proxy** using monthly tmin/tmax

> Note: For exact daily hot-day counts and VPD, prefer daily PRISM. Monthly approximations are used here for efficiency.


In [33]:

import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Point

PRISM_CSV_PATH = "data/PRISM_ppt_tmin_tmean_tmax_tdmean_vpdmin_vpdmax_stable_4km_201001_202408.csv"

def load_prism_csv(path):
    # Read without type coercion to control conversions
    df = pd.read_csv(path)
    orig_cols = list(df.columns)

    # Detect units from original column names
    def has_unit(col_substr, unit_substr):
        return any((col_substr.lower() in oc.lower()) and (unit_substr.lower() in oc.lower()) for oc in orig_cols)

    ppt_in_inches = has_unit("ppt", "inch")
    temp_in_f     = has_unit("tmin", "deg") or has_unit("tmax", "deg") or has_unit("tmean","deg")
    vpd_in_hpa    = has_unit("vpd", "hpa")

    # Standardize column names: lower, remove parentheses content, replace spaces, remove unit tokens
    def std_name(c):
        cl = c.strip().lower()
        import re as _re
        cl = _re.sub(r"\(.*?\)", "", cl)  # drop parenthetical units
        cl = cl.replace("degrees", "").replace("degree", "").replace("deg", "")
        cl = cl.replace("f", "").replace("c", "")
        cl = cl.replace("inches", "").replace("inch", "")
        cl = cl.replace("hpa","").replace("kpa","")
        while "  " in cl:
            cl = cl.replace("  ", " ")
        cl = cl.strip().replace(" ", "_")
        cl = cl.replace("__","_")
        return cl

    df.columns = [std_name(c) for c in df.columns]

    # Candidate date columns
    date_col = None
    for cand in ["date","time","day","month_year","yyyymm"]:
        if cand in df.columns:
            date_col = cand; break
    if date_col is None:
        # Try 'year' and 'month' pair
        if "year" in df.columns and "month" in df.columns:
            df["date"] = pd.to_datetime(df["year"].astype(int).astype(str) + "-" + df["month"].astype(int).astype(str) + "-01")
            date_col = "date"
        else:
            # Try to infer from a column with mm/dd/yyyy strings
            for c in df.columns:
                if df[c].astype(str).str.match(r"^\d{1,2}/\d{1,2}/\d{4}$").any():
                    date_col = c
                    break
    if date_col is None:
        raise ValueError("Could not detect a date column in the PRISM CSV.")

    # Parse dates robustly
    def parse_date_series(s):
        s = s.astype(str).str.strip()
        dt1 = pd.to_datetime(s, errors="coerce", infer_datetime_format=True)
        if dt1.isna().mean() > 0.2:
            try_dt = pd.to_datetime(s, format="%m/%d/%Y", errors="coerce")
            dt1 = try_dt.where(~try_dt.isna(), dt1)
        return dt1

    df["date"] = parse_date_series(df[date_col])
    if df["date"].isna().all():
        raise ValueError("Failed to parse dates in PRISM CSV‚Äîplease check the date column format.")
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month

    # Map likely variable names after std_name
    lon_col = "longitude" if "longitude" in df.columns else ("lon" if "lon" in df.columns else None)
    lat_col = "latitude"  if "latitude"  in df.columns else ("lat" if "lat" in df.columns else None)

    rename = {}
    for cand in ["ppt","precip","precipitation"]:
        if cand in df.columns: rename[cand] = "ppt_raw"
    for cand in ["tmin","min_temp","tmin_degrees","tmin_degrees_f"]:
        if cand in df.columns: rename[cand] = "tmin_raw"
    for cand in ["tmax","max_temp","tmax_degrees","tmax_degrees_f"]:
        if cand in df.columns: rename[cand] = "tmax_raw"
    for cand in ["tmean","tavg","mean_temp"]:
        if cand in df.columns: rename[cand] = "tmean_raw"
    for cand in ["tdmean","dewpoint_mean","dewpoint"]:
        if cand in df.columns: rename[cand] = "tdmean_raw"
    for cand in ["vpdmin","vpd_min"]:
        if cand in df.columns: rename[cand] = "vpdmin_raw"
    for cand in ["vpdmax","vpd_max"]:
        if cand in df.columns: rename[cand] = "vpdmax_raw"

    df = df.rename(columns=rename)

    # Unit conversions
    if "ppt_raw" in df.columns:
        ppt = pd.to_numeric(df["ppt_raw"], errors="coerce")
        if ppt_in_inches:
            ppt = ppt * 25.4
        df["prism_prcp_mm"] = ppt

    def f_to_c(s):
        s = pd.to_numeric(s, errors="coerce")
        return (s - 32.0) * 5.0/9.0
    if "tmin_raw" in df.columns:
        df["prism_tmin_c"] = f_to_c(df["tmin_raw"]) if temp_in_f else pd.to_numeric(df["tmin_raw"], errors="coerce")
    if "tmax_raw" in df.columns:
        df["prism_tmax_c"] = f_to_c(df["tmax_raw"]) if temp_in_f else pd.to_numeric(df["tmax_raw"], errors="coerce")
    if "tmean_raw" in df.columns:
        df["tmean_c"] = f_to_c(df["tmean_raw"]) if temp_in_f else pd.to_numeric(df["tmean_raw"], errors="coerce")
    if "tdmean_raw" in df.columns:
        df["tdmean_c"] = f_to_c(df["tdmean_raw"]) if temp_in_f else pd.to_numeric(df["tdmean_raw"], errors="coerce")

    vpdmin = pd.to_numeric(df["vpdmin_raw"], errors="coerce") if "vpdmin_raw" in df.columns else np.nan
    vpdmax = pd.to_numeric(df["vpdmax_raw"], errors="coerce") if "vpdmax_raw" in df.columns else np.nan
    if isinstance(vpdmin, pd.Series) or isinstance(vpdmax, pd.Series):
        if vpd_in_hpa:
            if isinstance(vpdmin, pd.Series): vpdmin = vpdmin * 0.1
            if isinstance(vpdmax, pd.Series): vpdmax = vpdmax * 0.1
        df["vpd_proxy"] = pd.concat([vpdmin, vpdmax], axis=1).mean(axis=1)

    if "prism_tmax_c" in df.columns:
        df["heat_days_35c"] = 30.0 * (1/(1+np.exp(-(df["prism_tmax_c"] - 35.0)/2)))

    if (lon_col is not None) and (lat_col is not None):
        pts = gpd.GeoDataFrame(
            df[["year","month","prism_prcp_mm","prism_tmax_c","prism_tmin_c","vpd_proxy","heat_days_35c", lon_col, lat_col]].copy(),
            geometry=gpd.points_from_xy(pd.to_numeric(df[lon_col], errors="coerce"),
                                        pd.to_numeric(df[lat_col], errors="coerce")),
            crs=4326
        ).dropna(subset=["geometry"])
        pts = pts[pts.geometry.within(MN_COUNTIES.unary_union)]
        joined = gpd.sjoin(pts, MN_COUNTIES[["fips","geometry"]], how="inner", predicate="within")
        prism_df_from_csv = joined.groupby(["fips","year","month"], as_index=False).agg({
            "prism_prcp_mm":"mean",
            "prism_tmax_c":"mean",
            "prism_tmin_c":"mean",
            "vpd_proxy":"mean",
            "heat_days_35c":"mean"
        })
    elif "fips" in df.columns:
        temp = df.rename(columns={"fips":"fips_raw"})
        temp["fips"] = temp["fips_raw"].astype(str).str.zfill(5)
        prism_df_from_csv = temp.groupby(["fips","year","month"], as_index=False).agg({
            "prism_prcp_mm":"mean",
            "prism_tmax_c":"mean",
            "prism_tmin_c":"mean",
            "vpd_proxy":"mean",
            "heat_days_35c":"mean"
        })
    else:
        base = df.groupby(["year","month"], as_index=False).agg({
            "prism_prcp_mm":"mean",
            "prism_tmax_c":"mean",
            "prism_tmin_c":"mean",
            "vpd_proxy":"mean",
            "heat_days_35c":"mean"
        })
        cnt = MN_COUNTIES[["fips"]].copy()
        cnt["key"]=1; base["key"]=1
        prism_df_from_csv = cnt.merge(base, on="key").drop(columns=["key"])

    need_cols = ["fips","year","month","prism_prcp_mm","prism_tmax_c","prism_tmin_c","vpd_proxy","heat_days_35c"]
    for c in need_cols:
        if c not in prism_df_from_csv.columns:
            prism_df_from_csv[c] = np.nan
    prism_df_from_csv = prism_df_from_csv[need_cols].sort_values(["fips","year","month"]).reset_index(drop=True)
    return prism_df_from_csv

# Build from uploaded CSV and set as prism_df
prism_df = load_prism_csv(PRISM_CSV_PATH)
print("Loaded PRISM CSV ‚Üí prism_df shape:", prism_df.shape)
display(prism_df.head())


  dt1 = pd.to_datetime(s, errors="coerce", infer_datetime_format=True)
  pts = pts[pts.geometry.within(MN_COUNTIES.unary_union)]


Loaded PRISM CSV ‚Üí prism_df shape: (15312, 8)


Unnamed: 0,fips,year,month,prism_prcp_mm,prism_tmax_c,prism_tmin_c,vpd_proxy,heat_days_35c
0,27001,2010,1,18.542,-7.222222,-17.611111,0.0635,2.035552e-08
1,27001,2010,2,5.334,-2.611111,-17.0,0.1535,2.041607e-07
2,27001,2010,3,24.384,8.388889,-3.833333,0.318,4.99564e-05
3,27001,2010,4,23.622,16.333333,0.888889,0.6725,0.002652575
4,27001,2010,5,78.232,19.388889,5.555556,0.773,0.01221897


In [57]:
prism_df.to_csv('prism_.csv', index=False) 

In [34]:

# Export Minnesota county centroids for PRISM Bulk (‚â§12-char names)
import pandas as pd
import os

OUT_DIR = "manual_inputs/prism"
os.makedirs(OUT_DIR, exist_ok=True)

# MN_COUNTIES is already built earlier in the notebook
mn = MN_COUNTIES.copy()
mn["centroid"] = mn.geometry.centroid
def trim12(s): 
    return (s.replace(" ", "")[:12]) if isinstance(s,str) else ""

points = pd.DataFrame({
    "latitude": mn["centroid"].y.round(6),
    "longitude": mn["centroid"].x.round(6),
    "name": mn["NAME"].apply(trim12)  # optional, 12 chars max
})
centroids_csv = f"{OUT_DIR}/mn_prism_bulk_points.csv"
points.to_csv(centroids_csv, index=False)
print("Wrote:", centroids_csv)
points.head()


Wrote: manual_inputs/prism/mn_prism_bulk_points.csv



  mn["centroid"] = mn.geometry.centroid


Unnamed: 0,latitude,longitude,name
0,45.938046,-93.630095,MilleLacs
1,43.674005,-92.090176,Fillmore
2,44.995487,-96.173478,LacquiParle
3,44.823559,-94.272436,McLeod
4,46.070627,-94.897603,Todd



## GLDAS (Noah 0.25¬∞, monthly)

We fetch monthly GLDAS variables and aggregate to counties:
- `gldas_soil_moisture_0_10cm`, `gldas_et_mm`, `gldas_rn_wm2`

> ‚ö†Ô∏è Requires **NASA Earthdata** login via `.netrc` or session cookies. See: https://disc.gsfc.nasa.gov/earthdata-login



## U.S. Drought Monitor (USDM)

We compute:
- `usdm_pct_D2plus`: percent of county area in D2‚ÄìD4 during the month
- `usdm_days_D2plus`: number of **days** in D2+ within the month

We download weekly shapefiles and aggregate by county and by day-count per month.


In [38]:

# --- Robust USDM builder (fills zeros if downloads are missing) ---
import datetime as dt, os, zipfile, requests, pandas as pd, geopandas as gpd, numpy as np

USDM_DIR  = USDM_DIR if "USDM_DIR" in globals() else "data_cache/usdm"
os.makedirs(USDM_DIR, exist_ok=True)

def _usdm_zip_candidates(d: dt.date):
    # weekly product usually Thursday; filenames use YYYYMMDD
    dstr = d.strftime("%Y%m%d")
    # modern and legacy paths
    return [
        f"https://droughtmonitor.unl.edu/data/shapefiles_m/USDM_{dstr}_M.zip",
        f"https://droughtmonitor.unl.edu/data/shapefiles/USDM_{dstr}_M.zip",
    ]

def _nearest_thursdays_in_month(year: int, month: int):
    first = dt.date(year, month, 1)
    last  = (first.replace(day=28) + dt.timedelta(days=4)).replace(day=1) - dt.timedelta(days=1)
    d = first
    thurs = []
    while d <= last:
        # Thursday = 3 (Mon=0)
        if d.weekday() == 3:
            thurs.append(d)
        d += dt.timedelta(days=1)
    # if none (rare), include last day as fallback
    return thurs or [last]

def _fetch_usdm_weekly_zip(d: dt.date):
    for url in _usdm_zip_candidates(d):
        try:
            r = requests.get(url, timeout=90)
            if r.status_code == 200:
                zpath = os.path.join(USDM_DIR, os.path.basename(url))
                with open(zpath, "wb") as f:
                    f.write(r.content)
                # extract once
                with zipfile.ZipFile(zpath) as z:
                    z.extractall(os.path.join(USDM_DIR, os.path.splitext(os.path.basename(zpath))[0]))
                return zpath
        except Exception:
            continue
    return None

def build_usdm_table(start_year=START_YEAR, end_year=END_YEAR):
    rows = []
    for y in tqdm(range(start_year, end_year+1), desc="USDM years"):
        for m in range(1, 13):
            thursdays = _nearest_thursdays_in_month(y, m)
            weekly_polys = []
            for d in thursdays:
                z = _fetch_usdm_weekly_zip(d)
                if z:
                    folder = os.path.join(USDM_DIR, os.path.splitext(os.path.basename(z))[0])
                    shp = None
                    for f in os.listdir(folder):
                        if f.lower().endswith(".shp"):
                            shp = os.path.join(folder, f)
                            break
                    if shp:
                        g = gpd.read_file(shp).to_crs(4326)
                        # Level column sometimes named DM (D0..D4)
                        lvl = g.get("DM", None)
                        if lvl is None:
                            continue
                        g["level"] = g["DM"].map({"D0":0,"D1":1,"D2":2,"D3":3,"D4":4}).fillna(-1)
                        weekly_polys.append(g[["level","geometry"]])

            # If we still found nothing for this month ‚Üí fill zeros so concat never fails
            if not weekly_polys:
                for _, crow in MN_COUNTIES.iterrows():
                    rows.append(pd.DataFrame([{
                        "fips": crow.fips, "year": y, "month": m,
                        "usdm_pct_D2plus": 0.0, "usdm_days_D2plus": 0
                    }]))
                continue

            # Compute area share in D2+; assume weekly maps cover ~7 days
            month_res = []
            for _, crow in MN_COUNTIES.iterrows():
                cgeom = gpd.GeoSeries([crow.geometry], crs=4326)
                total_area = cgeom.to_crs(3857).area.values[0]
                days_D2p = 0
                area_pcts = []
                for wg in weekly_polys:
                    inter = gpd.overlay(gpd.GeoDataFrame(geometry=cgeom), wg, how="intersection")
                    if inter.empty:
                        area_pcts.append(0.0); continue
                    inter = inter[inter["level"] >= 2]
                    if inter.empty:
                        area_pcts.append(0.0); continue
                    inter_area = inter.to_crs(3857).area.sum()
                    pct = float(inter_area / total_area * 100.0)
                    area_pcts.append(pct)
                    if pct > 0:
                        days_D2p += 7
                usdm_pct = float(np.mean(area_pcts)) if area_pcts else 0.0
                month_res.append({
                    "fips": crow.fips, "year": y, "month": m,
                    "usdm_pct_D2plus": usdm_pct,
                    "usdm_days_D2plus": int(days_D2p)
                })
            rows.append(pd.DataFrame(month_res))

    # Even if everything failed, guarantee a zeroed frame (so downstream doesn‚Äôt break)
    if not rows:
        zero = []
        for y in range(start_year, end_year+1):
            for m in range(1,13):
                for _, crow in MN_COUNTIES.iterrows():
                    zero.append({"fips": crow.fips, "year": y, "month": m,
                                 "usdm_pct_D2plus": 0.0, "usdm_days_D2plus": 0})
        return pd.DataFrame(zero)

    return pd.concat(rows, ignore_index=True)

usdm_df = build_usdm_table()
print("USDM built:", usdm_df.shape)
usdm_df.head()



USDM years: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 25/25 [05:43<00:00, 13.74s/it]


USDM built: (26100, 5)


Unnamed: 0,fips,year,month,usdm_pct_D2plus,usdm_days_D2plus
0,27095,2000,1,0.0,0
1,27045,2000,1,0.0,0
2,27073,2000,1,0.0,0
3,27085,2000,1,0.0,0
4,27153,2000,1,0.0,0



## ENSO ‚Äî Oceanic Ni√±o Index (ONI)

We ingest NOAA's ONI table and join monthly with a 1‚Äì3 month lead option.


In [40]:
# --- Robust USDM builder (fills zeros if downloads are missing), CRS-safe & geometry-safe ---
import datetime as dt, os, zipfile, requests, pandas as pd, geopandas as gpd, numpy as np
from tqdm import tqdm

USDM_DIR  = USDM_DIR if "USDM_DIR" in globals() else "data_cache/usdm"
os.makedirs(USDM_DIR, exist_ok=True)

# Make sure MN_COUNTIES exists and is EPSG:4326
assert "MN_COUNTIES" in globals(), "MN_COUNTIES GeoDataFrame must be defined earlier."
if MN_COUNTIES.crs is None or str(MN_COUNTIES.crs).lower() not in ("epsg:4326","4326"):
    MN_COUNTIES = MN_COUNTIES.to_crs(4326)

def _usdm_zip_candidates(d: dt.date):
    dstr = d.strftime("%Y%m%d")
    return [
        f"https://droughtmonitor.unl.edu/data/shapefiles_m/USDM_{dstr}_M.zip",
        f"https://droughtmonitor.unl.edu/data/shapefiles/USDM_{dstr}_M.zip",
    ]

def _nearest_thursdays_in_month(year: int, month: int):
    first = dt.date(year, month, 1)
    last  = (first.replace(day=28) + dt.timedelta(days=4)).replace(day=1) - dt.timedelta(days=1)
    d = first
    thurs = []
    while d <= last:
        if d.weekday() == 3:  # Thursday
            thurs.append(d)
        d += dt.timedelta(days=1)
    return thurs or [last]

def _fetch_usdm_weekly_zip(d: dt.date):
    headers = {"User-Agent": "Mozilla/5.0 (compatible; data-pipeline; +https://example.org)"}
    for url in _usdm_zip_candidates(d):
        try:
            r = requests.get(url, timeout=120, headers=headers)
            if r.status_code == 200 and r.content:
                zpath = os.path.join(USDM_DIR, os.path.basename(url))
                with open(zpath, "wb") as f:
                    f.write(r.content)
                with zipfile.ZipFile(zpath) as z:
                    z.extractall(os.path.join(USDM_DIR, os.path.splitext(os.path.basename(zpath))[0]))
                return zpath
        except Exception:
            continue
    return None

def _read_usdm_shp_from_zip(zpath: str) -> gpd.GeoDataFrame | None:
    folder = os.path.join(USDM_DIR, os.path.splitext(os.path.basename(zpath))[0])
    shp = None
    for f in os.listdir(folder):
        if f.lower().endswith(".shp"):
            shp = os.path.join(folder, f); break
    if not shp: return None
    g = gpd.read_file(shp)
    if g.crs is None:
        g = g.set_crs(4326, allow_override=True)
    else:
        g = g.to_crs(4326)
    # Normalize level column; some files use 'DM'
    if "DM" not in g.columns:
        return None
    g["level"] = g["DM"].map({"D0":0,"D1":1,"D2":2,"D3":3,"D4":4}).fillna(-1).astype(int)
    # Fix invalid geometries that break overlay
    g["geometry"] = g.geometry.buffer(0)
    # Keep only D2+ polygons; small speedup
    g = g[g["level"] >= 2][["level","geometry"]]
    if g.empty:
        return g
    # Clip to MN bounding box for speed
    g = g.clip(MN_COUNTIES.unary_union)
    return g

def build_usdm_table(start_year=START_YEAR, end_year=END_YEAR):
    rows = []
    # Precompute county areas in an equal-area projection
    counties_aea = MN_COUNTIES.to_crs(5070)  # NAD83 / Conus Albers
    county_area = counties_aea.area.values
    county_ids  = MN_COUNTIES["fips"].values

    for y in tqdm(range(start_year, end_year+1), desc="USDM years"):
        for m in range(1, 13):
            weekly_polys = []
            for d in _nearest_thursdays_in_month(y, m):
                z = _fetch_usdm_weekly_zip(d)
                if not z: continue
                g = _read_usdm_shp_from_zip(z)
                if g is not None and not g.empty:
                    weekly_polys.append(g)

            # If nothing found ‚Üí fill zeros, keep pipeline moving
            if not weekly_polys:
                rows.append(pd.DataFrame({
                    "fips": county_ids,
                    "year": y,
                    "month": m,
                    "usdm_pct_D2plus": np.zeros(len(county_ids), dtype=float),
                    "usdm_days_D2plus": np.zeros(len(county_ids), dtype=int)
                }))
                continue

            # Compute area share per week vectorized via overlay with all counties at once
            month_pct_list = []
            days_D2p = np.zeros(len(county_ids), dtype=int)
            for wg in weekly_polys:
                try:
                    inter = gpd.overlay(MN_COUNTIES[["fips","geometry"]], wg, how="intersection")
                except Exception:
                    # Fallback: repair and retry
                    MN_fixed = MN_COUNTIES.copy()
                    MN_fixed["geometry"] = MN_fixed.geometry.buffer(0)
                    wg_fixed = wg.copy()
                    wg_fixed["geometry"] = wg_fixed.geometry.buffer(0)
                    inter = gpd.overlay(MN_fixed[["fips","geometry"]], wg_fixed, how="intersection")

                if inter.empty:
                    month_pct_list.append(np.zeros(len(county_ids), dtype=float))
                    continue

                inter_area = inter.to_crs(5070).area.groupby(inter["fips"]).sum()
                # Map to county order
                pct = np.zeros(len(county_ids), dtype=float)
                for i, f in enumerate(county_ids):
                    if f in inter_area.index:
                        pct[i] = float(inter_area.loc[f] / county_area[i] * 100.0)
                month_pct_list.append(pct)
                days_D2p += (pct > 0).astype(int) * 7  # ~7 days per map

            # Average weekly percentages ‚Üí monthly percent area
            if month_pct_list:
                pct_month = np.vstack(month_pct_list).mean(axis=0)
            else:
                pct_month = np.zeros(len(county_ids), dtype=float)

            rows.append(pd.DataFrame({
                "fips": county_ids,
                "year": y,
                "month": m,
                "usdm_pct_D2plus": pct_month,
                "usdm_days_D2plus": days_D2p
            }))

    return pd.concat(rows, ignore_index=True)

usdm_df = build_usdm_table()
print("USDM built:", usdm_df.shape)
display(usdm_df.head())



USDM years: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 25/25 [05:41<00:00, 13.65s/it]

USDM built: (26100, 5)





Unnamed: 0,fips,year,month,usdm_pct_D2plus,usdm_days_D2plus
0,27095,2000,1,0.0,0
1,27045,2000,1,0.0,0
2,27073,2000,1,0.0,0
3,27085,2000,1,0.0,0
4,27153,2000,1,0.0,0



## Cropland Data Layer (CDL) ‚Äî Corn Fraction

We compute the fraction of each county planted to **corn** using CDL (2008‚Äìpresent).  
This uses the public AWS endpoint maintained by USDA/NASS (GeoTIFF per year/state or CONUS).

In [56]:
usdm_df.to_csv('OceanicNinoIndex.csv', index=False) 

In [41]:

def fetch_cdl_year(year: int) -> Path:
    # Use CONUS GeoTIFF (might be large). Alternatively, pull per-state tiles if available.
    url = f"https://www.nass.usda.gov/Research_and_Science/Cropland/Release/datasets/CDL_{year}_v6.tif"
    out = Path(CDL_DIR)/f"CDL_{year}.tif"
    if out.exists():
        return out
    r = requests.get(url, timeout=300)
    if r.status_code == 404:
        # older naming
        url = f"https://www.nass.usda.gov/Research_and_Science/Cropland/Release/datasets/USDA_CDL_{year}.tif"
        r = requests.get(url, timeout=300)
    r.raise_for_status()
    with open(out, "wb") as f:
        f.write(r.content)
    return out

CORN_CODES = {1, 3}  # 1=Corn (yellow), 3=Sweet Corn (check legend by year)

def county_corn_fraction_from_cdl(tif_path: Path, counties: gpd.GeoDataFrame) -> pd.DataFrame:
    with rasterio.open(tif_path) as src:
        res = []
        for _, row in counties.iterrows():
            geom = [row.geometry.__geo_interface__]
            try:
                out_image, out_transform = mask(src, geom, crop=True)
            except Exception:
                res.append({"fips": row.fips, "corn_frac": np.nan})
                continue
            data = out_image[0]
            if data.size == 0:
                res.append({"fips": row.fips, "corn_frac": np.nan})
                continue
            valid = data > 0
            if valid.sum() == 0:
                res.append({"fips": row.fips, "corn_frac": 0.0})
                continue
            corn = np.isin(data, list(CORN_CODES))
            frac = float(corn.sum()/valid.sum())
            res.append({"fips": row.fips, "corn_frac": frac})
        return pd.DataFrame(res)

def build_cdl_table(start_year=2008, end_year=END_YEAR):
    rows = []
    for y in tqdm(range(start_year, min(end_year, dt.date.today().year)+1), desc="CDL years"):
        tif = fetch_cdl_year(y)
        df = county_corn_fraction_from_cdl(tif, MN_COUNTIES)
        df["year"]=y; df["cdl_corn_fraction"]=df.pop("corn_frac")
        rows.append(df)
    return pd.concat(rows, ignore_index=True)

cdl_df = build_cdl_table()
cdl_df.head()


CDL years:   0%|                                                                                | 0/17 [00:01<?, ?it/s]


HTTPError: 404 Client Error: Not Found for url: https://www.nass.usda.gov/Research_and_Science/Cropland/Release/datasets/USDA_CDL_2008.tif


## gSSURGO ‚Äî Soils (AWC, drainage, OM)

We query the Soil Data Access (SDA) API to compute county-level aggregates for:
- `gssurgo_awc` (available water capacity in top 1 m)
- `gssurgo_drainage_idx` (ordinal drainage class)
- (Optional) `om` (organic matter) as a cross-check

> This uses an SDA SQL query and intersects with county boundaries.


In [None]:

def sda_query(sql: str) -> pd.DataFrame:
    url = "https://sdmdataaccess.sc.egov.usda.gov/tabular/post.rest"
    headers = {"Content-Type":"application/json"}
    payload = {"query": sql}
    r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=180)
    r.raise_for_status()
    js = r.json()
    if "Table" not in js:
        raise RuntimeError(f"SDA response missing 'Table': {js}")
    return pd.DataFrame(js["Table"])

# We'll pull mapunits that intersect Minnesota and compute AWC from component/horizon tables.
SQL = '''
SELECT mu.mukey, mu.musym, mu.muname, areasymbol, areaname, mu.acres
FROM legend lg
JOIN mapunit mu ON mu.lkey = lg.lkey
WHERE areasymbol LIKE 'MN%'
'''
try:
    mu = sda_query(SQL)
    mu.head()
except Exception as e:
    print("SDA query failed; check internet access.", e)

# For brevity, we show structure; full AWC computation typically aggregates from CHORIZON by thickness.



## NDVI (MODIS or Sentinel-2)

Two options:

1) **MODIS MOD13A2 (16-day, 1 km)** via AppEEARS API (recommended for long record).  
2) **Sentinel-2** via STAC search (10 m) for recent years.

We then compute: `ndvi_mean`, `ndvi_peak`, `ndvi_peak_doy`, `ndvi_anom` per month/county.


In [44]:
# ==================== NDVI (MODIS via AppEEARS) ====================
# Two paths:
# 1) Live: set APPEEARS_TOKEN in env (or in-notebook) and set USE_APPEEARS_LIVE=True.
# 2) Offline: place a ZIP from AppEEARS at /mnt/data/appeears_ndvi.zip (CSV bundle) and leave USE_APPEEARS_LIVE=False.

import os, io, zipfile, time, json, requests, pandas as pd, numpy as np, geopandas as gpd
from pathlib import Path
from tqdm import tqdm

APPEEARS_BASE = "https://appeears.earthdatacloud.nasa.gov/api"
USE_APPEEARS_LIVE = False  # <- flip to True if you set APPEEARS_TOKEN

# OPTIONAL: set token here instead of environment variable
# APPEEARS_TOKEN = "paste-your-token"

def _appeears_headers():
    tok = os.environ.get("APPEEARS_TOKEN", "").strip()
    if not tok and "APPEEARS_TOKEN" in globals():
        tok = str(APPEEARS_TOKEN).strip()
    if not tok:
        raise RuntimeError("Set APPEEARS_TOKEN to use live NDVI download.")
    return {"Authorization": f"Bearer {tok}"}

def _appeears_post(path, payload):
    r = requests.post(f"{APPEEARS_BASE}{path}", headers=_appeears_headers(), json=payload, timeout=60)
    r.raise_for_status()
    return r.json()

def _appeears_get(path):
    r = requests.get(f"{APPEEARS_BASE}{path}", headers=_appeears_headers(), timeout=60)
    r.raise_for_status()
    return r.json()

def _appeears_download_file(url, out_path):
    with requests.get(url, headers=_appeears_headers(), stream=True, timeout=300) as r:
        r.raise_for_status()
        with open(out_path, "wb") as f:
            for chunk in r.iter_content(1<<20):
                if chunk: f.write(chunk)

def _mn_counties_geojson():
    g = MN_COUNTIES[["fips","geometry"]].copy()
    g["FID"] = g["fips"]  # AppEEARS echoes FID in CSV ‚Üí we map back to FIPS easily
    return json.loads(g.to_json())

def submit_ndvi_area_task(start_date, end_date, task_name="MN_NDVI_MOD13A2"):
    # MODIS Terra Vegetation Indices 16-Day, 1 km, Collection 6.1
    product = "MOD13A2.061"
    layers  = ["NDVI","SummaryQA","DayOfYear"]
    payload = {
        "task_type": "area",
        "task_name": task_name,
        "params": {
            "dates": [{"startDate": start_date, "endDate": end_date}],
            "layers": [{"layer": lyr, "product": product} for lyr in layers],
            "output": {"format": {"type": "csv"}},
            "geo": _mn_counties_geojson()
        }
    }
    out = _appeears_post("/task", payload)
    return out["task_id"]

def wait_for_task(task_id, poll=20):
    while True:
        st = _appeears_get(f"/task/{task_id}")
        if st.get("status") in ("done","failed"):
            return st
        time.sleep(poll)

def download_task_bundle(task_id, out_zip="/mnt/data/appeears_ndvi.zip"):
    files = _appeears_get(f"/bundle/{task_id}")
    # Prefer the CSV bundle
    for f in files.get("files", []):
        if f.get("file_type","").lower() == "csv":
            _appeears_download_file(f["url"], out_zip)
            return out_zip
    # fallback: download first file
    if files.get("files"):
        _appeears_download_file(files["files"][0]["url"], out_zip)
        return out_zip
    raise RuntimeError("No downloadable file found in AppEEARS bundle.")

def parse_appeears_csv_zip(zip_path="/mnt/data/appeears_ndvi.zip"):
    # AppEEARS CSV has rows per feature (FID) per composite date
    with zipfile.ZipFile(zip_path) as z:
        cand = [n for n in z.namelist() if n.lower().endswith(".csv")]
        if not cand:
            raise FileNotFoundError("No CSV in AppEEARS ZIP.")
        with z.open(cand[0]) as f:
            df = pd.read_csv(f)

    # Normalize columns
    df.columns = [c.strip().lower() for c in df.columns]
    if "fid" not in df.columns:
        if "feature_id" in df.columns:
            df["fid"] = df["feature_id"]
        else:
            raise ValueError("AppEEARS CSV missing FID/feature_id column.")
    date_col = "date" if "date" in df.columns else ("acq_date" if "acq_date" in df.columns else None)
    if not date_col:
        raise ValueError("AppEEARS CSV missing date/acq_date column.")
    df["date"] = pd.to_datetime(df[date_col])
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month

    # Variables: NDVI scaled 0.0001, mask with SummaryQA == 0 (best)
    ndvi_col = next((c for c in df.columns if "ndvi" in c), None)
    qa_col   = next((c for c in df.columns if "summaryqa" in c or c.endswith("_qa")), None)
    doy_col  = next((c for c in df.columns if "dayofyear" in c or c.endswith("_doy")), None)

    if ndvi_col is None:
        raise ValueError("Could not find NDVI column in AppEEARS CSV.")
    df["ndvi"] = pd.to_numeric(df[ndvi_col], errors="coerce") * 0.0001
    if qa_col and qa_col in df.columns:
        qa = pd.to_numeric(df[qa_col], errors="coerce")
        df.loc[qa != 0, "ndvi"] = np.nan

    df["doy"] = pd.to_numeric(df[doy_col], errors="coerce") if doy_col else df["date"].dt.dayofyear
    df["fips"] = df["fid"].astype(str).str.zfill(5)

    # Monthly by county
    g = df.groupby(["fips","year","month"], as_index=False)
    ndvi_mean = g["ndvi"].mean().rename(columns={"ndvi":"ndvi_mean"})
    # Peak within month
    peak_idx = df.groupby(["fips","year","month"])["ndvi"].idxmax()
    peaks = df.loc[peak_idx, ["fips","year","month","ndvi","doy"]].rename(columns={"ndvi":"ndvi_peak","doy":"ndvi_peak_doy"})
    out = ndvi_mean.merge(peaks, on=["fips","year","month"], how="left")

    # 2001‚Äì2020 climatology per county√ómonth for anomalies
    clim = out[(out["year"]>=2001)&(out["year"]<=2020)].groupby(["fips","month"])["ndvi_mean"].mean().rename("ndvi_mean_clim").reset_index()
    out = out.merge(clim, on=["fips","month"], how="left")
    out["ndvi_anom"] = out["ndvi_mean"] - out["ndvi_mean_clim"]
    out = out.drop(columns=["ndvi_mean_clim"])
    return out[["fips","year","month","ndvi_mean","ndvi_peak","ndvi_peak_doy","ndvi_anom"]]

def build_ndvi_modis_appeears(start_year=START_YEAR, end_year=END_YEAR):
    start_date = f"{start_year:04d}-01-01"; end_date = f"{end_year:04d}-12-31"
    tid = submit_ndvi_area_task(start_date, end_date)
    print("Submitted AppEEARS NDVI task:", tid)
    status = wait_for_task(tid, poll=20)
    if status.get("status") != "done":
        raise RuntimeError(f"AppEEARS task failed: {status}")
    out_zip = download_task_bundle(tid, out_zip="/mnt/data/appeears_ndvi.zip")
    print("Downloaded NDVI bundle to", out_zip)
    return parse_appeears_csv_zip(out_zip)

def ingest_appeears_ndvi_zip(zip_path="/mnt/data/appeears_ndvi.zip"):
    return parse_appeears_csv_zip(zip_path)

# ==== Run it ====
if USE_APPEEARS_LIVE:
    ndvi_df = build_ndvi_modis_appeears(START_YEAR, END_YEAR)
else:
    if Path("/mnt/data/appeears_ndvi.zip").exists():
        ndvi_df = ingest_appeears_ndvi_zip("/mnt/data/appeears_ndvi.zip")
    else:
        print("No AppEEARS ZIP found and live mode disabled; keeping placeholder NDVI.")
        ndvi_df = placeholder_ndvi_builder()

print("NDVI built:", ndvi_df.shape)
display(ndvi_df.head())



No AppEEARS ZIP found and live mode disabled; keeping placeholder NDVI.
NDVI built: (26100, 7)


Unnamed: 0,fips,year,month,ndvi_mean,ndvi_peak,ndvi_peak_doy,ndvi_anom
0,27095,2000,1,,,,
1,27045,2000,1,,,,
2,27073,2000,1,,,,
3,27085,2000,1,,,,
4,27153,2000,1,,,,



## Markets & Logistics Proxies

- **Diesel** (EIA weekly) ‚Üí monthly mean around planting/harvest.  
- **Ethanol plants** (EIA list) ‚Üí compute `dist_km_ethanol` from county centroid to nearest plant.  
- **Basis** (USDA AMS bids; optional) ‚Üí join nearest terminal to county; use May‚ÄìJune mean.

> We include diesel and ethanol distance here. Basis collection varies by terminal and may require custom scrapers.


In [45]:

from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0
    dlat = radians(lat2-lat1); dlon = radians(lon2-lon1)
    a = sin(dlat/2)**2 + cos(radians(lat1))*cos(radians(lat2))*sin(dlon/2)**2
    return 2*R*asin(sqrt(a))

def fetch_eia_ethanol_plants() -> pd.DataFrame:
    # Example CSV (public snapshot). Replace with EIA authoritative list if available.
    # For now, we provide a small MN-centric list as a starter.
    plants = [
        {"name":"Guardian Energy - Janesville, MN","lat":44.12,"lon":-93.71},
        {"name":"POET - Preston, MN","lat":43.67,"lon":-92.09},
        {"name":"Green Plains - Fairmont, MN","lat":43.63,"lon":-94.46},
        {"name":"Bushmills Ethanol - Atwater, MN","lat":45.13,"lon":-94.79},
        {"name":"POET - Glenville, MN","lat":43.56,"lon":-93.34},
    ]
    return pd.DataFrame(plants)

def compute_ethanol_distance():
    plants = fetch_eia_ethanol_plants()
    rows = []
    centroids = MN_COUNTIES.copy()
    centroids["centroid"] = centroids.geometry.centroid
    for _, r in centroids.iterrows():
        clat = r.centroid.y; clon = r.centroid.x
        dmin = 1e9; which = None
        for _, p in plants.iterrows():
            d = haversine_km(clat, clon, p.lat, p.lon)
            if d < dmin:
                dmin = d; which = p["name"]
        rows.append({"fips": r.fips, "dist_km_ethanol": dmin, "nearest_ethanol": which})
    return pd.DataFrame(rows)

ethanol_df = compute_ethanol_distance()
ethanol_df.head()




  centroids["centroid"] = centroids.geometry.centroid


Unnamed: 0,fips,dist_km_ethanol,nearest_ethanol
0,27095,127.415615,"Bushmills Ethanol - Atwater, MN"
1,27045,0.445526,"POET - Preston, MN"
2,27073,109.682327,"Bushmills Ethanol - Atwater, MN"
3,27085,53.088949,"Bushmills Ethanol - Atwater, MN"
4,27153,104.927353,"Bushmills Ethanol - Atwater, MN"


In [46]:

def fetch_eia_diesel_weekly() -> pd.DataFrame:
    # EIA weekly U.S. No. 2 Diesel Retail Prices (Dollars per Gallon) ‚Äî national series
    # CSV download:
    url = "https://www.eia.gov/dnav/pet/hist_xls/RWTCd.htm"  # Placeholder; consider EIA API for JSON with API key.
    # For illustration, we will create a stub series (you should replace with EIA API call).
    dates = pd.date_range("1994-01-03","2025-10-20", freq="W-MON")
    vals = np.clip(2.5 + 0.5*np.sin(np.linspace(0, 50, len(dates))), 1.5, 6.0)
    return pd.DataFrame({"date":dates, "diesel_usd_gal": vals})

def diesel_monthly_mean(diesel_df: pd.DataFrame) -> pd.DataFrame:
    diesel_df["year"] = diesel_df["date"].dt.year
    diesel_df["month"] = diesel_df["date"].dt.month
    out = diesel_df.groupby(["year","month"]).agg(diesel_usd_gal=("diesel_usd_gal","mean")).reset_index()
    return out

diesel_df = diesel_monthly_mean(fetch_eia_diesel_weekly())
diesel_df.head()


Unnamed: 0,year,month,diesel_usd_gal
0,1994,1,2.530093
1,1994,2,2.59727
2,1994,3,2.655512
3,1994,4,2.711496
4,1994,5,2.770678


In [55]:
diesel_df.to_csv('diesel_dist.csv', index=False) 
ethanol_df.to_csv('ethanol_dist.csv', index=False) 


## Feature Engineering & Merge to Minimal Schema

We compute:
- `heat_days_35c` (approximate from PRISM monthly Tmax via exceedance proxy)
- `vpd_proxy` (simple Clausius‚ÄìClapeyron-based monthly proxy)
- `moisture_deficit = prism_prcp_mm - gldas_et_mm`
- Label horizons and keep only rows up to each horizon month per year.


In [59]:
# ================= Corn price features (no leakage) =================
# Dec corn futures (contract for year t) via yfinance + MN cash price via USDA QuickStats
# Outputs per-year features:
#   corn_fut_dec_preplant_avg, corn_price_volatility_preplant,
#   mn_cash_preplant_avg, corn_basis_state_preplant, mn_cash_harvest_prior

import os, pandas as pd, numpy as np, requests
from datetime import datetime as _dt

# ---- Config ----
USE_YFINANCE = True  # set False to skip futures if yfinance isn't installed
PREPLANT_MONTHS = [1,2,3,4]   # Jan‚ÄìApr
HARVEST_MONTHS  = [9,10,11]   # Sep‚ÄìNov (prior year, attributed to year t)
QS_BASE = "https://quickstats.nass.usda.gov/api/api_GET/"
QS_KEY  = "606153B9-DDAD-30FB-88EE-5BCC60DEE678"

# Try import yfinance (only needed for futures)
try:
    import yfinance as yf
except Exception:
    yf = None
    USE_YFINANCE = False

def _dec_contract_symbols(year: int):
    """Candidate Yahoo symbols for December corn contract of 'year' (ZC Dec)."""
    yy = year % 100
    return [f"ZCZ{yy}.CBT", f"ZCZ{yy}", "ZC=F"]  # last is continuous fallback

def _fetch_dec_futures_preplant(year: int) -> pd.Series:
    """Jan‚ÄìApr daily settles for December contract of 'year' as a Series."""
    if not USE_YFINANCE or yf is None:
        return pd.Series(dtype=float)
    start = f"{year}-01-01"; end = f"{year}-04-30"
    for sym in _dec_contract_symbols(year):
        try:
            df = yf.download(sym, start=start, end=end, progress=False, auto_adjust=False, actions=False)
            if isinstance(df, pd.DataFrame) and not df.empty:
                px = df.get("Adj Close", df.get("Close"))
                px = pd.to_numeric(px, errors="coerce").dropna()
                if not px.empty:
                    return px
        except Exception:
            continue
    return pd.Series(dtype=float)

def quickstats_query(params: dict) -> pd.DataFrame:
    """USDA NASS QuickStats fetch ‚Üí DataFrame with lowercase columns and numeric 'value'."""
    if not QS_KEY:
        return pd.DataFrame()
    p = {"key": QS_KEY, **params, "format": "JSON"}
    r = requests.get(QS_BASE, params=p, timeout=120)
    r.raise_for_status()
    data = r.json()
    rows = data.get("data", [])
    df = pd.DataFrame(rows)
    if df.empty:
        return df
    df.columns = [c.lower() for c in df.columns]
    if "value" in df.columns:
        df["value"] = (
            df["value"].astype(str).str.replace(",", "", regex=False)
            .replace({"(d)": np.nan, "(z)": 0}, regex=True)
        )
        df["value"] = pd.to_numeric(df["value"], errors="coerce")
    return df

def mn_cash_prices_monthly(start_year: int, end_year: int) -> pd.DataFrame:
    """MN cash price received ($/bu), monthly."""
    params = {
        "source_desc": "SURVEY",
        "sector_desc": "CROPS",
        "group_desc": "FIELD CROPS",
        "commodity_desc": "CORN",
        "statisticcat_desc": "PRICE RECEIVED",
        "unit_desc": "$ / BU",
        "agg_level_desc": "STATE",
        "state_alpha": "MN",
        "reference_period_desc": "MONTH",
        "year__GE": start_year,
        "year__LE": end_year,
    }
    df = quickstats_query(params)
    if df.empty:
        return pd.DataFrame(columns=["year","month","mn_cash_usd_bu"])
    # Parse month from 'time_period' or 'period'
    mon_map = {m.upper(): i for i, m in enumerate(["","Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"])}
    if "time_period" in df.columns:
        mtxt = df["time_period"].astype(str).str[:3].str.upper()
    elif "period" in df.columns:
        mtxt = df["period"].astype(str).str[:3].str.upper()
    else:
        mtxt = np.nan
    df["month"] = mtxt.map(mon_map)
    out = df[["year","month","value"]].dropna()
    out["year"] = pd.to_numeric(out["year"], errors="coerce").astype(int)
    out["month"] = pd.to_numeric(out["month"], errors="coerce").astype(int)
    out = out.rename(columns={"value":"mn_cash_usd_bu"})
    return out

def build_corn_price_features(start_year: int, end_year: int) -> pd.DataFrame:
    # Futures: Jan‚ÄìApr stats for Dec contract of year t
    fut_rows = []
    for y in range(start_year, end_year+1):
        px = _fetch_dec_futures_preplant(y)
        fut_rows.append({
            "year": y,
            "corn_fut_dec_preplant_avg": float(px.mean()) if not px.empty else np.nan,
            "corn_price_volatility_preplant": float(px.std(ddof=0)) if not px.empty else np.nan
        })
    fut_df = pd.DataFrame(fut_rows)

    # MN cash price monthly (include prior year for harvest window)
    cash = mn_cash_prices_monthly(start_year-1, end_year)
    preplant = cash[cash["month"].isin(PREPLANT_MONTHS)].groupby("year")["mn_cash_usd_bu"] \
               .mean().rename("mn_cash_preplant_avg").reset_index()

    # Prior-year harvest (Sep‚ÄìNov of y-1 attributed to year y)
    harvest = cash[cash["month"].isin(HARVEST_MONTHS)].copy()
    harvest["year"] = harvest["year"] + 1
    harvest_prior = harvest.groupby("year")["mn_cash_usd_bu"] \
                     .mean().rename("mn_cash_harvest_prior").reset_index()

    out = fut_df.merge(preplant, on="year", how="left").merge(harvest_prior, on="year", how="left")
    out["corn_basis_state_preplant"] = out["mn_cash_preplant_avg"] - out["corn_fut_dec_preplant_avg"]
    # Convenience month for merging; you can also drop 'month' and merge on year only
    out["month"] = 4
    cols = ["year","month","corn_fut_dec_preplant_avg","corn_price_volatility_preplant",
            "mn_cash_preplant_avg","corn_basis_state_preplant","mn_cash_harvest_prior"]
    return out[cols].sort_values("year")

# Build
try:
    corn_price_df = build_corn_price_features(START_YEAR, END_YEAR)
except NameError:
    # Fallback if START_YEAR/END_YEAR not defined in the notebook yet
    corn_price_df = build_corn_price_features(2000, _dt.now().year)

print("corn_price_df:", corn_price_df.shape)
display(corn_price_df.head())


HTTPError: 400 Client Error: Bad Request for url: https://quickstats.nass.usda.gov/api/api_GET/?key=606153B9-DDAD-30FB-88EE-5BCC60DEE678&source_desc=SURVEY&sector_desc=CROPS&group_desc=FIELD+CROPS&commodity_desc=CORN&statisticcat_desc=PRICE+RECEIVED&unit_desc=%24+%2F+BU&agg_level_desc=STATE&state_alpha=MN&reference_period_desc=MONTH&year__GE=1999&year__LE=2024&format=JSON