From 8471f5a6a1fd7b4a8867e31a13c28b7a23024f63 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Sun, 10 May 2026 11:33:25 -0700 Subject: [PATCH 1/2] feat(signal_returns): write calibrator-v1 context on score_performance seed + backfill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root-cause closure for the 2026-05-09 Saturday SF evaluator P0 (weight_optimizer ERROR: "None of [Index(['quant_score','qual_score'])] are in the [columns]"; auto-rollback Sharpe -42.2% vs baseline). Producer audit revealed two parallel writers diverged silently after research migration #12 (2026-05-08): * scoring/performance_tracker.py::record_new_buy_scores writes ALL 5 canonical context columns — but has zero production callers. * collectors/signal_returns.py::_seed_score_performance is the actual production writer (runs weekly in DataPhase1) and only wrote (symbol, score_date, score, price_on_date). The 5 canonical columns (quant_score, qual_score, conviction, sector_modifier, market_regime) were never populated. Single-fact-single-writer rebuild: * _seed_score_performance now extracts the 5 context fields from the same signals.json payload that drives the BUY filter — single source-of-truth fetch per signals.json, no second round-trip. * New _backfill_score_context repairs legacy rows whose canonical columns are NULL. UPDATE-WHERE-NULL so re-runs are no-ops once every row has a source. * _ensure_score_performance_schema mirrors research migration #12 defensively in case DataPhase1 ever fires against a fresh research.db before research's cold-start migrations run. Composes with backtester #176 (PR-day consumer-side coalesce fix). With this PR the producer becomes authoritative; the next backtester PR can retire the S3 round-trip in weight_optimizer.load_with_subscores. Co-Authored-By: Claude Opus 4.7 (1M context) --- collectors/signal_returns.py | 198 ++++++++++++- tests/test_signal_returns_score_context.py | 312 +++++++++++++++++++++ 2 files changed, 502 insertions(+), 8 deletions(-) create mode 100644 tests/test_signal_returns_score_context.py diff --git a/collectors/signal_returns.py b/collectors/signal_returns.py index ec67910..a1a04e2 100644 --- a/collectors/signal_returns.py +++ b/collectors/signal_returns.py @@ -72,6 +72,13 @@ def collect( s3, bucket, db_path, signals_prefix, dry_run, ) + # Step 1b: Backfill calibrator-v1 context for any legacy rows whose + # canonical columns are NULL (rows seeded before this collector + # learned to write them). UPDATE-WHERE-NULL so re-runs are no-ops. + results["backfill_score_context"] = _backfill_score_context( + s3, bucket, db_path, signals_prefix, dry_run, + ) + # Step 2: Backfill score_performance returns via universe_returns JOIN results["backfill_score_returns"] = _backfill_score_returns(db_path, dry_run) @@ -105,10 +112,42 @@ def collect( # ── Step 1: Seed score_performance ──────────────────────────────────────────── +def _extract_signal_context(payload: dict, ticker: str) -> dict: + """Pull calibrator-v1 context fields for a ticker from a signals.json + payload. Mirrors the extraction logic in alpha-engine-research's + scripts/backfill_calibrator_v1_context.py so the producer-side seed + and the legacy backfill agree on every field's source. + + Returns a dict with all 5 keys; values may be None when the source + payload omits them (older schemas, partial outputs). + """ + sigs = payload.get("signals") or {} + sig = sigs.get(ticker) or {} + sector = sig.get("sector") + sector_modifiers = payload.get("sector_modifiers") or {} + return { + "quant_score": sig.get("quant_score"), + "qual_score": sig.get("qual_score"), + "conviction": sig.get("conviction"), + "sector_modifier": sector_modifiers.get(sector) if sector else None, + "market_regime": payload.get("market_regime"), + } + + def _seed_score_performance( s3, bucket: str, db_path: str, signals_prefix: str, dry_run: bool, ) -> dict: - """Insert BUY-rated signals into score_performance with entry prices from universe_returns.""" + """Insert BUY-rated signals into score_performance with entry prices + from universe_returns. Includes the 5 calibrator-v1 context columns + (quant_score, qual_score, conviction, sector_modifier, market_regime) + on initial INSERT — values come from the same signals.json payload + that drives the BUY filter, so no second round-trip is needed. + + Pre-2026-05-10 seed inserts wrote only (symbol, score_date, score, + price_on_date) and left the canonical columns NULL — that bug is the + root cause behind the 2026-05-09 evaluator weight_optimizer ERROR. + Existing NULL rows get repaired by ``_backfill_score_context``. + """ try: conn = sqlite3.connect(db_path) _ensure_score_performance_schema(conn) @@ -121,7 +160,10 @@ def _seed_score_performance( # List signal dates from S3 signal_dates = _list_signal_dates(s3, bucket, signals_prefix) - rows_to_insert = [] + # rows_to_insert carries (ticker, sig_date, score, context_dict) so + # the canonical context is captured at the same point the BUY + # filter runs — single source of truth per signals.json payload. + rows_to_insert: list[tuple[str, str, float, dict]] = [] for sig_date in signal_dates: try: obj = s3.get_object(Bucket=bucket, Key=f"{signals_prefix}/{sig_date}/signals.json") @@ -135,7 +177,9 @@ def _seed_score_performance( rating = stock.get("rating", "") if not ticker or rating != "BUY" or (ticker, sig_date) in existing: continue - rows_to_insert.append((ticker, sig_date, score)) + rows_to_insert.append( + (ticker, sig_date, score, _extract_signal_context(signals, ticker)) + ) # v1 format fallback sigs = signals.get("signals", {}) @@ -145,7 +189,9 @@ def _seed_score_performance( rating = s.get("rating", "") if rating != "BUY" or (ticker, sig_date) in existing: continue - rows_to_insert.append((ticker, sig_date, score)) + rows_to_insert.append( + (ticker, sig_date, score, _extract_signal_context(signals, ticker)) + ) if not rows_to_insert: conn.close() @@ -153,7 +199,7 @@ def _seed_score_performance( # Get entry prices from universe_returns (already in the DB) inserted = 0 - for ticker, sig_date, score in rows_to_insert: + for ticker, sig_date, score, ctx in rows_to_insert: if score is None: continue # Look up entry price from universe_returns @@ -167,8 +213,20 @@ def _seed_score_performance( if not dry_run: conn.execute( - "INSERT OR IGNORE INTO score_performance (symbol, score_date, score, price_on_date) VALUES (?, ?, ?, ?)", - (ticker, sig_date, round(float(score), 2), round(price, 2)), + """ + INSERT OR IGNORE INTO score_performance ( + symbol, score_date, score, price_on_date, + quant_score, qual_score, conviction, + sector_modifier, market_regime + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + ticker, sig_date, + round(float(score), 2), round(price, 2), + ctx["quant_score"], ctx["qual_score"], + ctx["conviction"], ctx["sector_modifier"], + ctx["market_regime"], + ), ) inserted += 1 @@ -185,6 +243,116 @@ def _seed_score_performance( return {"status": "error", "error": str(e), "rows_written": 0} +def _backfill_score_context( + s3, bucket: str, db_path: str, signals_prefix: str, dry_run: bool, +) -> dict: + """Repair existing score_performance rows whose calibrator-v1 context + columns are NULL (rows seeded before _seed_score_performance learned + to write them). + + Groups NULL-bearing rows by score_date so each signals.json fetch + serves all symbols for that date. UPDATE-WHERE-NULL means re-runs are + no-ops once the population converges. + """ + try: + conn = sqlite3.connect(db_path) + _ensure_score_performance_schema(conn) + + rows = conn.execute( + """ + SELECT symbol, score_date + FROM score_performance + WHERE quant_score IS NULL + OR qual_score IS NULL + OR conviction IS NULL + OR sector_modifier IS NULL + OR market_regime IS NULL + ORDER BY score_date, symbol + """ + ).fetchall() + + if not rows: + conn.close() + return {"status": "ok", "rows_written": 0, "note": "no NULL context rows"} + + by_date: dict[str, list[str]] = {} + for symbol, score_date in rows: + by_date.setdefault(score_date, []).append(symbol) + + updated = 0 + for sig_date, symbols in by_date.items(): + try: + obj = s3.get_object( + Bucket=bucket, + Key=f"{signals_prefix}/{sig_date}/signals.json", + ) + payload = json.loads(obj["Body"].read()) + except (ClientError, json.JSONDecodeError): + continue + + for symbol in symbols: + ctx = _extract_signal_context(payload, symbol) + # Skip if signals.json has no useful context for this symbol + # (rare — typically only legacy archive shapes). + if all(v is None for v in ctx.values()): + continue + + cur = conn.execute( + """ + SELECT quant_score, qual_score, conviction, + sector_modifier, market_regime + FROM score_performance + WHERE symbol = ? AND score_date = ? + """, + (symbol, sig_date), + ).fetchone() + if cur is None: + continue + + cur_q, cur_qu, cur_c, cur_s, cur_r = cur + updates: list[tuple[str, object]] = [] + if cur_q is None and ctx["quant_score"] is not None: + updates.append(("quant_score", ctx["quant_score"])) + if cur_qu is None and ctx["qual_score"] is not None: + updates.append(("qual_score", ctx["qual_score"])) + if cur_c is None and ctx["conviction"] is not None: + updates.append(("conviction", ctx["conviction"])) + if cur_s is None and ctx["sector_modifier"] is not None: + updates.append(("sector_modifier", ctx["sector_modifier"])) + if cur_r is None and ctx["market_regime"] is not None: + updates.append(("market_regime", ctx["market_regime"])) + + if not updates: + continue + if dry_run: + updated += 1 + continue + + set_clause = ", ".join(f"{col} = ?" for col, _ in updates) + values = [v for _, v in updates] + [symbol, sig_date] + conn.execute( + f"UPDATE score_performance SET {set_clause} " + f"WHERE symbol = ? AND score_date = ?", + values, + ) + updated += 1 + + if not dry_run: + conn.commit() + conn.close() + + if updated: + logger.info( + "Backfilled calibrator-v1 context on %d score_performance rows", + updated, + ) + return {"status": "ok", "rows_written": updated} + + except Exception as e: + logger.error("backfill_score_context failed: %s", e) + return {"status": "error", "error": str(e), "rows_written": 0} + + # ── Step 2: Backfill score_performance returns ──────────────────────────────── @@ -452,7 +620,17 @@ def _backfill_predictor_returns( def _ensure_score_performance_schema(conn) -> None: - """Add 5d return columns to score_performance if they don't exist yet.""" + """Add forward-return + calibrator-v1 context columns to + score_performance if they don't exist yet. + + Belt-and-suspenders with alpha-engine-research's archive/schema.py + migrations (v11 forward-returns, v12 calibrator-v1 context). The + Saturday SF runs DataPhase1 first; if it ever fires against a fresh + research.db before the research Lambda has cold-started and applied + its migrations, this ensures the seed/backfill INSERT/UPDATE + statements still target valid columns. Idempotent — each ALTER is + skipped when the column already exists. + """ cols = {r[1] for r in conn.execute("PRAGMA table_info(score_performance)").fetchall()} for col, col_type in [ ("price_5d", "REAL"), ("return_5d", "REAL"), ("spy_5d_return", "REAL"), @@ -461,6 +639,10 @@ def _ensure_score_performance_schema(conn) -> None: ("beat_spy_10d", "INTEGER"), ("eval_date_10d", "TEXT"), ("price_30d", "REAL"), ("return_30d", "REAL"), ("spy_30d_return", "REAL"), ("beat_spy_30d", "INTEGER"), ("eval_date_30d", "TEXT"), + # Calibrator-v1 context (alpha-engine-research migration #12) + ("quant_score", "REAL"), ("qual_score", "REAL"), + ("conviction", "TEXT"), ("sector_modifier", "REAL"), + ("market_regime", "TEXT"), ]: if col not in cols: conn.execute(f"ALTER TABLE score_performance ADD COLUMN {col} {col_type}") diff --git a/tests/test_signal_returns_score_context.py b/tests/test_signal_returns_score_context.py new file mode 100644 index 0000000..4651241 --- /dev/null +++ b/tests/test_signal_returns_score_context.py @@ -0,0 +1,312 @@ +"""Tests for score_performance calibrator-v1 context seeding + backfill. + +Covers the 2026-05-10 producer-side fix: `_seed_score_performance` was +inserting only `(symbol, score_date, score, price_on_date)` and leaving +the 5 canonical context columns (quant_score, qual_score, conviction, +sector_modifier, market_regime) NULL. Saturday 2026-05-09's evaluator +tripped on this when weight_optimizer's downstream lookup expected those +columns post research migration #12. + +These tests pin two contracts: + - Initial INSERT carries all 5 canonical context fields, sourced from + the same signals.json payload that drives the BUY filter. + - `_backfill_score_context` repairs legacy rows that were seeded + before the producer learned to write them. UPDATE-WHERE-NULL means + re-runs converge to a no-op once every row has at least one source. +""" +from __future__ import annotations + +import json +import sqlite3 +import tempfile +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from collectors.signal_returns import ( + _backfill_score_context, + _ensure_score_performance_schema, + _extract_signal_context, + _seed_score_performance, +) + + +# ── Fixtures ────────────────────────────────────────────────────────────────── + + +@pytest.fixture +def tmp_db(tmp_path: Path) -> str: + db = tmp_path / "research.db" + with sqlite3.connect(db) as conn: + conn.execute( + """ + CREATE TABLE score_performance ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + symbol TEXT NOT NULL, + score_date TEXT NOT NULL, + score REAL, + price_on_date REAL, + UNIQUE(symbol, score_date) + ) + """ + ) + conn.execute( + """ + CREATE TABLE universe_returns ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT NOT NULL, + eval_date TEXT NOT NULL, + close_price REAL, + UNIQUE(ticker, eval_date) + ) + """ + ) + conn.commit() + return str(db) + + +def _signals_payload() -> dict: + """Representative signals.json with sub_scores + context populated.""" + return { + "date": "2026-05-01", + "market_regime": "bull", + "sector_modifiers": {"Technology": 1.05, "Healthcare": 0.95}, + "signals": { + "AAPL": { + "score": 78.0, "rating": "BUY", "sector": "Technology", + "quant_score": 80.0, "qual_score": 72.0, "conviction": "rising", + }, + "MSFT": { + "score": 70.0, "rating": "BUY", "sector": "Technology", + "quant_score": 68.0, "qual_score": 71.0, "conviction": "stable", + }, + "JNJ": { + "score": 65.0, "rating": "HOLD", "sector": "Healthcare", + # HOLD — should be skipped by BUY filter + "quant_score": 60.0, "qual_score": 65.0, "conviction": "stable", + }, + "PFE": { + "score": 76.0, "rating": "BUY", "sector": "Healthcare", + "quant_score": 75.0, "qual_score": 78.0, "conviction": "declining", + }, + }, + } + + +def _mock_s3_for(payload: dict) -> MagicMock: + body = MagicMock() + body.read.return_value = json.dumps(payload).encode() + s3 = MagicMock() + s3.get_object.return_value = {"Body": body} + # Single date in the listing + page = {"CommonPrefixes": [{"Prefix": "signals/2026-05-01/"}]} + paginator = MagicMock() + paginator.paginate.return_value = [page] + s3.get_paginator.return_value = paginator + return s3 + + +def _seed_universe_close(db: str, ticker: str, eval_date: str, close: float) -> None: + with sqlite3.connect(db) as conn: + conn.execute( + "INSERT INTO universe_returns (ticker, eval_date, close_price) VALUES (?, ?, ?)", + (ticker, eval_date, close), + ) + conn.commit() + + +# ── _extract_signal_context ─────────────────────────────────────────────────── + + +class TestExtractSignalContext: + + def test_resolves_all_five_fields(self): + payload = _signals_payload() + ctx = _extract_signal_context(payload, "AAPL") + assert ctx == { + "quant_score": 80.0, + "qual_score": 72.0, + "conviction": "rising", + "sector_modifier": 1.05, + "market_regime": "bull", + } + + def test_unknown_ticker_returns_all_none(self): + ctx = _extract_signal_context(_signals_payload(), "NVDA") + assert ctx == { + "quant_score": None, "qual_score": None, "conviction": None, + "sector_modifier": None, "market_regime": "bull", # market_regime is payload-level + } + + def test_missing_sector_modifier_when_sector_absent(self): + payload = { + "market_regime": "neutral", + "sector_modifiers": {"Technology": 1.10}, + "signals": { + "AAPL": {"quant_score": 70, "qual_score": 60}, # no sector + }, + } + ctx = _extract_signal_context(payload, "AAPL") + assert ctx["sector_modifier"] is None + assert ctx["market_regime"] == "neutral" + + +# ── _seed_score_performance — initial INSERT carries canonical context ────── + + +class TestSeedScorePerformanceCanonicalInsert: + + def test_buy_rows_get_canonical_context_on_insert(self, tmp_db): + _seed_universe_close(tmp_db, "AAPL", "2026-05-01", 205.50) + _seed_universe_close(tmp_db, "MSFT", "2026-05-01", 430.25) + _seed_universe_close(tmp_db, "PFE", "2026-05-01", 28.10) + + s3 = _mock_s3_for(_signals_payload()) + out = _seed_score_performance(s3, "bucket", tmp_db, "signals", dry_run=False) + assert out["status"] == "ok" + assert out["rows_written"] == 3 # AAPL + MSFT + PFE; JNJ is HOLD + + with sqlite3.connect(tmp_db) as conn: + rows = conn.execute( + "SELECT symbol, quant_score, qual_score, conviction, " + "sector_modifier, market_regime FROM score_performance " + "ORDER BY symbol" + ).fetchall() + by_sym = {r[0]: r[1:] for r in rows} + assert by_sym["AAPL"] == (80.0, 72.0, "rising", 1.05, "bull") + assert by_sym["MSFT"] == (68.0, 71.0, "stable", 1.05, "bull") + assert by_sym["PFE"] == (75.0, 78.0, "declining", 0.95, "bull") + + def test_skips_hold_rated_rows(self, tmp_db): + _seed_universe_close(tmp_db, "JNJ", "2026-05-01", 152.00) + s3 = _mock_s3_for(_signals_payload()) + _seed_score_performance(s3, "bucket", tmp_db, "signals", dry_run=False) + with sqlite3.connect(tmp_db) as conn: + assert conn.execute( + "SELECT COUNT(*) FROM score_performance WHERE symbol='JNJ'" + ).fetchone()[0] == 0 + + def test_existing_rows_are_not_reseeded(self, tmp_db): + """INSERT OR IGNORE means a re-run doesn't overwrite. Canonical + context backfill is a separate step (_backfill_score_context).""" + _seed_universe_close(tmp_db, "AAPL", "2026-05-01", 205.50) + with sqlite3.connect(tmp_db) as conn: + conn.execute( + "INSERT INTO score_performance (symbol, score_date, score, price_on_date) " + "VALUES ('AAPL', '2026-05-01', 78.0, 205.50)" + ) + conn.commit() + + s3 = _mock_s3_for(_signals_payload()) + out = _seed_score_performance(s3, "bucket", tmp_db, "signals", dry_run=False) + # Pre-existing row filtered by `existing` set; seeder reports 0 written. + assert out["rows_written"] == 0 + + with sqlite3.connect(tmp_db) as conn: + row = conn.execute( + "SELECT quant_score, qual_score FROM score_performance WHERE symbol='AAPL'" + ).fetchone() + # Still NULL — that's what _backfill_score_context exists to repair. + assert row == (None, None) + + +# ── _backfill_score_context — UPDATE-WHERE-NULL repair for legacy rows ────── + + +class TestBackfillScoreContext: + + def test_repairs_legacy_null_rows(self, tmp_db): + """Rows seeded before the canonical-context fix should pick up + all 5 fields on backfill.""" + with sqlite3.connect(tmp_db) as conn: + # Add the canonical columns (mirrors prior schema-ensure run) + _ensure_score_performance_schema(conn) + conn.execute( + "INSERT INTO score_performance (symbol, score_date, score, price_on_date) " + "VALUES ('AAPL', '2026-05-01', 78.0, 205.50)" + ) + conn.commit() + + s3 = _mock_s3_for(_signals_payload()) + out = _backfill_score_context(s3, "bucket", tmp_db, "signals", dry_run=False) + assert out["status"] == "ok" + assert out["rows_written"] == 1 + + with sqlite3.connect(tmp_db) as conn: + row = conn.execute( + "SELECT quant_score, qual_score, conviction, " + "sector_modifier, market_regime " + "FROM score_performance WHERE symbol='AAPL'" + ).fetchone() + assert row == (80.0, 72.0, "rising", 1.05, "bull") + + def test_rerun_is_noop_once_populated(self, tmp_db): + """Repeat invocations after backfill should converge to 0 updates.""" + _seed_universe_close(tmp_db, "AAPL", "2026-05-01", 205.50) + s3 = _mock_s3_for(_signals_payload()) + _seed_score_performance(s3, "bucket", tmp_db, "signals", dry_run=False) + out = _backfill_score_context(s3, "bucket", tmp_db, "signals", dry_run=False) + assert out["rows_written"] == 0 + assert "no NULL context rows" in (out.get("note") or "") + + def test_dry_run_does_not_persist(self, tmp_db): + with sqlite3.connect(tmp_db) as conn: + _ensure_score_performance_schema(conn) + conn.execute( + "INSERT INTO score_performance (symbol, score_date, score, price_on_date) " + "VALUES ('AAPL', '2026-05-01', 78.0, 205.50)" + ) + conn.commit() + + s3 = _mock_s3_for(_signals_payload()) + out = _backfill_score_context(s3, "bucket", tmp_db, "signals", dry_run=True) + assert out["rows_written"] == 1 + + with sqlite3.connect(tmp_db) as conn: + row = conn.execute( + "SELECT quant_score FROM score_performance WHERE symbol='AAPL'" + ).fetchone() + assert row[0] is None # not actually written + + def test_partial_null_only_fills_missing(self, tmp_db): + """A row that already has quant_score should keep it; only NULL + fields get backfilled.""" + with sqlite3.connect(tmp_db) as conn: + _ensure_score_performance_schema(conn) + conn.execute( + "INSERT INTO score_performance " + "(symbol, score_date, score, price_on_date, quant_score, conviction) " + "VALUES ('AAPL', '2026-05-01', 78.0, 205.50, 99.0, 'manual')" + ) + conn.commit() + + s3 = _mock_s3_for(_signals_payload()) + _backfill_score_context(s3, "bucket", tmp_db, "signals", dry_run=False) + + with sqlite3.connect(tmp_db) as conn: + row = conn.execute( + "SELECT quant_score, qual_score, conviction, " + "sector_modifier, market_regime " + "FROM score_performance WHERE symbol='AAPL'" + ).fetchone() + # Pre-existing values preserved; NULLs filled. + assert row == (99.0, 72.0, "manual", 1.05, "bull") + + +# ── Schema-ensure mirrors migration #12 ────────────────────────────────────── + + +class TestEnsureScorePerformanceSchema: + + def test_adds_canonical_columns_idempotently(self, tmp_db): + with sqlite3.connect(tmp_db) as conn: + _ensure_score_performance_schema(conn) + _ensure_score_performance_schema(conn) # second call is a no-op + cols = {r[1] for r in conn.execute( + "PRAGMA table_info(score_performance)" + ).fetchall()} + + for canonical_col in ("quant_score", "qual_score", "conviction", + "sector_modifier", "market_regime"): + assert canonical_col in cols, f"missing {canonical_col}" From 0c0983584ba0b0875a01043ccfdc2ceeb9e0d793 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Sun, 10 May 2026 11:36:59 -0700 Subject: [PATCH 2/2] =?UTF-8?q?feat(signal=5Freturns):=20drift=20gate=20?= =?UTF-8?q?=E2=80=94=20canonical=20context=20coverage=20CW=20gauge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Locks the producer-side contract established in the previous commit: after seed + backfill complete, query score_performance for rows with score_date >= 2026-05-17 (first Sat SF after this PR merges) and emit the coverage percentage as a CloudWatch gauge: AlphaEngine/Data/score_performance_canonical_coverage_pct Coverage = fraction of post-cutover rows with ALL 5 canonical context columns populated (quant_score, qual_score, conviction, sector_modifier, market_regime). 100% is the contract; the gauge is always emitted (including 100.0) so alarm baselines stay continuous. Mirrors the chronic-gap drift detection pattern at weekly_collector.py:_check_chronic_gap_polygon_recovery — same best-effort emit, same observability-not-load-bearing posture. A follow-up alpha-engine-lib transparency_inventory entry can wire this into the substrate health alarm if desired; the metric itself is the drift signal. Tripwire test asserts _CANONICAL_CONTEXT_COLUMNS stays in lockstep with the seed INSERT — adding a 6th column to one without the other would make the drift gate blind to that field. Co-Authored-By: Claude Opus 4.7 (1M context) --- collectors/signal_returns.py | 119 +++++++++++++++++++++ tests/test_signal_returns_score_context.py | 119 +++++++++++++++++++++ 2 files changed, 238 insertions(+) diff --git a/collectors/signal_returns.py b/collectors/signal_returns.py index a1a04e2..b73b7fe 100644 --- a/collectors/signal_returns.py +++ b/collectors/signal_returns.py @@ -82,6 +82,13 @@ def collect( # Step 2: Backfill score_performance returns via universe_returns JOIN results["backfill_score_returns"] = _backfill_score_returns(db_path, dry_run) + # Step 2b: Drift gate — emit canonical-context coverage as a CW gauge + # so an alarm fires if the producer ever regresses (e.g. signals.json + # shape drift, seed-path bug, schema migration skew). Closes the loop + # on the 2026-05-09 producer-side bug class. + if not dry_run: + results["context_coverage_drift"] = _emit_context_coverage_metric(db_path) + # Step 3: Seed predictor_outcomes results["seed_predictor_outcomes"] = _seed_predictor_outcomes( s3, bucket, db_path, dry_run, @@ -671,6 +678,118 @@ def _ensure_predictor_outcomes_schema(conn) -> None: conn.commit() +# Canonical context columns the producer-side seed must populate. +# Source of truth for the drift-gate query; mirrors the columns added by +# alpha-engine-research migration #12 (calibrator-v1 context). +_CANONICAL_CONTEXT_COLUMNS = ( + "quant_score", + "qual_score", + "conviction", + "sector_modifier", + "market_regime", +) + +# Effective date for the drift gate — first Saturday SF run AFTER this PR +# merges. Rows with `score_date >= _DRIFT_EFFECTIVE_DATE` are counted +# against the producer's coverage contract. Pre-cutover rows are excluded +# so the gate isn't polluted by legacy NULLs the backfill step is still +# catching up on. +_DRIFT_EFFECTIVE_DATE = "2026-05-17" + + +def _emit_context_coverage_metric(db_path: str) -> dict: + """Drift-detection CloudWatch gauge for the canonical-context contract. + + Producer-side: after seed + backfill complete, query score_performance + for rows with score_date >= _DRIFT_EFFECTIVE_DATE and compute the + percentage that have ALL 5 canonical context columns populated + (non-NULL). Emit as ``AlphaEngine/Data/score_performance_canonical_coverage_pct`` + so an alarm can fire if the percentage drops below the contract + threshold for any cycle. + + Always emits (including 100.0) so alarm baselines are continuous; + CloudWatch missing-data is harder to alarm against than a steady + series. Best-effort: read / metric-emit errors log a warning but + never raise — this is observability, not a load-bearing path. + + Mirrors the chronic-gap drift detection pattern at + weekly_collector.py:_check_chronic_gap_polygon_recovery. + """ + summary: dict = {"status": "ok"} + try: + conn = sqlite3.connect(db_path) + try: + total = conn.execute( + "SELECT COUNT(*) FROM score_performance WHERE score_date >= ?", + (_DRIFT_EFFECTIVE_DATE,), + ).fetchone()[0] + if total == 0: + # Pre-cutover, or no new data this cycle. Coverage is + # undefined; report 100.0 so the alarm doesn't fire on a + # legitimately-empty window. + summary.update( + rows_post_cutoff=0, + rows_fully_populated=0, + coverage_pct=100.0, + note="no rows past effective_date — coverage undefined", + ) + else: + null_clause = " OR ".join( + f"{col} IS NULL" for col in _CANONICAL_CONTEXT_COLUMNS + ) + nulls = conn.execute( + f"SELECT COUNT(*) FROM score_performance " + f"WHERE score_date >= ? AND ({null_clause})", + (_DRIFT_EFFECTIVE_DATE,), + ).fetchone()[0] + populated = total - nulls + coverage_pct = (populated / total) * 100.0 + summary.update( + rows_post_cutoff=total, + rows_fully_populated=populated, + coverage_pct=round(coverage_pct, 2), + ) + finally: + conn.close() + except Exception as exc: + logger.warning( + "context_coverage_metric: DB read failed — drift alarm " + "cadence may degrade until next cycle. %s", exc, + ) + summary["status"] = "skipped" + summary["error"] = str(exc) + return summary + + try: + cw = boto3.client("cloudwatch") + cw.put_metric_data( + Namespace="AlphaEngine/Data", + MetricData=[{ + "MetricName": "score_performance_canonical_coverage_pct", + "Value": float(summary["coverage_pct"]), + "Unit": "Percent", + }], + ) + except Exception as exc: + logger.warning( + "score_performance_canonical_coverage_pct metric emit failed: " + "%s — drift alarm cadence may degrade until next cycle.", exc, + ) + summary["status"] = "skipped" + summary["error"] = str(exc) + + if summary["status"] == "ok" and summary.get("coverage_pct", 100.0) < 100.0: + logger.warning( + "Canonical context coverage drift: %.2f%% (%d/%d post-%s rows " + "fully populated). Expected 100%%. Producer-side regression — " + "investigate _seed_score_performance / signals.json shape.", + summary["coverage_pct"], summary["rows_fully_populated"], + summary["rows_post_cutoff"], _DRIFT_EFFECTIVE_DATE, + ) + + return summary + + def _list_signal_dates(s3, bucket: str, prefix: str) -> list[str]: """List all signal dates from S3.""" dates = [] diff --git a/tests/test_signal_returns_score_context.py b/tests/test_signal_returns_score_context.py index 4651241..14d7a4d 100644 --- a/tests/test_signal_returns_score_context.py +++ b/tests/test_signal_returns_score_context.py @@ -25,7 +25,10 @@ import pytest from collectors.signal_returns import ( + _CANONICAL_CONTEXT_COLUMNS, + _DRIFT_EFFECTIVE_DATE, _backfill_score_context, + _emit_context_coverage_metric, _ensure_score_performance_schema, _extract_signal_context, _seed_score_performance, @@ -310,3 +313,119 @@ def test_adds_canonical_columns_idempotently(self, tmp_db): for canonical_col in ("quant_score", "qual_score", "conviction", "sector_modifier", "market_regime"): assert canonical_col in cols, f"missing {canonical_col}" + + +# ── _emit_context_coverage_metric — producer-side drift gate ───────────────── + + +class TestEmitContextCoverageMetric: + """The CW gauge AlphaEngine/Data/score_performance_canonical_coverage_pct + is the runtime drift detector. Contract: every row written with + score_date >= _DRIFT_EFFECTIVE_DATE must have ALL 5 canonical context + columns populated. Coverage_pct drops below 100 → alarm fires.""" + + def _seed_row(self, db, symbol, score_date, **extras): + with sqlite3.connect(db) as conn: + _ensure_score_performance_schema(conn) + cols = ["symbol", "score_date", "score", "price_on_date", *extras.keys()] + vals = [symbol, score_date, 78.0, 200.0, *extras.values()] + placeholders = ", ".join("?" for _ in cols) + conn.execute( + f"INSERT INTO score_performance ({', '.join(cols)}) VALUES ({placeholders})", + vals, + ) + conn.commit() + + def test_full_coverage_emits_100(self, tmp_db, monkeypatch): + ctx = dict(quant_score=80.0, qual_score=72.0, conviction="rising", + sector_modifier=1.05, market_regime="bull") + self._seed_row(tmp_db, "AAPL", _DRIFT_EFFECTIVE_DATE, **ctx) + self._seed_row(tmp_db, "MSFT", _DRIFT_EFFECTIVE_DATE, **ctx) + + cw = MagicMock() + monkeypatch.setattr("collectors.signal_returns.boto3.client", + lambda svc: cw if svc == "cloudwatch" else MagicMock()) + + out = _emit_context_coverage_metric(tmp_db) + assert out["status"] == "ok" + assert out["coverage_pct"] == 100.0 + assert out["rows_post_cutoff"] == 2 + assert out["rows_fully_populated"] == 2 + + cw.put_metric_data.assert_called_once() + call = cw.put_metric_data.call_args + assert call.kwargs["Namespace"] == "AlphaEngine/Data" + metric = call.kwargs["MetricData"][0] + assert metric["MetricName"] == "score_performance_canonical_coverage_pct" + assert metric["Value"] == 100.0 + assert metric["Unit"] == "Percent" + + def test_partial_coverage_emits_below_100(self, tmp_db, monkeypatch): + full_ctx = dict(quant_score=80.0, qual_score=72.0, conviction="rising", + sector_modifier=1.05, market_regime="bull") + partial_ctx = dict(quant_score=80.0) # missing 4 fields + self._seed_row(tmp_db, "AAPL", _DRIFT_EFFECTIVE_DATE, **full_ctx) + self._seed_row(tmp_db, "MSFT", _DRIFT_EFFECTIVE_DATE, **partial_ctx) + + cw = MagicMock() + monkeypatch.setattr("collectors.signal_returns.boto3.client", + lambda svc: cw if svc == "cloudwatch" else MagicMock()) + + out = _emit_context_coverage_metric(tmp_db) + assert out["coverage_pct"] == 50.0 + assert out["rows_post_cutoff"] == 2 + assert out["rows_fully_populated"] == 1 + + def test_pre_cutover_rows_excluded_from_gate(self, tmp_db, monkeypatch): + """Legacy NULL rows seeded before the producer fix must not pollute + the drift gauge — the gauge is forward-looking.""" + # Row pre-effective-date with all NULLs — must not count against coverage + self._seed_row(tmp_db, "AAPL", "2026-04-01") # no canonical kwargs + # Row post-effective-date with full context + full_ctx = dict(quant_score=80.0, qual_score=72.0, conviction="rising", + sector_modifier=1.05, market_regime="bull") + self._seed_row(tmp_db, "MSFT", _DRIFT_EFFECTIVE_DATE, **full_ctx) + + cw = MagicMock() + monkeypatch.setattr("collectors.signal_returns.boto3.client", + lambda svc: cw if svc == "cloudwatch" else MagicMock()) + + out = _emit_context_coverage_metric(tmp_db) + assert out["rows_post_cutoff"] == 1 + assert out["coverage_pct"] == 100.0 + + def test_empty_post_cutoff_reports_100(self, tmp_db, monkeypatch): + """No rows past effective_date — coverage undefined, report 100 + so the alarm doesn't fire on a legitimately-empty window.""" + cw = MagicMock() + monkeypatch.setattr("collectors.signal_returns.boto3.client", + lambda svc: cw if svc == "cloudwatch" else MagicMock()) + + out = _emit_context_coverage_metric(tmp_db) + assert out["rows_post_cutoff"] == 0 + assert out["coverage_pct"] == 100.0 + assert "no rows past effective_date" in out["note"] + + def test_metric_emit_failure_is_non_fatal(self, tmp_db, monkeypatch): + """CW throttling / network errors must not break the collector.""" + full_ctx = dict(quant_score=80.0, qual_score=72.0, conviction="rising", + sector_modifier=1.05, market_regime="bull") + self._seed_row(tmp_db, "AAPL", _DRIFT_EFFECTIVE_DATE, **full_ctx) + + cw = MagicMock() + cw.put_metric_data.side_effect = RuntimeError("CW throttled") + monkeypatch.setattr("collectors.signal_returns.boto3.client", + lambda svc: cw if svc == "cloudwatch" else MagicMock()) + + out = _emit_context_coverage_metric(tmp_db) + assert out["status"] == "skipped" + assert "CW throttled" in out["error"] + + def test_canonical_columns_constant_matches_seed_insert(self): + """Tripwire: if someone adds a 6th canonical column to the seed + INSERT they must also add it here, or the drift gate becomes + blind to that field's NULLs.""" + assert set(_CANONICAL_CONTEXT_COLUMNS) == { + "quant_score", "qual_score", "conviction", + "sector_modifier", "market_regime", + }