# USDA Fruit & Vegetables Data Processor

This notebook implements the ETL pipeline for USDA Fruit and Vegetable retail price data.

## Setup & Imports

In [None]:
import os
import re
import tempfile
from collections import defaultdict
from decimal import Decimal, InvalidOperation
from pathlib import Path
from urllib.parse import urljoin
from zipfile import ZipFile

import openpyxl
import requests
from bs4 import BeautifulSoup

## Constants

These constants define the processing behavior:
- `LISTING_URL`: USDA data products page with download links
- `OUTPUT_DIR`: Destination for generated CSV files (layout: `alternative/usda/fruitandvegetables/{series}.csv`). Defaults to `/temp-output-directory` (Datafleet cloud path). For local development, set `TEMP_OUTPUT_DIRECTORY=./output`.
- `YEAR_REGEX`: Matches 4-digit years (1900-2099) in titles/filenames
- Unit mappings (`PRICE_UNIT_MAP`, `CUP_UNIT_MAP`): Map raw unit text to canonical unit names; the patterns were derived from empirical profiling of 365 XLSX files

In [None]:
# --- Constants ---
# Listing page that is scraped for links to the XLSX/ZIP downloads.
LISTING_URL = "https://www.ers.usda.gov/data-products/fruit-and-vegetable-prices"

# Output directory: defaults to /temp-output-directory ("Datafleet" cloud path).
# For local development, set TEMP_OUTPUT_DIRECTORY=./output to write to the repo's output folder.
OUTPUT_DIR = Path(os.environ.get("TEMP_OUTPUT_DIRECTORY", "/temp-output-directory")) / "alternative/usda/fruitandvegetables"

# A standalone 4-digit year (1900-2099) delimited by word boundaries.
# NOTE(review): "_" is a word character for \b, so a year that directly follows an underscore
# (e.g. "apples_2013.xlsx") does not match - confirm no source relies on that naming.
YEAR_REGEX = re.compile(r"\b((?:19|20)\d{2})\b")
# Trailing footnote marker: digits, optionally comma-separated ("1", "2,3"), at the end of the text.
FOOTNOTE_REGEX = re.compile(r"\s*\d+(?:,\d+)*\s*$")
# Lowercase labels that parse_data_row treats as group headers when the row has no numeric data.
FORM_CATEGORY_LABELS = {
    "fresh",
    "canned",
    "frozen",
    "dried",
    "juice",
    "peas & carrots",
    "green peas & carrots",
    "succotash",
}

# --- XLSX Structure Constants ---
# Empirically profiled from 365 USDA XLSX files (2026-01-15):
# - Header rows observed at index 0-1; we check up to MAX_HEADER_SEARCH_ROWS for safety margin
# - Data rows have 7-9 columns; MIN_DATA_ROW_COLUMNS is the minimum for a valid data row
#   (parse_data_row reads column indexes 0-6, so rows shorter than this cannot be indexed safely)
MAX_HEADER_SEARCH_ROWS = 15
MIN_DATA_ROW_COLUMNS = 7

# Both maps are consumed by lookup_canonical_unit: keys are lowercase substrings searched for in the
# whitespace-collapsed cell text, values are the canonical unit written to the CSV. Patterns are
# tried in insertion order and the first hit wins.
# Note: Extended descriptions like "per pint (16 fluid ounces concentrate)" match via substring.
PRICE_UNIT_MAP = {
    "per pound": "per_pound",
    "per pint": "per_pint",
}
# NOTE(review): because matching is by substring, "pound" already covers "pounds", while "pints"
# does not cover a singular "pint" - confirm the singular never appears in the cup-unit column.
CUP_UNIT_MAP = {
    "pounds": "pounds",
    "pound": "pounds",
    "pints": "pints",
    "fluid ounces": "fluid_ounces",
    "fl oz": "fluid_ounces",
    "fl. oz.": "fluid_ounces",
}

## Helper Functions

Utility functions for text processing and decimal formatting.

In [None]:
def slugify(text: str) -> str:
    """Return ``text`` as a lowercase, underscore-separated slug.

    Every run of characters outside ``a-z``/``0-9`` (after lowercasing) becomes a
    single underscore, and underscores at either end are removed.
    """
    lowered = text.lower()
    underscored = re.sub(r"[^a-z0-9]+", "_", lowered)
    return underscored.strip("_")


def collapse_whitespace(text: str) -> str:
    """Return ``text`` with each whitespace run reduced to one space and the ends trimmed."""
    # A no-argument split() discards leading, trailing and repeated whitespace in one pass.
    tokens = text.split()
    return " ".join(tokens)


def format_decimal_for_csv(value: Decimal | None) -> str:
    """Format decimal for CSV output, stripping trailing zeros."""
    return "" if value is None else format(value, "f").rstrip("0").rstrip(".")


