Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 309 additions & 8 deletions collectors/signal_returns.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,23 @@ def collect(
s3, bucket, db_path, signals_prefix, dry_run,
)

# Step 1b: Backfill calibrator-v1 context for any legacy rows whose
# canonical columns are NULL (rows seeded before this collector
# learned to write them). UPDATE-WHERE-NULL so re-runs are no-ops.
results["backfill_score_context"] = _backfill_score_context(
s3, bucket, db_path, signals_prefix, dry_run,
)

# Step 2: Backfill score_performance returns via universe_returns JOIN
results["backfill_score_returns"] = _backfill_score_returns(db_path, dry_run)

# Step 2b: Drift gate — emit canonical-context coverage as a CW gauge
# so an alarm fires if the producer ever regresses (e.g. signals.json
# shape drift, seed-path bug, schema migration skew). Closes the loop
# on the 2026-05-09 producer-side bug class.
if not dry_run:
results["context_coverage_drift"] = _emit_context_coverage_metric(db_path)

# Step 3: Seed predictor_outcomes
results["seed_predictor_outcomes"] = _seed_predictor_outcomes(
s3, bucket, db_path, dry_run,
Expand Down Expand Up @@ -105,10 +119,42 @@ def collect(
# ── Step 1: Seed score_performance ────────────────────────────────────────────


def _extract_signal_context(payload: dict, ticker: str) -> dict:
"""Pull calibrator-v1 context fields for a ticker from a signals.json
payload. Mirrors the extraction logic in alpha-engine-research's
scripts/backfill_calibrator_v1_context.py so the producer-side seed
and the legacy backfill agree on every field's source.

Returns a dict with all 5 keys; values may be None when the source
payload omits them (older schemas, partial outputs).
"""
sigs = payload.get("signals") or {}
sig = sigs.get(ticker) or {}
sector = sig.get("sector")
sector_modifiers = payload.get("sector_modifiers") or {}
return {
"quant_score": sig.get("quant_score"),
"qual_score": sig.get("qual_score"),
"conviction": sig.get("conviction"),
"sector_modifier": sector_modifiers.get(sector) if sector else None,
"market_regime": payload.get("market_regime"),
}


def _seed_score_performance(
s3, bucket: str, db_path: str, signals_prefix: str, dry_run: bool,
) -> dict:
"""Insert BUY-rated signals into score_performance with entry prices from universe_returns."""
"""Insert BUY-rated signals into score_performance with entry prices
from universe_returns. Includes the 5 calibrator-v1 context columns
(quant_score, qual_score, conviction, sector_modifier, market_regime)
on initial INSERT — values come from the same signals.json payload
that drives the BUY filter, so no second round-trip is needed.

Pre-2026-05-10 seed inserts wrote only (symbol, score_date, score,
price_on_date) and left the canonical columns NULL — that bug is the
root cause behind the 2026-05-09 evaluator weight_optimizer ERROR.
Existing NULL rows get repaired by ``_backfill_score_context``.
"""
try:
conn = sqlite3.connect(db_path)
_ensure_score_performance_schema(conn)
Expand All @@ -121,7 +167,10 @@ def _seed_score_performance(
# List signal dates from S3
signal_dates = _list_signal_dates(s3, bucket, signals_prefix)

rows_to_insert = []
# rows_to_insert carries (ticker, sig_date, score, context_dict) so
# the canonical context is captured at the same point the BUY
# filter runs — single source of truth per signals.json payload.
rows_to_insert: list[tuple[str, str, float, dict]] = []
for sig_date in signal_dates:
try:
obj = s3.get_object(Bucket=bucket, Key=f"{signals_prefix}/{sig_date}/signals.json")
Expand All @@ -135,7 +184,9 @@ def _seed_score_performance(
rating = stock.get("rating", "")
if not ticker or rating != "BUY" or (ticker, sig_date) in existing:
continue
rows_to_insert.append((ticker, sig_date, score))
rows_to_insert.append(
(ticker, sig_date, score, _extract_signal_context(signals, ticker))
)

# v1 format fallback
sigs = signals.get("signals", {})
Expand All @@ -145,15 +196,17 @@ def _seed_score_performance(
rating = s.get("rating", "")
if rating != "BUY" or (ticker, sig_date) in existing:
continue
rows_to_insert.append((ticker, sig_date, score))
rows_to_insert.append(
(ticker, sig_date, score, _extract_signal_context(signals, ticker))
)

if not rows_to_insert:
conn.close()
return {"status": "ok", "rows_written": 0, "note": "all rows already seeded"}

# Get entry prices from universe_returns (already in the DB)
inserted = 0
for ticker, sig_date, score in rows_to_insert:
for ticker, sig_date, score, ctx in rows_to_insert:
if score is None:
continue
# Look up entry price from universe_returns
Expand All @@ -167,8 +220,20 @@ def _seed_score_performance(

if not dry_run:
conn.execute(
"INSERT OR IGNORE INTO score_performance (symbol, score_date, score, price_on_date) VALUES (?, ?, ?, ?)",
(ticker, sig_date, round(float(score), 2), round(price, 2)),
"""
INSERT OR IGNORE INTO score_performance (
symbol, score_date, score, price_on_date,
quant_score, qual_score, conviction,
sector_modifier, market_regime
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
ticker, sig_date,
round(float(score), 2), round(price, 2),
ctx["quant_score"], ctx["qual_score"],
ctx["conviction"], ctx["sector_modifier"],
ctx["market_regime"],
),
)
inserted += 1

Expand All @@ -185,6 +250,116 @@ def _seed_score_performance(
return {"status": "error", "error": str(e), "rows_written": 0}


def _backfill_score_context(
s3, bucket: str, db_path: str, signals_prefix: str, dry_run: bool,
) -> dict:
"""Repair existing score_performance rows whose calibrator-v1 context
columns are NULL (rows seeded before _seed_score_performance learned
to write them).

Groups NULL-bearing rows by score_date so each signals.json fetch
serves all symbols for that date. UPDATE-WHERE-NULL means re-runs are
no-ops once the population converges.
"""
try:
conn = sqlite3.connect(db_path)
_ensure_score_performance_schema(conn)

rows = conn.execute(
"""
SELECT symbol, score_date
FROM score_performance
WHERE quant_score IS NULL
OR qual_score IS NULL
OR conviction IS NULL
OR sector_modifier IS NULL
OR market_regime IS NULL
ORDER BY score_date, symbol
"""
).fetchall()

if not rows:
conn.close()
return {"status": "ok", "rows_written": 0, "note": "no NULL context rows"}

by_date: dict[str, list[str]] = {}
for symbol, score_date in rows:
by_date.setdefault(score_date, []).append(symbol)

updated = 0
for sig_date, symbols in by_date.items():
try:
obj = s3.get_object(
Bucket=bucket,
Key=f"{signals_prefix}/{sig_date}/signals.json",
)
payload = json.loads(obj["Body"].read())
except (ClientError, json.JSONDecodeError):
continue

for symbol in symbols:
ctx = _extract_signal_context(payload, symbol)
# Skip if signals.json has no useful context for this symbol
# (rare — typically only legacy archive shapes).
if all(v is None for v in ctx.values()):
continue

cur = conn.execute(
"""
SELECT quant_score, qual_score, conviction,
sector_modifier, market_regime
FROM score_performance
WHERE symbol = ? AND score_date = ?
""",
(symbol, sig_date),
).fetchone()
if cur is None:
continue

cur_q, cur_qu, cur_c, cur_s, cur_r = cur
updates: list[tuple[str, object]] = []
if cur_q is None and ctx["quant_score"] is not None:
updates.append(("quant_score", ctx["quant_score"]))
if cur_qu is None and ctx["qual_score"] is not None:
updates.append(("qual_score", ctx["qual_score"]))
if cur_c is None and ctx["conviction"] is not None:
updates.append(("conviction", ctx["conviction"]))
if cur_s is None and ctx["sector_modifier"] is not None:
updates.append(("sector_modifier", ctx["sector_modifier"]))
if cur_r is None and ctx["market_regime"] is not None:
updates.append(("market_regime", ctx["market_regime"]))

if not updates:
continue
if dry_run:
updated += 1
continue

set_clause = ", ".join(f"{col} = ?" for col, _ in updates)
values = [v for _, v in updates] + [symbol, sig_date]
conn.execute(
f"UPDATE score_performance SET {set_clause} "
f"WHERE symbol = ? AND score_date = ?",
values,
)
updated += 1

if not dry_run:
conn.commit()
conn.close()

if updated:
logger.info(
"Backfilled calibrator-v1 context on %d score_performance rows",
updated,
)
return {"status": "ok", "rows_written": updated}

except Exception as e:
logger.error("backfill_score_context failed: %s", e)
return {"status": "error", "error": str(e), "rows_written": 0}


# ── Step 2: Backfill score_performance returns ────────────────────────────────


Expand Down Expand Up @@ -452,7 +627,17 @@ def _backfill_predictor_returns(


def _ensure_score_performance_schema(conn) -> None:
"""Add 5d return columns to score_performance if they don't exist yet."""
"""Add forward-return + calibrator-v1 context columns to
score_performance if they don't exist yet.

Belt-and-suspenders with alpha-engine-research's archive/schema.py
migrations (v11 forward-returns, v12 calibrator-v1 context). The
Saturday SF runs DataPhase1 first; if it ever fires against a fresh
research.db before the research Lambda has cold-started and applied
its migrations, this ensures the seed/backfill INSERT/UPDATE
statements still target valid columns. Idempotent — each ALTER is
skipped when the column already exists.
"""
cols = {r[1] for r in conn.execute("PRAGMA table_info(score_performance)").fetchall()}
for col, col_type in [
("price_5d", "REAL"), ("return_5d", "REAL"), ("spy_5d_return", "REAL"),
Expand All @@ -461,6 +646,10 @@ def _ensure_score_performance_schema(conn) -> None:
("beat_spy_10d", "INTEGER"), ("eval_date_10d", "TEXT"),
("price_30d", "REAL"), ("return_30d", "REAL"), ("spy_30d_return", "REAL"),
("beat_spy_30d", "INTEGER"), ("eval_date_30d", "TEXT"),
# Calibrator-v1 context (alpha-engine-research migration #12)
("quant_score", "REAL"), ("qual_score", "REAL"),
("conviction", "TEXT"), ("sector_modifier", "REAL"),
("market_regime", "TEXT"),
]:
if col not in cols:
conn.execute(f"ALTER TABLE score_performance ADD COLUMN {col} {col_type}")
Expand Down Expand Up @@ -489,6 +678,118 @@ def _ensure_predictor_outcomes_schema(conn) -> None:
conn.commit()


# Canonical context columns the producer-side seed must populate.
# Source of truth for the drift-gate query; mirrors the columns added by
# alpha-engine-research migration #12 (calibrator-v1 context).
_CANONICAL_CONTEXT_COLUMNS = (
"quant_score",
"qual_score",
"conviction",
"sector_modifier",
"market_regime",
)

# Effective date for the drift gate — first Saturday SF run AFTER this PR
# merges. Rows with `score_date >= _DRIFT_EFFECTIVE_DATE` are counted
# against the producer's coverage contract. Pre-cutover rows are excluded
# so the gate isn't polluted by legacy NULLs the backfill step is still
# catching up on.
_DRIFT_EFFECTIVE_DATE = "2026-05-17"


def _emit_context_coverage_metric(db_path: str) -> dict:
"""Drift-detection CloudWatch gauge for the canonical-context contract.

Producer-side: after seed + backfill complete, query score_performance
for rows with score_date >= _DRIFT_EFFECTIVE_DATE and compute the
percentage that have ALL 5 canonical context columns populated
(non-NULL). Emit as ``AlphaEngine/Data/score_performance_canonical_coverage_pct``
so an alarm can fire if the percentage drops below the contract
threshold for any cycle.

Always emits (including 100.0) so alarm baselines are continuous;
CloudWatch missing-data is harder to alarm against than a steady
series. Best-effort: read / metric-emit errors log a warning but
never raise — this is observability, not a load-bearing path.

Mirrors the chronic-gap drift detection pattern at
weekly_collector.py:_check_chronic_gap_polygon_recovery.
"""
summary: dict = {"status": "ok"}
try:
conn = sqlite3.connect(db_path)
try:
total = conn.execute(
"SELECT COUNT(*) FROM score_performance WHERE score_date >= ?",
(_DRIFT_EFFECTIVE_DATE,),
).fetchone()[0]
if total == 0:
# Pre-cutover, or no new data this cycle. Coverage is
# undefined; report 100.0 so the alarm doesn't fire on a
# legitimately-empty window.
summary.update(
rows_post_cutoff=0,
rows_fully_populated=0,
coverage_pct=100.0,
note="no rows past effective_date — coverage undefined",
)
else:
null_clause = " OR ".join(
f"{col} IS NULL" for col in _CANONICAL_CONTEXT_COLUMNS
)
nulls = conn.execute(
f"SELECT COUNT(*) FROM score_performance "
f"WHERE score_date >= ? AND ({null_clause})",
(_DRIFT_EFFECTIVE_DATE,),
).fetchone()[0]
populated = total - nulls
coverage_pct = (populated / total) * 100.0
summary.update(
rows_post_cutoff=total,
rows_fully_populated=populated,
coverage_pct=round(coverage_pct, 2),
)
finally:
conn.close()
except Exception as exc:
logger.warning(
"context_coverage_metric: DB read failed — drift alarm "
"cadence may degrade until next cycle. %s", exc,
)
summary["status"] = "skipped"
summary["error"] = str(exc)
return summary

try:
cw = boto3.client("cloudwatch")
cw.put_metric_data(
Namespace="AlphaEngine/Data",
MetricData=[{
"MetricName": "score_performance_canonical_coverage_pct",
"Value": float(summary["coverage_pct"]),
"Unit": "Percent",
}],
)
except Exception as exc:
logger.warning(
"score_performance_canonical_coverage_pct metric emit failed: "
"%s — drift alarm cadence may degrade until next cycle.", exc,
)
summary["status"] = "skipped"
summary["error"] = str(exc)

if summary["status"] == "ok" and summary.get("coverage_pct", 100.0) < 100.0:
logger.warning(
"Canonical context coverage drift: %.2f%% (%d/%d post-%s rows "
"fully populated). Expected 100%%. Producer-side regression — "
"investigate _seed_score_performance / signals.json shape.",
summary["coverage_pct"], summary["rows_fully_populated"],
summary["rows_post_cutoff"], _DRIFT_EFFECTIVE_DATE,
)

return summary


def _list_signal_dates(s3, bucket: str, prefix: str) -> list[str]:
"""List all signal dates from S3."""
dates = []
Expand Down
Loading
Loading