From d660f2d3bfc27563c4e3ae141d518883c836a582 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 21 Apr 2026 15:12:54 -0700 Subject: [PATCH] feat(data): one-shot migration for OHLCV-only ArcticDB symbols MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds builders/promote_ohlcv_only_schema.py — scans every symbol in the universe library and rewrites any whose stored schema lacks feature columns to the full OHLCV + FEATURE schema. Context: PR #76 (short-history first-class) persisted some symbols as OHLCV-only. PR #78 (per-feature graceful degrade) now writes full schema on every daily_append pass. ArcticDB update() enforces schema match, so the transitional OHLCV-only symbols fail today's daily_append with n_err and their row never lands. 2026-04-21 post-#78 run reported n_err=2. The migration reads each candidate's OHLCV history, runs compute_features (partial-feature semantics per PR #78), and calls lib.write() to replace the symbol. write() is authoritative for schema; update() is incremental and cannot widen columns. One-shot, idempotent — symbols already at full schema are skipped. Supports --dry-run for plan review and --ticker X for targeted retries. Regression tests lock: - write() (not update()) for the rewrite - exhaustive FEATURE-column detection (no heuristic subsets) - explicit error reason on empty compute_features (no silent skip) - --dry-run guards the write() call - partial-features structured log matches daily_append convention Co-Authored-By: Claude Opus 4.7 (1M context) --- builders/promote_ohlcv_only_schema.py | 322 ++++++++++++++++++++++++ tests/test_promote_ohlcv_only_schema.py | 104 ++++++++ 2 files changed, 426 insertions(+) create mode 100644 builders/promote_ohlcv_only_schema.py create mode 100644 tests/test_promote_ohlcv_only_schema.py diff --git a/builders/promote_ohlcv_only_schema.py b/builders/promote_ohlcv_only_schema.py new file mode 100644 index 0000000..89656ab --- /dev/null +++ b/builders/promote_ohlcv_only_schema.py @@ -0,0 +1,322 @@ +""" +builders/promote_ohlcv_only_schema.py — one-shot schema migration. + +Identifies ArcticDB universe symbols whose stored schema lacks feature +columns (OHLCV-only), recomputes features against the current feature +set, and rewrites the symbol so daily_append's ``universe_lib.update()`` +stops rejecting today's row with a column-mismatch error. + +Context: + PR #76 (2026-04-21 morning) introduced a short-history write branch + that persisted OHLCV-only rows for new-listing tickers (SNDK after + the 2026 WDC spinoff, plus a handful of others). PR #78 (same day + evening) unified the write path so every row now includes the full + FEATURE schema — with NaN for features whose rolling-window warmup + exceeds available history. + + The two regimes left some symbols in a transitional state: their + stored rows are OHLCV-only, but ``daily_append`` now wants to write + full-schema rows. ArcticDB's ``update()`` enforces schema match — + mismatched updates surface as n_err and the symbol's today row never + lands. 2026-04-21 daily_append post-#78 reported n_err=2. + + This migration reads each affected symbol, runs ``compute_features`` + on its OHLCV history (which now returns partial-feature rows per PR + #78), and calls ``lib.write()`` to replace the symbol with the full + schema. ``write()`` is authoritative for schema; ``update()`` is + incremental. After this runs once, every symbol shares the canonical + full-feature schema and daily_append's update() succeeds on every + row. + + This is NOT a pipeline component — it runs once (or once per incident + if a similar schema split ever recurs) and is safe to run multiple + times (idempotent — symbols already at full schema are skipped). + +Usage: + python -m builders.promote_ohlcv_only_schema # migrate + python -m builders.promote_ohlcv_only_schema --dry-run # report only + python -m builders.promote_ohlcv_only_schema --ticker SNDK # single symbol +""" + +from __future__ import annotations + +import argparse +import logging +import sys +import time +from datetime import datetime, timezone + +import numpy as np +import pandas as pd + +from features.feature_engineer import FEATURES, compute_features +from features.compute import ( + DEFAULT_BUCKET, + _load_sector_map, + _load_cached_fundamentals, + _load_cached_alternative, +) +from store.arctic_store import get_universe_lib, get_macro_lib + +log = logging.getLogger(__name__) + +OHLCV_COLS = ["Open", "High", "Low", "Close", "Volume", "VWAP"] + +# Macro keys daily_append pulls for feature compute. Mirrored here so +# migrated rows use the same inputs the live pipeline uses. +MACRO_KEYS = ["SPY", "VIX", "VIX3M", "TNX", "IRX", "GLD", "USO"] + + +def _needs_promotion(df: pd.DataFrame) -> bool: + """True if the symbol is OHLCV-only (missing FEATURE columns). + + Any symbol whose schema is missing *any* canonical feature column is + a promotion candidate — PR #78's unified write path will reject + partial-schema symbols on the next daily_append run. + """ + return any(f not in df.columns for f in FEATURES) + + +def _load_macro_series(macro_lib, bucket: str) -> dict[str, pd.Series]: + """Load SPY/VIX/TNX/IRX/GLD/USO/VIX3M + every sector ETF as Close series. + + Matches daily_append's macro-loading semantics (loud fail on missing + or schema-drifted series — these are critical feature inputs). + """ + macro: dict[str, pd.Series] = {} + for key in MACRO_KEYS: + mdf = macro_lib.read(key).data + if "Close" not in mdf.columns: + raise RuntimeError( + f"Macro series {key} has no Close column — ArcticDB schema drift" + ) + macro[key] = mdf["Close"].dropna() + + # Sector ETFs — discover dynamically from the macro library so we + # don't miss any that the live feature pipeline uses. + for sym in macro_lib.list_symbols(): + if sym.startswith("XL"): + mdf = macro_lib.read(sym).data + if "Close" not in mdf.columns: + raise RuntimeError( + f"Sector ETF {sym} has no Close column — ArcticDB schema drift" + ) + macro[sym] = mdf["Close"].dropna() + return macro + + +def _promote_symbol( + ticker: str, + universe_lib, + macro: dict[str, pd.Series], + sector_map: dict[str, str], + fundamentals: dict, + alt_data: dict, + dry_run: bool, +) -> dict: + """Migrate one symbol: recompute features against stored OHLCV and + replace via ``lib.write()``. + + Returns a per-ticker status dict including which features resolved + vs stayed NaN — the same observability daily_append's n_partial log + emits. + """ + hist = universe_lib.read(ticker).data + + ohlcv_cols = [c for c in OHLCV_COLS if c in hist.columns] + if not ohlcv_cols: + return {"ticker": ticker, "status": "skipped", "reason": "no OHLCV columns"} + + combined = hist[ohlcv_cols].copy() + + sector_etf_sym = sector_map.get(ticker) + sector_etf_series = macro.get(sector_etf_sym) if sector_etf_sym else None + ticker_alt = alt_data.get(ticker, {}) + + featured = compute_features( + combined, + spy_series=macro.get("SPY"), + vix_series=macro.get("VIX"), + sector_etf_series=sector_etf_series, + tnx_series=macro.get("TNX"), + irx_series=macro.get("IRX"), + gld_series=macro.get("GLD"), + uso_series=macro.get("USO"), + vix3m_series=macro.get("VIX3M"), + earnings_data=ticker_alt.get("earnings"), + revision_data=ticker_alt.get("revisions"), + options_data=ticker_alt.get("options"), + fundamental_data=fundamentals.get(ticker), + ) + + if featured.empty: + return {"ticker": ticker, "status": "error", "reason": "compute_features empty"} + + keep_cols = [c for c in OHLCV_COLS if c in featured.columns] + \ + [f for f in FEATURES if f in featured.columns] + out = featured[keep_cols].copy() + + # Match stored OHLCV dtypes per-column; features uniformly float32 + # (the training schema, and the default for new feature columns). + for col in out.columns: + if col in hist.columns: + out[col] = out[col].astype(hist.dtypes[col]) + elif col in FEATURES: + out[col] = out[col].astype("float32") + out.index.name = "date" + + # Per-ticker feature coverage on the most recent row, for + # observability parity with daily_append's n_partial log. + last = out.iloc[-1] + nan_features = [ + f for f in FEATURES + if f in out.columns and isinstance(last[f], float) and np.isnan(last[f]) + ] + + if not dry_run: + # write() (not update()) replaces the whole symbol with this + # frame. That's the whole point: switching from OHLCV-only to + # OHLCV + FEATURE schema requires a full rewrite. + universe_lib.write(ticker, out) + + return { + "ticker": ticker, + "status": "promoted" if not dry_run else "would_promote", + "rows": len(out), + "features_ok": len(FEATURES) - len(nan_features), + "features_nan": len(nan_features), + "nan_feature_names": nan_features, + } + + +def promote_schemas( + bucket: str = DEFAULT_BUCKET, + dry_run: bool = False, + ticker_filter: str | None = None, +) -> dict: + """Scan every universe symbol and promote those with OHLCV-only schema.""" + t0 = time.time() + universe_lib = get_universe_lib(bucket) + macro_lib = get_macro_lib(bucket) + + symbols = universe_lib.list_symbols() + if ticker_filter: + symbols = [s for s in symbols if s == ticker_filter] + if not symbols: + raise RuntimeError(f"Ticker {ticker_filter} not in universe library") + + log.info("Scanning %d symbols for OHLCV-only schema ...", len(symbols)) + + # Identify candidates first. A fast read per symbol; avoids loading + # supporting data if nothing needs promotion. + candidates: list[str] = [] + errors: list[dict] = [] + for sym in symbols: + try: + df = universe_lib.read(sym).data + except Exception as exc: + errors.append({"ticker": sym, "status": "error", "reason": f"read failed: {exc}"}) + continue + if _needs_promotion(df): + candidates.append(sym) + + log.info("Found %d symbols needing promotion: %s", len(candidates), candidates) + if not candidates: + return { + "status": "ok", + "scanned": len(symbols), + "needs_promotion": 0, + "promoted": [], + "errors": errors, + "total_seconds": round(time.time() - t0, 1), + "dry_run": dry_run, + } + + # Load supporting data only when we actually have work to do. + sector_map = _load_sector_map(None, bucket) if not dry_run or candidates else {} + fundamentals = _load_cached_fundamentals( + None, bucket, + datetime.now(timezone.utc).strftime("%Y-%m-%d"), + ) if not dry_run or candidates else {} + alt_data = _load_cached_alternative(None, bucket) if not dry_run or candidates else {} + macro = _load_macro_series(macro_lib, bucket) + + results: list[dict] = [] + for sym in candidates: + try: + r = _promote_symbol( + sym, universe_lib, macro, + sector_map, fundamentals, alt_data, + dry_run, + ) + log.info( + "%s ticker=%s rows=%s features_ok=%s features_nan=%s", + r.get("status"), r["ticker"], + r.get("rows"), r.get("features_ok"), r.get("features_nan"), + ) + if r.get("features_nan", 0) > 0: + log.warning( + "partial-features ticker=%s nan=%d/%d features=%s", + r["ticker"], r["features_nan"], len(FEATURES), + r.get("nan_feature_names", []), + ) + results.append(r) + except Exception as exc: + log.warning("Failed to promote %s: %s", sym, exc) + errors.append({"ticker": sym, "status": "error", "reason": str(exc)}) + + return { + "status": "ok", + "scanned": len(symbols), + "needs_promotion": len(candidates), + "promoted": results, + "errors": errors, + "total_seconds": round(time.time() - t0, 1), + "dry_run": dry_run, + } + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Promote OHLCV-only universe symbols to the full FEATURE schema" + ) + parser.add_argument( + "--bucket", default=DEFAULT_BUCKET, + help=f"S3 bucket (default: {DEFAULT_BUCKET})", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Scan + report which symbols would be promoted; no writes", + ) + parser.add_argument( + "--ticker", default=None, + help="Migrate a single ticker (for targeted retries)", + ) + parser.add_argument( + "--verbose", "-v", action="store_true", + help="Enable debug logging", + ) + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", + datefmt="%H:%M:%S", + ) + + result = promote_schemas( + bucket=args.bucket, + dry_run=args.dry_run, + ticker_filter=args.ticker, + ) + + import json + print(json.dumps(result, indent=2, default=str)) + + if result["errors"]: + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_promote_ohlcv_only_schema.py b/tests/test_promote_ohlcv_only_schema.py new file mode 100644 index 0000000..f035a53 --- /dev/null +++ b/tests/test_promote_ohlcv_only_schema.py @@ -0,0 +1,104 @@ +"""Regression: builders/promote_ohlcv_only_schema.py must rewrite +symbols via ``lib.write()`` (not ``lib.update()``) so the stored schema +is replaced with the full OHLCV + FEATURE set. + +Context: + PR #76 introduced short-history OHLCV-only rows before the unified + path landed in PR #78. Two symbols in the 2026-04-21 post-#78 + daily_append run hit an ArcticDB column-mismatch (``update()`` + enforces schema match). This migration promotes their stored + schema; the tests below lock its invariants so a regression to + ``update()`` or a silent skip on empty results fails loudly. +""" + +from pathlib import Path + + +_SCRIPT = Path(__file__).parent.parent / "builders" / "promote_ohlcv_only_schema.py" + + +def _source() -> str: + return _SCRIPT.read_text() + + +def test_script_exists(): + assert _SCRIPT.exists(), f"promote_ohlcv_only_schema.py missing at {_SCRIPT}" + + +def test_promotion_uses_write_not_update(): + """Schema promotion MUST call ``lib.write()`` — ``update()`` would + fail with the same column-mismatch that motivated this script. + + A regression that swaps back to ``update()`` would silently no-op + every promotion while reporting success. + """ + src = _source() + assert "universe_lib.write(ticker, out)" in src, ( + "Schema promotion must call universe_lib.write(ticker, out). " + "update() enforces schema match and cannot widen OHLCV-only to " + "OHLCV+FEATURE — that's the whole point of this migration." + ) + # update() has no legitimate use here — forbid it in the write path. + assert "universe_lib.update(ticker" not in src, ( + "Found universe_lib.update() — regression of the column-mismatch " + "bug the script exists to fix." + ) + + +def test_candidate_detection_scans_every_feature(): + """``_needs_promotion`` must flag a symbol when ANY feature column + is missing, not just some heuristic subset. + + Any partial-schema symbol will fail the next daily_append update(), + so the detector must be exhaustive. + """ + src = _source() + # The check walks FEATURES and returns True on any missing column. + assert "any(f not in df.columns for f in FEATURES)" in src, ( + "_needs_promotion must check every f in FEATURES against " + "df.columns. Partial subsets miss real mismatches." + ) + + +def test_empty_compute_features_is_an_error_not_a_skip(): + """If compute_features returns empty for a candidate symbol, + something upstream is broken — report as error, not silent skip. + + Silent skips on migration are especially dangerous: user thinks the + symbol was handled, next daily_append run errors anyway. + """ + src = _source() + assert 'compute_features empty' in src, ( + "Empty compute_features result must surface as an explicit error " + "reason, per feedback_no_silent_fails." + ) + + +def test_dry_run_does_not_write(): + """--dry-run must never call lib.write(). The mode exists exactly to + let a reviewer see the promotion plan before committing.""" + src = _source() + lines = src.splitlines() + # Find the write() line and confirm it sits inside an `if not dry_run:` + # block (string-level check — we can't execute without ArcticDB). + for i, line in enumerate(lines): + if "universe_lib.write(ticker, out)" in line: + prior = "\n".join(lines[max(0, i - 6):i]) + assert "if not dry_run" in prior, ( + f"Line {i+1}: universe_lib.write() call not guarded by " + f"`if not dry_run:` — dry-run would still write." + ) + break + else: + raise AssertionError("universe_lib.write() call site not found") + + +def test_partial_coverage_is_logged_loudly(): + """Symbols promoted with NaN features (short history) must emit a + structured ``partial-features ticker=X nan=N/total features=[...]`` + log, matching daily_append's convention.""" + src = _source() + assert "partial-features ticker=" in src, ( + "Missing the structured partial-features log — silent partial " + "coverage on migration is forbidden." + )