def parse_decimal(value: object) -> Decimal | None:
    """Parse a cell value as Decimal. Handles int, float, and string inputs uniformly via str()."""
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    try:
        return Decimal(text)
    except InvalidOperation:
        return None


def lookup_canonical_unit(raw_unit_text: object, unit_map: dict[str, str]) -> str:
    """Translate raw XLSX unit text into its canonical unit name.

    The cell text is whitespace-collapsed and lowercased, then each key of
    ``unit_map`` is tried as a substring in the map's iteration order.

    Returns:
        The canonical value of the first key found in the text, or ``""`` when
        ``raw_unit_text`` is ``None`` or no key matches.
    """
    if raw_unit_text is None:
        return ""
    normalized = collapse_whitespace(str(raw_unit_text)).lower()
    for pattern, canonical in unit_map.items():
        if pattern in normalized:
            return canonical
    return ""


def make_series_code(product: str, form: str) -> str:
    """Build the series code ``<product slug>_<form slug>`` for a product/form pair."""
    product_slug = slugify(product)
    form_slug = slugify(form)
    return "_".join((product_slug, form_slug))


def normalize_cup_equivalent(size: Decimal | None, unit: str) -> tuple[Decimal | None, str]:
    """Normalize cup equivalent units: convert fluid_ounces to pints (16 fl oz = 1 pint)."""
    if size is None or unit != "fluid_ounces":
        return size, unit
    return size / Decimal("16"), "pints"

## Download & Extract

Downloads XLSX/ZIP files from the USDA website and extracts them to a temporary directory.

In [None]:
def download_and_extract(temp_dir: str) -> list[Path]:
    """Download every XLSX/ZIP linked from the USDA listing page into ``temp_dir``.

    ZIP archives are opened and their ``.xlsx`` members extracted next to the
    downloads. A failure on one file is reported and skipped so the remaining
    files are still processed.

    Note: BeautifulSoup's dynamic attribute access doesn't have complete type stubs,
    so we use explicit str() conversion to satisfy type checkers.

    Args:
        temp_dir: Existing directory that receives the downloads and extracted files.

    Returns:
        Paths of all XLSX workbooks that are now available locally.

    Raises:
        requests.RequestException: If the listing page itself cannot be fetched.
    """

    def without_query_or_fragment(url: str) -> str:
        # Handles ".xlsx?timestamp=123" and ".xlsx#section" style links.
        return url.split("#", 1)[0].split("?", 1)[0]

    print(f"Fetching {LISTING_URL}")
    response = requests.get(LISTING_URL, timeout=60)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")

    download_links: list[str] = []
    for link_tag in soup.find_all("a", href=True):
        href: str = str(link_tag["href"])
        if without_query_or_fragment(href).lower().endswith((".xlsx", ".zip")):
            download_links.append(href)

    print(f"Found {len(download_links)} files to download")
    xlsx_files: list[Path] = []

    for link in download_links:
        # Take the name from the path only, so a "/" inside the query string cannot leak in.
        filename = without_query_or_fragment(link).rsplit("/", 1)[-1]
        local_path = Path(temp_dir) / filename

        try:
            # urljoin resolves absolute, root-relative, page-relative and scheme-relative
            # hrefs alike; plain string concatenation only handled root-relative ones.
            url = urljoin(LISTING_URL, link)
            print(f"Downloading {filename}")
            file_response = requests.get(url, timeout=60)
            file_response.raise_for_status()
            local_path.write_bytes(file_response.content)

            # Extension checks are case-insensitive, matching the link filter above.
            if filename.lower().endswith(".zip"):
                with ZipFile(local_path, "r") as zip_archive:
                    for archived_name in zip_archive.namelist():
                        if archived_name.lower().endswith(".xlsx"):
                            # extract() sanitises the member name and returns the path it
                            # actually wrote, which may differ from temp_dir/archived_name.
                            extracted_path = zip_archive.extract(archived_name, temp_dir)
                            xlsx_files.append(Path(extracted_path))
            else:
                xlsx_files.append(local_path)
        except Exception as e:
            # Deliberately best-effort: report the failure and continue with the next file.
            print(f"Error downloading {filename}: {e}")

    return xlsx_files

## XLSX Parsing

Functions to parse XLSX files and extract price data from worksheets.

