# USDA Data Extraction

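This notebook extracts public USDA data (NASS QuickStats and AMS Market News) for egg market analysis. It stores each series both as a DuckDB table and as a CSV, then builds a combined monthly dataset.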

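**Data Sources:**
- NASS QuickStats API: Feed prices (corn, soybeans), poultry statistics
- AMS MARS API: Recent weekly wholesale egg prices (report 2848)
- USDA Market News TXT archive (report 2848): Historical weekly wholesale egg prices
- Local `flu.csv`: Flu outbreak records by state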

**Output Tables:**
- `COST_SB_US_MONTHLY`: Soybean prices ($/metric ton)
- `COST_CORN_US_MONTHLY`: Corn prices ($/metric ton)
- `EGG_PROD_DOZ_MONTHLY`: Egg production (dozen)
- `LAYER_INV_MONTHLY`: Layer inventory (head)
- `LOSS_DTH_RENDER_MONTHLY`: Layer losses (head)
- `WHOLESALE_PRICE_WEEKLY`: Weekly wholesale egg prices by region
- `WHOLESALE_PRICE_MONTHLY`: Monthly wholesale egg prices by region
- `EGG_STAT_MONTHLY`: Combined monthly statistics (national prices, 2018-01 through 2025-09)
- `FLU_WEEKLY`: Weekly flu outbreak totals (summed across states)

In [49]:
# =============================================================================
# Imports and Configuration
# =============================================================================

import os
from pathlib import Path
from functools import reduce
import re

import duckdb
import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv

# Load API keys from a private dotenv file kept outside the repository
load_dotenv(os.path.expanduser("~/.config/mysecrets/env"))

# Fail fast with one actionable message instead of a bare KeyError('NASS_KEY')
_missing_keys = [name for name in ("NASS_KEY", "AMS_KEY") if not os.environ.get(name)]
if _missing_keys:
    raise KeyError(
        f"Missing or empty API key(s) {_missing_keys}; set them in the environment "
        "or in ~/.config/mysecrets/env"
    )
NASS_KEY = os.environ["NASS_KEY"]
AMS_KEY = os.environ["AMS_KEY"]

# API endpoints
NASS_BASE = "https://quickstats.nass.usda.gov/api"
AMS_BASE = "https://marsapi.ams.usda.gov/services/v1.2"

In [50]:
# =============================================================================
# Constants
# =============================================================================

# Physical constants (document units and sources)
LB_PER_BUSHEL_SOY = 60.0                    # USDA standard weight
LB_PER_METRIC_TON = 2_204.622_621_85        # 1000 kg / 0.45359237 kg per lb, rounded
BUSHELS_PER_METRIC_TON_SOY = LB_PER_METRIC_TON / LB_PER_BUSHEL_SOY  # ≈ 36.744

LB_PER_BUSHEL_CORN = 56.0                   # USDA standard weight
BUSHELS_PER_METRIC_TON_CORN = LB_PER_METRIC_TON / LB_PER_BUSHEL_CORN  # ≈ 39.368

# Month mappings for NASS API responses
MONTH_MAP: dict[str, int] = {
    "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4,  "MAY": 5,  "JUN": 6,
    "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12,
}

# Inventory series label months as "FIRST OF <MON>"; derived so the two maps cannot drift
MONTH_MAP_FIRST_OF: dict[str, int] = {
    f"FIRST OF {abbr}": month_num for abbr, month_num in MONTH_MAP.items()
}

# File paths — set EGG_DATA_FOLDER to point at another data root
DATA_FOLDER = os.environ.get("EGG_DATA_FOLDER", '/home/akimovh/rockets_feathers/eggs/data')
TXT_DATA_DIR = Path(DATA_FOLDER) / "2848_txt"
USDA_OUTPUT_DIR = Path(DATA_FOLDER) / "usda"
DUCKDB_PATH = Path(DATA_FOLDER) / "duckdb" / "egg_usda.duckdb"

In [51]:
# =============================================================================
# Database Connection
# =============================================================================

# Make sure the database directory exists before connecting
DUCKDB_PATH.parent.mkdir(parents=True, exist_ok=True)
con = duckdb.connect(str(DUCKDB_PATH))

# os.cpu_count() returns None when the count is undetermined, which would
# render the invalid statement "PRAGMA threads=None"; fall back to 1 thread.
n_threads = os.cpu_count() or 1
con.execute(f"PRAGMA threads={n_threads}")
print(f"Connected to: {DUCKDB_PATH}")
print(f"Threads: {n_threads}")

Connected to: /home/akimovh/rockets_feathers/eggs/data/duckdb/egg_usda.duckdb
Threads: 40


In [4]:
# =============================================================================
# Utility Functions
# =============================================================================

def fetch_nass_data(params: dict, timeout: int = 120) -> pd.DataFrame:
    """Query the NASS QuickStats ``api_GET`` endpoint and return its records.

    Args:
        params: QuickStats query parameters, including the API ``key``.
        timeout: Request timeout in seconds.

    Returns:
        DataFrame built from the ``data`` list of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the response carries no records.
    """
    url = f"{NASS_BASE}/api_GET/"
    resp = requests.get(url, params=params, timeout=timeout)
    resp.raise_for_status()

    records = resp.json().get("data", [])
    if records:
        return pd.DataFrame(records)
    raise ValueError("NASS API returned empty dataset — check query parameters")


def column_filter_format(df: pd.DataFrame, value_name: str) -> pd.DataFrame:
    """Keep the year/period/value columns of a NASS frame under friendlier names.

    Args:
        df: Raw QuickStats records.
        value_name: New name for the ``Value`` column.

    Returns:
        Frame with columns ``year``, ``month`` (from ``reference_period_desc``)
        and ``value_name`` (from ``Value``).

    Raises:
        KeyError: If any of the three source columns is absent.
    """
    rename_map = {"reference_period_desc": "month", "Value": value_name}
    keep = ["year", *rename_map]

    absent = set(keep) - set(df.columns)
    if absent:
        raise KeyError(f"Expected columns missing from API response: {absent}")

    return df[keep].rename(columns=rename_map)


def parse_value(df: pd.DataFrame, value_name: str) -> pd.DataFrame:
    """Return a copy of ``df`` with ``value_name`` converted to numbers.

    Thousands separators are removed first; anything still non-numeric
    (e.g. suppression markers) becomes NaN.
    """
    out = df.copy()
    as_text = out[value_name].astype(str)
    without_commas = as_text.str.replace(",", "", regex=False)
    out[value_name] = pd.to_numeric(without_commas, errors="coerce")
    return out


def data_filter_format(df: pd.DataFrame, month_map: dict) -> pd.DataFrame:
    """Replace NASS ``year``/``month`` text columns with a first-of-month ``date``.

    Args:
        df: Frame with ``year`` and ``month`` columns (as produced by
            ``column_filter_format``).
        month_map: Maps upper-cased month labels (e.g. ``"JAN"`` or
            ``"FIRST OF JAN"``) to month numbers 1-12.

    Returns:
        A copy with ``year``/``month`` dropped and a ``date`` column (day 1 of
        the month) appended, sorted by ``date`` with a fresh RangeIndex.

    Raises:
        ValueError: If a year is not numeric or a month label is not in
            ``month_map``. The message lists the offending labels/rows.
    """
    df = df.copy()
    df["year"] = pd.to_numeric(df["year"], errors="coerce").astype("Int64")

    bad_year = df["year"].isna()
    if bad_year.any():
        raise ValueError(f"Unparseable year values:\n{df.loc[bad_year]}")

    # Normalize labels (strip + upper) before lookup. Keep the raw labels until
    # validation passes so the error message shows what actually failed to map.
    labels = df["month"].str.strip().str.upper()
    month_num = labels.map(month_map)

    unmapped = month_num.isna()
    if unmapped.any():
        bad_labels = labels[unmapped].unique().tolist()
        raise ValueError(f"Unmapped month values: {bad_labels}\n{df.loc[unmapped]}")

    df["month"] = month_num.astype("Int64")
    df["date"] = pd.to_datetime(dict(year=df["year"], month=df["month"], day=1))
    df = df.drop(["year", "month"], axis=1)

    return df.sort_values("date").reset_index(drop=True)


def validate_dataframe(df: pd.DataFrame, name: str, date_col: str = "date") -> None:
    """Print a quick sanity report for an extracted frame and display its head.

    The report covers row count, column names, the span of ``date_col``,
    null counts per column, and how many rows repeat an earlier ``date_col``
    value. The first three rows are then shown via ``display``.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(f"Validation: {name}")
    print(rule)
    print(f"Rows: {len(df):,}")
    print(f"Columns: {df.columns.tolist()}")

    dates = df[date_col]
    print(f"Date range: {dates.min()} → {dates.max()}")
    print("Nulls per column:\n" + str(df.isna().sum()))

    n_repeated = df.duplicated(subset=[date_col]).sum()
    print(f"Duplicates on {date_col}: {n_repeated}")
    print("\nSample (first 3 rows):")
    display(df.head(3))


def save_to_db_and_csv(
    df: pd.DataFrame, 
    table_name: str, 
    csv_filename: str,
    con: duckdb.DuckDBPyConnection
) -> None:
    """Persist ``df`` as a CSV in ``USDA_OUTPUT_DIR`` and as a DuckDB table.

    The table is created with ``CREATE OR REPLACE``, so re-running a cell
    overwrites the previous contents of both outputs.

    Args:
        df: Frame to persist (the index is not written).
        table_name: Target DuckDB table name.
        csv_filename: File name created inside ``USDA_OUTPUT_DIR``.
        con: Open DuckDB connection.
    """
    USDA_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    csv_path = USDA_OUTPUT_DIR / csv_filename
    df.to_csv(csv_path, index=False)

    # Register the frame explicitly instead of relying on DuckDB resolving the
    # Python variable `df` by name: that lookup is only a fallback, so a table
    # or view literally named `df` in the database would be read instead.
    view_name = "_save_to_db_and_csv_src"
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    con.register(view_name, df)
    try:
        con.execute(f"CREATE OR REPLACE TABLE {quoted_table} AS SELECT * FROM {view_name}")
    finally:
        con.unregister(view_name)
    print(f"Saved: {table_name} → {csv_path}")

---
# NASS Data (Feed Prices & Poultry Statistics)

## Soybean Prices

In [5]:
# Extract soybean prices: national monthly "price received" in $/bu, 1990 onward
params = dict(
    key=NASS_KEY,
    format="JSON",
    source_desc="SURVEY",
    sector_desc="CROPS",
    group_desc="FIELD CROPS",
    commodity_desc="SOYBEANS",
    statisticcat_desc="PRICE RECEIVED",
    agg_level_desc="NATIONAL",
    freq_desc="MONTHLY",
    unit_desc="$ / BU",
    year__GE=1990,
)

soy_prices = fetch_nass_data(params)

In [6]:
# Format soybean prices: select/rename columns, parse numbers, build dates
soy_prices = (
    soy_prices
    .pipe(column_filter_format, "cost_sb_us")
    .pipe(parse_value, "cost_sb_us")
    .pipe(data_filter_format, MONTH_MAP)
)

# $/bushel × bushels per metric ton = $/metric ton
soy_prices["cost_sb_us"] = soy_prices["cost_sb_us"].mul(BUSHELS_PER_METRIC_TON_SOY)

# Print sanity report
validate_dataframe(soy_prices, "Soybean Prices")


Validation: Soybean Prices
Rows: 432
Columns: ['cost_sb_us', 'date']
Date range: 1990-01-01 00:00:00 → 2025-12-01 00:00:00
Nulls per column:
cost_sb_us    0
date          0
dtype: int64
Duplicates on date: 0

Sample (first 3 rows):


Unnamed: 0,cost_sb_us,date
0,207.601964,1990-01-01
1,204.29503,1990-02-01
2,207.601964,1990-03-01


In [12]:
# Persist soybean prices (DuckDB table + CSV)
save_to_db_and_csv(
    soy_prices,
    table_name="COST_SB_US_MONTHLY",
    csv_filename="cost_sb_us_monthly.csv",
    con=con,
)

Saved: COST_SB_US_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/cost_sb_us_monthly.csv


## Corn Prices

In [13]:
# Extract corn prices: national monthly "price received" in $/bu, 1990 onward
params = dict(
    key=NASS_KEY,
    format="JSON",
    source_desc="SURVEY",
    sector_desc="CROPS",
    group_desc="FIELD CROPS",
    commodity_desc="CORN",
    statisticcat_desc="PRICE RECEIVED",
    agg_level_desc="NATIONAL",
    freq_desc="MONTHLY",
    unit_desc="$ / BU",
    year__GE=1990,
)

corn_prices = fetch_nass_data(params)

In [14]:
# Format corn prices: select/rename columns, parse numbers, build dates
corn_prices = (
    corn_prices
    .pipe(column_filter_format, "cost_corn_us")
    .pipe(parse_value, "cost_corn_us")
    .pipe(data_filter_format, MONTH_MAP)
)

# $/bushel × bushels per metric ton = $/metric ton
corn_prices["cost_corn_us"] = corn_prices["cost_corn_us"].mul(BUSHELS_PER_METRIC_TON_CORN)

# Print sanity report
validate_dataframe(corn_prices, "Corn Prices")


Validation: Corn Prices
Rows: 432
Columns: ['cost_corn_us', 'date']
Date range: 1990-01-01 00:00:00 → 2025-12-01 00:00:00
Nulls per column:
cost_corn_us    0
date            0
dtype: int64
Duplicates on date: 0

Sample (first 3 rows):


Unnamed: 0,cost_corn_us,date
0,90.940683,1990-01-01
1,91.334366,1990-02-01
2,93.302779,1990-03-01


In [16]:
# Persist corn prices (DuckDB table + CSV)
save_to_db_and_csv(
    corn_prices,
    table_name="COST_CORN_US_MONTHLY",
    csv_filename="cost_corn_us_monthly.csv",
    con=con,
)

Saved: COST_CORN_US_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/cost_corn_us_monthly.csv


## Egg Production

In [18]:
# Extract national monthly table-egg production (dozen), 2000 onward
params = dict(
    key=NASS_KEY,
    format="JSON",
    source_desc="SURVEY",
    sector_desc="ANIMALS & PRODUCTS",
    group_desc="POULTRY",
    commodity_desc="EGGS",
    short_desc="EGGS, TABLE - PRODUCTION, MEASURED IN DOZEN",
    agg_level_desc="NATIONAL",
    freq_desc="MONTHLY",
    year__GE=2000,
)

egg_prod_doz = fetch_nass_data(params)

In [19]:
# Format egg production: select/rename columns, parse numbers, build dates
egg_prod_doz = (
    egg_prod_doz
    .pipe(column_filter_format, "egg_prod_doz")
    .pipe(parse_value, "egg_prod_doz")
    .pipe(data_filter_format, MONTH_MAP)
)

# Print sanity report
validate_dataframe(egg_prod_doz, "Egg Production")


Validation: Egg Production
Rows: 157
Columns: ['egg_prod_doz', 'date']
Date range: 2012-12-01 00:00:00 → 2025-12-01 00:00:00
Nulls per column:
egg_prod_doz    0
date            0
dtype: int64
Duplicates on date: 0

Sample (first 3 rows):


Unnamed: 0,egg_prod_doz,date
0,606807900,2012-12-01
1,599733800,2013-01-01
2,540083400,2013-02-01


In [20]:
# Persist egg production (DuckDB table + CSV)
save_to_db_and_csv(
    egg_prod_doz,
    table_name="EGG_PROD_DOZ_MONTHLY",
    csv_filename="egg_prod_doz_monthly.csv",
    con=con,
)

Saved: EGG_PROD_DOZ_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/egg_prod_doz_monthly.csv


## Layer Inventory

In [21]:
# Extract national table-egg layer inventory, 2000 onward.
# NOTE(review): unlike the other NASS queries, this one sets no freq_desc or
# unit_desc filter — confirm no other frequencies/units can slip into the result.
params = dict(
    key=NASS_KEY,
    format="JSON",
    source_desc="SURVEY",
    sector_desc="ANIMALS & PRODUCTS",
    group_desc="POULTRY",
    commodity_desc="CHICKENS",
    statisticcat_desc="INVENTORY",
    class_desc="LAYERS, TABLE",
    agg_level_desc="NATIONAL",
    year__GE=2000,
)

layer_inv = fetch_nass_data(params)

In [22]:
# Format layer inventory; its periods are labelled "FIRST OF <MON>"
layer_inv = (
    layer_inv
    .pipe(column_filter_format, "layer_inv")
    .pipe(parse_value, "layer_inv")
    .pipe(data_filter_format, MONTH_MAP_FIRST_OF)
)

# Print sanity report
validate_dataframe(layer_inv, "Layer Inventory")


Validation: Layer Inventory
Rows: 217
Columns: ['layer_inv', 'date']
Date range: 2008-01-01 00:00:00 → 2026-01-01 00:00:00
Nulls per column:
layer_inv    0
date         0
dtype: int64
Duplicates on date: 0

Sample (first 3 rows):


Unnamed: 0,layer_inv,date
0,284729000,2008-01-01
1,282437000,2008-02-01
2,281404000,2008-03-01


In [23]:
# Persist layer inventory (DuckDB table + CSV)
save_to_db_and_csv(
    layer_inv,
    table_name="LAYER_INV_MONTHLY",
    csv_filename="layer_inv_monthly.csv",
    con=con,
)

Saved: LAYER_INV_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/layer_inv_monthly.csv


## Layer Losses (Death & Rendered)

In [24]:
# Extract national monthly layer losses (death & rendered, head), 2000 onward.
# NOTE(review): class_desc here is "LAYERS" while the inventory query uses
# "LAYERS, TABLE" — confirm both series describe the same flock.
params = dict(
    key=NASS_KEY,
    format="JSON",
    source_desc="SURVEY",
    sector_desc="ANIMALS & PRODUCTS",
    group_desc="POULTRY",
    commodity_desc="CHICKENS",
    statisticcat_desc="LOSS, DEATH & RENDERED",
    class_desc="LAYERS",
    unit_desc="HEAD",
    agg_level_desc="NATIONAL",
    freq_desc="MONTHLY",
    year__GE=2000,
)

loss_dth_render = fetch_nass_data(params)

In [25]:
# Format layer losses: select/rename columns, parse numbers, build dates
loss_dth_render = (
    loss_dth_render
    .pipe(column_filter_format, "loss_dth_render")
    .pipe(parse_value, "loss_dth_render")
    .pipe(data_filter_format, MONTH_MAP)
)

# Print sanity report
validate_dataframe(loss_dth_render, "Layer Losses")


Validation: Layer Losses
Rows: 217
Columns: ['loss_dth_render', 'date']
Date range: 2007-12-01 00:00:00 → 2025-12-01 00:00:00
Nulls per column:
loss_dth_render    0
date               0
dtype: int64
Duplicates on date: 0

Sample (first 3 rows):


Unnamed: 0,loss_dth_render,date
0,7501000,2007-12-01
1,7422000,2008-01-01
2,7370000,2008-02-01


In [26]:
# Persist layer losses (DuckDB table + CSV)
save_to_db_and_csv(
    loss_dth_render,
    table_name="LOSS_DTH_RENDER_MONTHLY",
    csv_filename="loss_dth_render_monthly.csv",
    con=con,
)

Saved: LOSS_DTH_RENDER_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/loss_dth_render_monthly.csv


---
# AMS Data (Wholesale Egg Prices)

## Historical Data (TXT files from MMN archive)

In [27]:
# Parse historical TXT files

# Report header line, e.g. "Washington, DC   Fri. Nov 03, 2017   USDA Market News";
# the named group "date" captures the report date.
HEADER_RE = re.compile(
    r"^Washington,\s*DC\s+(?P<date>(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)\.\s+"
    r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},\s+\d{4})\s+"
    r"USDA\s+Market\s+News\s*$"
)


def parse_one_file(fp: Path) -> pd.DataFrame:
    """Parse one USDA Market News TXT report into a tidy price table.

    The report date comes from the first line matching ``HEADER_RE``. Price
    rows are the non-blank lines after the line starting with ``REGIONS``, up
    to the first footer line (``Computed`` / ``Source:`` / ``Prepared:``). Each
    row is ``<region words> <extra large> <large> <medium>``. Lines with fewer
    than four tokens are skipped.

    Args:
        fp: Path to the TXT file (decoded as Latin-1).

    Returns:
        DataFrame with columns ``date``, ``region``, ``ex_large``, ``large``,
        ``medium`` — one row per region line.

    Raises:
        ValueError: If the header date, the ``REGIONS`` line, or any price rows
            are missing, or if a price token is not numeric.
    """
    text = fp.read_text(encoding="latin-1", errors="replace")
    lines = text.splitlines()

    # Report date from the header line. A bare next() would raise
    # StopIteration, whose str() is empty, leaving the error log blank.
    date_str = None
    for line in lines:
        match = HEADER_RE.match(line.strip())
        if match:
            date_str = match.group("date")
            break
    if date_str is None:
        raise ValueError(f"{fp.name}: no 'Washington, DC ... USDA Market News' header line")
    date = pd.to_datetime(date_str, format="%a. %b %d, %Y").normalize()

    # The price table starts on the line after the "REGIONS ..." column header
    start = next(
        (i + 1 for i, line in enumerate(lines) if line.strip().startswith("REGIONS")),
        None,
    )
    if start is None:
        raise ValueError(f"{fp.name}: no 'REGIONS' table header line")

    # Collect data rows until footer
    rows = []
    for line in lines[start:]:
        s = line.strip()
        if not s:
            continue
        if s.startswith(("Computed", "Source:", "Prepared:")):
            break

        parts = s.split()
        if len(parts) < 4:
            continue  # too short to be "<region> <xl> <l> <m>"

        region = " ".join(parts[:-3])
        ex_large, large, medium = map(float, parts[-3:])
        rows.append({
            "date": date,
            "region": region,
            "ex_large": ex_large,
            "large": large,
            "medium": medium,
        })

    if not rows:
        raise ValueError(f"{fp.name}: no price rows found after the REGIONS header")
    return pd.DataFrame(rows)


# Parse all TXT files. Per-file failures are collected and shown rather than
# aborting the run, but the cell stops if nothing at all could be parsed.
files = sorted(p for p in TXT_DATA_DIR.iterdir() if p.is_file() and p.suffix.lower() == ".txt")

dfs = []
errors = []

for fp in files:
    try:
        dfs.append(parse_one_file(fp))
    except Exception as e:
        # Keep the exception type: some exceptions stringify to ""
        errors.append((fp.name, f"{type(e).__name__}: {e}"))

if not dfs:
    raise RuntimeError(
        f"No TXT reports parsed from {TXT_DATA_DIR} "
        f"({len(files)} files found, {len(errors)} failed); first errors: {errors[:3]}"
    )

txt_df = pd.concat(dfs, ignore_index=True).sort_values(["date", "region"]).reset_index(drop=True)

print(f"Parsed {len(files)} files")
print(f"Total rows: {len(txt_df):,}")
print(f"Files with errors: {len(errors)}")
if errors:
    display(pd.DataFrame(errors, columns=["file", "error"]).head(20))

Parsed 378 files
Total rows: 1,890
Files with errors: 0


In [28]:
# Snap report dates to their week-ending Friday (next cells).
# Some reports come out on other weekdays, e.g. around holidays.

txt_df["date"] = txt_df["date"].pipe(pd.to_datetime).dt.normalize()
fri = pd.offsets.Week(weekday=4)  # weekly offset anchored on Friday


def week_end_friday(d: pd.Timestamp) -> pd.Timestamp:
    """Return the Friday closest to ``d`` (``d`` itself if it is a Friday).

    When the previous and next Friday are equally far away, the earlier one
    is chosen.
    """
    before = fri.rollback(d)
    after = fri.rollforward(d)
    if (after - d) < (d - before):
        return after
    return before


# Snap every report to its nearest Friday
txt_df["week_fri"] = txt_df["date"].apply(week_end_friday)

# --- Checks: unique (week, region) keys and an unbroken Friday calendar ---
dup = txt_df.duplicated(["week_fri", "region"]).sum()
print(f"Duplicate (week_fri, region) rows: {dup}")

dates = pd.DatetimeIndex(sorted(txt_df["week_fri"].unique()))
first_fri, last_fri = dates.min(), dates.max()
expected = pd.date_range(first_fri, last_fri, freq="W-FRI")
missing, extra = expected.difference(dates), dates.difference(expected)

print(f"Date range: {first_fri.date()} → {last_fri.date()}")
for label, week_index in [
    ("Unique weeks", dates),
    ("Expected weeks", expected),
    ("Missing weeks", missing),
    ("Extra weeks", extra),
]:
    print(f"{label}: {len(week_index)}")

# Which raw report dates were shifted onto a different day?
was_moved = txt_df["date"].ne(txt_df["week_fri"])
moved = txt_df.loc[was_moved, ["date", "week_fri"]].drop_duplicates().sort_values("date")
print(f"\nDates adjusted to nearest Friday: {len(moved)}")
display(moved)

Duplicate (week_fri, region) rows: 0
Date range: 2017-11-03 → 2025-01-24
Unique weeks: 378
Expected weeks: 378
Missing weeks: 0
Extra weeks: 0

Dates adjusted to nearest Friday: 8


Unnamed: 0,date,week_fri
5,2017-11-09,2017-11-10
390,2019-05-06,2019-05-03
465,2019-08-19,2019-08-16
695,2020-07-02,2020-07-03
825,2020-12-31,2021-01-01
1080,2021-12-23,2021-12-24
1085,2021-12-30,2021-12-31
1310,2022-11-10,2022-11-11


In [29]:
# Finalize historical data: the Friday-snapped week becomes the canonical date
txt_df["date"] = txt_df["week_fri"]
txt_df = txt_df.drop("week_fri", axis=1).copy()

# Standardize region names (upper-case TXT labels → title-case labels)
REGION_MAP = {
    "COMBINED REGIONAL": "National",
    "MIDWEST": "Midwest",
    "NORTHEAST": "Northeast",
    "SOUTH CENTRAL": "South Central",
    "SOUTHEAST": "Southeast",
}
region_labels = txt_df["region"].map(REGION_MAP)

# Series.map turns any label missing from REGION_MAP into NaN. Fail loudly
# rather than carry region-less rows into the combined dataset.
unknown_regions = txt_df.loc[region_labels.isna(), "region"].unique().tolist()
if unknown_regions:
    raise ValueError(f"Unmapped region labels in TXT data: {unknown_regions}")
txt_df["region"] = region_labels

print(f"Regions: {txt_df['region'].unique().tolist()}")

Regions: ['National', 'Midwest', 'Northeast', 'South Central', 'Southeast']


## Recent Data (JSON via AMS API)

In [30]:
# Fetch recent data from the AMS MARS API

SLUG = 2848  # Report ID for shell egg prices
# NOTE(review): the TXT archive ends 2025-01-24, so this window can overlap the
# last TXT week. Exact duplicate rows are dropped when the two sources are combined.
start = "01/20/2025"
end = pd.Timestamp.today().strftime("%m/%d/%Y")

response = requests.get(
    f"{AMS_BASE}/reports/{SLUG}",
    params={"q": f"report_begin_date={start}:{end}", "allSections": "true"},
    auth=(AMS_KEY, ""),
    timeout=120,
)
print(f"Status: {response.status_code}")
print(f"URL: {response.url}")
response.raise_for_status()

rep_range = response.json()

# With allSections=true the response is indexed by section. The rows used
# below live in section 4. Verify that section has the expected fields so a
# change in report layout fails here with a clear message, not later.
AMS_SECTION_INDEX = 4
AMS_REQUIRED_FIELDS = {"report_begin_date", "class", "region", "avg_price", "delivery"}
if len(rep_range) <= AMS_SECTION_INDEX:
    raise ValueError(
        f"AMS response has {len(rep_range)} sections; expected at least {AMS_SECTION_INDEX + 1}"
    )
api_df = pd.DataFrame(rep_range[AMS_SECTION_INDEX]["results"])
missing_fields = AMS_REQUIRED_FIELDS - set(api_df.columns)
if missing_fields:
    raise KeyError(
        f"AMS section {AMS_SECTION_INDEX} lacks fields {sorted(missing_fields)}; "
        "the report layout may have changed"
    )
print(f"Raw rows from API: {len(api_df):,}")

Status: 200
URL: https://marsapi.ams.usda.gov/services/v1.2/reports/2848?q=report_begin_date%3D01%2F20%2F2025%3A02%2F02%2F2026&allSections=true
Raw rows from API: 1,113


In [31]:
# Format API data to match the historical TXT layout:
# one row per (date, region) with ex_large / large / medium price columns.

API_CLASS_COLUMNS = {"Extra Large": "ex_large", "Large": "large", "Medium": "medium"}

api_df = api_df.query(
    "`class` in ['Extra Large', 'Large', 'Medium'] and delivery == 'Delivered Warehouse'"
)
api_df = api_df[["report_begin_date", "class", "region", "avg_price"]]

# pivot raises if a (report_begin_date, region, class) combination repeats
api_df = (
    api_df
    .pivot(
        index=["report_begin_date", "region"],
        columns="class",
        values="avg_price",
    )
    .reset_index()
)

# Rename by label rather than by position so the result does not depend on
# the order in which pivot emits the class columns.
absent_classes = sorted(set(API_CLASS_COLUMNS) - set(api_df.columns))
if absent_classes:
    raise KeyError(f"Pivoted API data lacks price classes: {absent_classes}")
api_df = (
    api_df
    .rename(columns={"report_begin_date": "date", **API_CLASS_COLUMNS})
    .rename_axis(columns=None)
    .loc[:, ["date", "region", "ex_large", "large", "medium"]]
    .copy()
)

# Parse begin dates and roll them forward to the Friday that ends the week.
# NOTE(review): TXT dates use the *nearest* Friday instead. Rolling forward
# presumes report_begin_date falls at the start of its reporting week — confirm.
api_df["date"] = pd.to_datetime(api_df["date"], format="%m/%d/%Y").dt.normalize()
api_df["date"] = api_df["date"].map(fri.rollforward)

# Ensure numeric types
api_df = api_df.astype({"ex_large": float, "large": float, "medium": float})

print(f"Formatted API rows: {len(api_df):,}")
print(f"Date range: {api_df['date'].min().date()} → {api_df['date'].max().date()}")

Formatted API rows: 265
Date range: 2025-01-31 → 2026-01-30


In [32]:
# Combine historical (TXT) and recent (API) data. Rows identical in every
# column (e.g. a week reported identically by both sources) are dropped.
wholesale_price = (
    pd.concat([txt_df, api_df], ignore_index=True)
    .sort_values(["date", "region"])
    .drop_duplicates()
    .reset_index(drop=True)
)

# Validate combined data
validate_dataframe(wholesale_price, "Wholesale Prices (Weekly)")

# (date, region) is the primary key. drop_duplicates() above only removes
# fully identical rows, so a week where the sources disagree on a price would
# survive as two rows. Stop here instead of saving it.
dup_check = wholesale_price.duplicated(subset=["date", "region"]).sum()
print(f"\nDuplicates on (date, region): {dup_check}")
if dup_check:
    conflicts = wholesale_price[wholesale_price.duplicated(subset=["date", "region"], keep=False)]
    raise ValueError(f"Conflicting rows for the same (date, region):\n{conflicts}")


Validation: Wholesale Prices (Weekly)
Rows: 2,155
Columns: ['date', 'region', 'ex_large', 'large', 'medium']
Date range: 2017-11-03 00:00:00 → 2026-01-30 00:00:00
Nulls per column:
date        0
region      0
ex_large    0
large       0
medium      0
dtype: int64
Duplicates on date: 1724

Sample (first 3 rows):


Unnamed: 0,date,region,ex_large,large,medium
0,2017-11-03,Midwest,90.5,88.5,74.5
1,2017-11-03,National,101.33,96.29,79.15
2,2017-11-03,Northeast,99.0,94.0,80.0



Duplicates on (date, region): 0


In [33]:
# Persist weekly wholesale prices (DuckDB table + CSV)
save_to_db_and_csv(
    wholesale_price,
    table_name="WHOLESALE_PRICE_WEEKLY",
    csv_filename="wholesale_price_weekly.csv",
    con=con,
)

Saved: WHOLESALE_PRICE_WEEKLY → /home/akimovh/rockets_feathers/eggs/data/usda/wholesale_price_weekly.csv


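## Aggregate to Monthly

Each week's prices are spread over its seven days (Saturday through the Friday week-ending date) and then averaged by calendar month. A week that straddles a month boundary therefore contributes to each month in proportion to its days. The first month (2017-10) is covered by only a few days of data.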

In [34]:
# Monthly averages from weekly prices: each week covers the seven days that
# end on its (Friday) date, i.e. week_start = date - 6 days.

df_weekly = wholesale_price.assign(
    week_start=wholesale_price["date"] - pd.Timedelta(days=6),
    week_end=wholesale_price["date"],
)


def expand_week_to_months(row):
    """Spread one weekly observation over every day from week_start to week_end.

    Each day is tagged with the first day of its calendar month. A later
    groupby on ``month`` then weights weeks that straddle a month boundary by
    the number of days they contribute to each month.

    Args:
        row: Record exposing ``week_start``, ``week_end``, ``ex_large``,
            ``large``, ``medium`` and ``region`` as attributes.

    Returns:
        DataFrame with one row per day and columns ``date``, ``month``,
        ``price_ex_large``, ``price_large``, ``price_medium``, ``region``.
    """
    days = pd.date_range(row.week_start, row.week_end, freq="D")
    month_start = days.to_period("M").to_timestamp()
    columns = {
        "date": days,
        "month": month_start,
        "price_ex_large": row.ex_large,
        "price_large": row.large,
        "price_medium": row.medium,
        "region": row.region,
    }
    return pd.DataFrame(columns)


# One frame of daily rows for every (week, region) observation
expanded_df = pd.concat(
    [expand_week_to_months(week) for week in df_weekly.itertuples(index=False)],
    ignore_index=True,
)

# Day-weighted monthly mean per region; the month key becomes `date`
monthly_df = (
    expanded_df
    .groupby(["month", "region"], as_index=False)[["price_ex_large", "price_large", "price_medium"]]
    .mean()
    .rename(columns={"month": "date"})
)

# Print sanity report
validate_dataframe(monthly_df, "Wholesale Prices (Monthly)")


Validation: Wholesale Prices (Monthly)
Rows: 500
Columns: ['date', 'region', 'price_ex_large', 'price_large', 'price_medium']
Date range: 2017-10-01 00:00:00 → 2026-01-01 00:00:00
Nulls per column:
date              0
region            0
price_ex_large    0
price_large       0
price_medium      0
dtype: int64
Duplicates on date: 400

Sample (first 3 rows):


Unnamed: 0,date,region,price_ex_large,price_large,price_medium
0,2017-10-01,Midwest,90.5,88.5,74.5
1,2017-10-01,National,101.33,96.29,79.15
2,2017-10-01,Northeast,99.0,94.0,80.0


In [35]:
# Persist monthly wholesale prices (DuckDB table + CSV)
save_to_db_and_csv(
    monthly_df,
    table_name="WHOLESALE_PRICE_MONTHLY",
    csv_filename="wholesale_price_monthly.csv",
    con=con,
)

Saved: WHOLESALE_PRICE_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/wholesale_price_monthly.csv


---
# Combined Monthly Dataset

In [36]:
# Merge all monthly datasets on `date`. The inner join keeps only months that
# appear in every series. validate="one_to_one" raises if any input repeats a
# date, which would otherwise multiply rows silently.

dfs_to_merge = [
    soy_prices,
    corn_prices,
    egg_prod_doz,
    layer_inv,
    loss_dth_render,
    monthly_df.query("region == 'National'").drop(columns=["region"]),  # National prices only
]

egg_stat = reduce(
    lambda left, right: pd.merge(left, right, on="date", how="inner", validate="one_to_one"),
    dfs_to_merge,
)

# Filter to analysis period (start inclusive, end exclusive)
egg_stat = egg_stat.query("date >= '2018-01-01' and date < '2025-10-01'").copy()

# Validate
validate_dataframe(egg_stat, "Combined Egg Statistics (Monthly)")


Validation: Combined Egg Statistics (Monthly)
Rows: 93
Columns: ['cost_sb_us', 'date', 'cost_corn_us', 'egg_prod_doz', 'layer_inv', 'loss_dth_render', 'price_ex_large', 'price_large', 'price_medium']
Date range: 2018-01-01 00:00:00 → 2025-09-01 00:00:00
Nulls per column:
cost_sb_us         0
date               0
cost_corn_us       0
egg_prod_doz       0
layer_inv          0
loss_dth_render    0
price_ex_large     0
price_large        0
price_medium       0
dtype: int64
Duplicates on date: 0

Sample (first 3 rows):


Unnamed: 0,cost_sb_us,date,cost_corn_us,egg_prod_doz,layer_inv,loss_dth_render,price_ex_large,price_large,price_medium
3,341.716506,2018-01-01,129.521579,670016600,329809000,10175700,127.786129,125.01129,100.276774
4,349.065248,2018-02-01,133.064723,607308600,329125000,10202100,161.273929,159.872143,118.452857
5,360.455799,2018-03-01,138.182596,677992000,333188000,10482800,222.17,220.675484,132.794194


In [37]:
# Persist the combined monthly dataset (DuckDB table + CSV)
save_to_db_and_csv(
    egg_stat,
    table_name="EGG_STAT_MONTHLY",
    csv_filename="egg_stat_monthly.csv",
    con=con,
)

Saved: EGG_STAT_MONTHLY → /home/akimovh/rockets_feathers/eggs/data/usda/egg_stat_monthly.csv


### Flu Outbreak Data

Weekly totals built from a local `flu.csv` file (not a USDA API call), summed across states.

In [68]:
# Load flu outbreak records (date, state, value) from the shared output folder
# rather than a second hard-coded absolute path.
flu = pd.read_csv(USDA_OUTPUT_DIR / "flu.csv")
flu.columns = ["date", "state", "value"]

# Parse dates before grouping so equal dates written differently fall together
flu["date"] = pd.to_datetime(flu["date"])
flu = flu.groupby("date", as_index=False)["value"].sum()

# NOTE(review): weeks here end on Saturday (W-SAT), while the wholesale price
# weeks are dated on Fridays — confirm this is intended before joining on date.
flu_weekly = (
    flu
    .set_index("date")
    .resample("W-SAT", label="right", closed="right")
    .sum()
    .reset_index()
)

save_to_db_and_csv(flu_weekly, "FLU_WEEKLY", "flu_weekly.csv", con)



Saved: FLU_WEEKLY → /home/akimovh/rockets_feathers/eggs/data/usda/flu_weekly.csv


---
# Final Validation

In [38]:
# Inventory of everything written to the database
tables_df = con.execute("SHOW TABLES").df()
print("Tables in database:")
display(tables_df)

# Row count per table
print("\nRow counts:")
tables = tables_df["name"].tolist()
for table in tables:
    (count,) = con.execute(f"SELECT COUNT(*) as n FROM {table}").fetchone()
    print(f"  {table}: {count:,}")

Tables in database:


Unnamed: 0,name
0,COST_CORN_US_MONTHLY
1,COST_SB_US_MONTHLY
2,EGG_PROD_DOZ_MONTHLY
3,EGG_STAT_MONTHLY
4,LAYER_INV_MONTHLY
5,LOSS_DTH_RENDER_MONTHLY
6,WHOLESALE_PRICE_MONTHLY
7,WHOLESALE_PRICE_WEEKLY



Row counts:
  COST_CORN_US_MONTHLY: 432
  COST_SB_US_MONTHLY: 432
  EGG_PROD_DOZ_MONTHLY: 157
  EGG_STAT_MONTHLY: 93
  LAYER_INV_MONTHLY: 217
  LOSS_DTH_RENDER_MONTHLY: 217
  WHOLESALE_PRICE_MONTHLY: 500
  WHOLESALE_PRICE_WEEKLY: 2,155


In [70]:
# Close the DuckDB connection opened at the top of the notebook (run this last)
con.close()
print("Database connection closed.")

Database connection closed.