In [None]:
def find_header_row(rows: list[list[object]]) -> int:
    """Locate the table header among the first ``MAX_HEADER_SEARCH_ROWS`` rows.

    A header is recognised by the phrases 'form' and 'average retail price'
    (case-insensitive). They may appear in one row, or be split across a row and
    the one below it; in the split case the lower row's index is reported.

    Returns:
        Index of the header row, or ``-1`` if none is found.
    """

    def row_text(cells: list[object]) -> str:
        # Join the truthy cells of a row into one lowercase, whitespace-normalised string.
        return " ".join(collapse_whitespace(str(cell)) for cell in cells if cell).lower()

    def looks_like_header(text: str) -> bool:
        return "form" in text and "average retail price" in text

    total_rows = len(rows)
    for index in range(min(total_rows, MAX_HEADER_SEARCH_ROWS)):
        current = row_text(rows[index])
        if looks_like_header(current):
            return index
        # Header split across two rows: test this row together with the next one.
        following = index + 1
        if following < total_rows and looks_like_header(current + " " + row_text(rows[following])):
            return following
    return -1


def extract_year(rows: list[list[object]], header_row_index: int, sheet_title: str, filename: str) -> int | None:
    """Find the data year, trying the most reliable source first.

    Candidates are searched in this order: the first cell of each row above the
    header (title rows), then the sheet name, then the filename. The first
    ``YEAR_REGEX`` match wins.

    Returns:
        The matched year as an int, or ``None`` if no candidate contains one.
    """
    title_cells = [str(row[0]) for row in rows[:header_row_index] if row and row[0]]
    for candidate in (*title_cells, sheet_title, filename):
        found = YEAR_REGEX.search(candidate)
        if found is not None:
            return int(found.group(1))
    return None


def extract_product_name(rows: list[list[object]], header_row_index: int, sheet_title: str) -> str:
    """Derive the product name from the title rows above the header.

    The first non-empty leading cell is used. If it contains a separator, only
    the text before it is kept; separators are tried in a fixed priority order
    (em dash, spaced hyphen, spaced en dash), not by position in the title.

    Returns:
        The product name, or the stripped sheet name when no title row qualifies.
    """
    # USDA titles use various dash styles between the product and its description.
    separators = ("\u2014", " - ", " \u2013 ")
    for row in rows[:header_row_index]:
        if not row or not row[0]:
            continue
        heading = str(row[0]).strip()
        separator = next((candidate for candidate in separators if candidate in heading), None)
        if separator is not None:
            return heading.partition(separator)[0].strip()
        if heading:
            return heading
    return sheet_title.strip()

In [None]:
def parse_data_row(
    row: list[object], product_name: str, date_str: str, current_group: str | None
) -> tuple[str, str] | None:
    """Classify one worksheet row and, for data rows, render its CSV line.

    Columns are read by position: 0 form, 1 average price, 2 price unit, 3 yield,
    4 cup-equivalent size, 5 cup-equivalent unit, 6 price per cup.

    Args:
        row: Cell values of the row.
        product_name: Product the sheet describes; used for the series code.
        date_str: Date written as the first CSV field.
        current_group: Most recent group header seen (e.g. "Fresh"), if any.

    Returns:
        - ``(series_code, csv_row)`` for a data row
        - ``("__GROUP__", group_name)`` for a group header such as "Fresh" or "Canned"
        - ``None`` for rows to skip (too short, blank, footnote, source line, no numbers)
    """
    if len(row) < MIN_DATA_ROW_COLUMNS:
        return None

    raw_label = str(row[0] or "").strip()
    if not raw_label:
        return None

    # Footnote text starts with its number; source/contact/errata lines are boilerplate.
    if raw_label[0].isdigit():
        return None
    if raw_label.lower().startswith(("source", "contact", "errata")):
        return None

    # The numbers are parsed before anything else because their presence is what
    # separates a data row from a group header carrying the same label.
    avg_price, yield_factor, cup_size, price_per_cup = (parse_decimal(row[i]) for i in (1, 3, 4, 6))
    numbers = (avg_price, yield_factor, cup_size, price_per_cup)

    # Drop a trailing footnote marker (e.g. "Fresh1" -> "Fresh").
    form = FOOTNOTE_REGEX.sub("", raw_label).strip()

    if all(number is None for number in numbers):
        # No numbers: either a group header that changes context, or noise.
        group_key = form.lower()
        if group_key in FORM_CATEGORY_LABELS:
            return ("__GROUP__", group_key.title())
        return None

    if not form:
        return None

    # Prefix the group (e.g. "Florets" -> "Fresh, Florets") unless the form already names it.
    if current_group and current_group.lower() not in form.lower():
        form = f"{current_group}, {form}"

    # A unit is only meaningful alongside the value it qualifies.
    price_unit = "" if avg_price is None else lookup_canonical_unit(row[2], PRICE_UNIT_MAP)
    cup_unit = "" if cup_size is None else lookup_canonical_unit(row[5], CUP_UNIT_MAP)

    # Fluid ounces are reported as pints.
    cup_size, cup_unit = normalize_cup_equivalent(cup_size, cup_unit)

    fields = (
        date_str,
        format_decimal_for_csv(avg_price),
        price_unit,
        format_decimal_for_csv(yield_factor),
        format_decimal_for_csv(cup_size),
        cup_unit,
        format_decimal_for_csv(price_per_cup),
    )
    return make_series_code(product_name, form), ",".join(fields)

In [None]:
def parse_xlsx(file_path: Path, series_csv_rows: dict[str, dict[int, str]]) -> None:
    """Parse every worksheet of one workbook and merge the result into ``series_csv_rows``.

    Sheets without a recognisable header or year are reported and skipped. Each
    data row is stored as ``series_csv_rows[series_code][year]``, so a later row
    for the same series and year replaces an earlier one.

    Args:
        file_path: Workbook to read.
        series_csv_rows: Accumulator, mutated in place; expected to create missing
            series entries on access (a ``defaultdict(dict)``).
    """
    workbook = openpyxl.load_workbook(file_path, data_only=True)

    for sheet in workbook.worksheets:
        # Materialise the sheet as plain lists of cell values.
        rows: list[list[object]] = [list(values) for values in sheet.iter_rows(values_only=True)]
        if not rows:
            continue

        header_row_index = find_header_row(rows)
        if header_row_index < 0:
            print(f"Warning: No header found in {file_path.name} sheet {sheet.title}")
            continue

        year = extract_year(rows, header_row_index, sheet.title, file_path.name)
        if not year:
            print(f"Warning: No year found in {file_path.name} sheet {sheet.title}")
            continue

        product_name = extract_product_name(rows, header_row_index, sheet.title)
        # Offset by 1 year to prevent look-ahead bias (publication date unknown)
        date_str = f"{year + 1}0101"
        # Group context (Fresh, Canned, etc.) carried from header rows to the rows below.
        current_group = None

        first_data_index = header_row_index + 1
        for offset, row in enumerate(rows[first_data_index:]):
            # 1-based spreadsheet row number, used only in the warning below.
            row_idx = first_data_index + offset + 1
            try:
                row_result = parse_data_row(row, product_name, date_str, current_group)
                if row_result is None:
                    continue
                tag, payload = row_result
                if tag == "__GROUP__":
                    current_group = payload
                else:
                    series_csv_rows[tag][year] = payload
            except ValueError as e:
                print(f"Warning: {file_path.name} sheet {sheet.title} row {row_idx}: {e}")


def parse_all_files(xlsx_files: list[Path]) -> dict[str, dict[int, str]]:
    """Parse each workbook and collect the CSV rows per series code and year.

    A workbook that fails to parse is reported and skipped; the others still
    contribute their rows.

    Returns:
        A ``defaultdict`` mapping series code -> {year -> CSV row}.
    """
    collected: dict[str, dict[int, str]] = defaultdict(dict)
    for workbook_path in xlsx_files:
        try:
            parse_xlsx(workbook_path, collected)
        except Exception as error:
            print(f"Error parsing {workbook_path.name}: {error}")
    return collected

## Output

Writes one CSV file per series, with the rows in each file sorted by year.

In [None]:
def write_output(series_csv_rows: dict[str, dict[int, str]]) -> None:
    """Write ``<series_code>.csv`` into ``OUTPUT_DIR`` for every series.

    Series are processed in alphabetical order and each file lists its rows in
    ascending year order, joined by newlines with no trailing newline.
    """
    for series_code in sorted(series_csv_rows):
        rows_by_year = series_csv_rows[series_code]
        csv_rows = [rows_by_year[year] for year in sorted(rows_by_year)]
        output_path = OUTPUT_DIR / f"{series_code}.csv"
        output_path.write_text("\n".join(csv_rows))
        print(f"Wrote {len(csv_rows)} rows to {output_path.name}")

## Main Entry Point

Run the full ETL pipeline.

In [None]:
def main() -> int:
    """Run the pipeline: download, parse, write.

    Returns:
        ``0`` on success; ``1`` when no workbooks were found or nothing could be parsed.
    """
    print("USDA Fruit & Vegetables Data Processor")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Downloads live in a temporary directory that is removed when the block exits.
    with tempfile.TemporaryDirectory() as temp_dir:
        xlsx_files = download_and_extract(temp_dir)
        if xlsx_files:
            series_csv_rows = parse_all_files(xlsx_files)
            if series_csv_rows:
                write_output(series_csv_rows)
            else:
                print("Error: No data parsed")
                return 1
        else:
            print("Error: No XLSX files found")
            return 1

    print("Processing complete")
    return 0


# Execute the pipeline as soon as this cell runs. Only a non-zero status raises SystemExit,
# so failures are visible to callers while a successful run ends quietly (nbconvert would
# otherwise report the exit as an exception).
if (_exit_code := main()) != 0:
    raise SystemExit(_exit_code)