From 3a5d0b855049d0391856a27635574483d1911032 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 7 Apr 2026 07:15:27 -0700 Subject: [PATCH 1/4] chore: add gitleaks pre-commit hook for secret scanning Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 5 +++++ .pre-commit-config.yaml | 6 ++++++ 2 files changed, 11 insertions(+) create mode 100644 .pre-commit-config.yaml diff --git a/.gitignore b/.gitignore index 621e91f..3c4af8b 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,8 @@ Thumbs.db # Design docs *-design-*.md + +# Gitleaks config (symlink to shared config) +.gitleaks.toml + +.gitleaksbaseline.json diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..abe3316 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,6 @@ +repos: + - repo: https://github.com/gitleaks/gitleaks + rev: v8.24.0 + hooks: + - id: gitleaks + args: ['--baseline-path', '.gitleaksbaseline.json'] From eb74c33ce0ec3ceef952acf8579411acd1a32185 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 7 Apr 2026 11:48:14 -0700 Subject: [PATCH 2/4] Phase 1: ArcticDB store + historical backfill script --- builders/__init__.py | 0 builders/backfill.py | 452 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + store/__init__.py | 0 store/arctic_store.py | 63 ++++++ 5 files changed, 516 insertions(+) create mode 100644 builders/__init__.py create mode 100644 builders/backfill.py create mode 100644 store/__init__.py create mode 100644 store/arctic_store.py diff --git a/builders/__init__.py b/builders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/builders/backfill.py b/builders/backfill.py new file mode 100644 index 0000000..c0e2f14 --- /dev/null +++ b/builders/backfill.py @@ -0,0 +1,452 @@ +""" +builders/backfill.py — Historical backfill of ArcticDB universe from S3 price cache. + +Loads the full 10-year price cache from S3, computes all 53 features for every +ticker's full history, and writes each ticker as a symbol in the ArcticDB +universe library. Also writes macro features to the macro library. + +This is a one-time migration script (Phase 1 of the unified data layer plan). +After initial backfill, the weekly Saturday pipeline rebuilds from fresh data, +and the daily weekday pipeline appends new rows. + +Usage: + python -m builders.backfill # full backfill + python -m builders.backfill --dry-run # compute but skip ArcticDB write + python -m builders.backfill --ticker AAPL # single ticker (for testing) + python -m builders.backfill --validate # backfill + spot-check validation +""" + +from __future__ import annotations + +import argparse +import io +import json +import logging +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone + +import boto3 +import numpy as np +import pandas as pd + +from features.feature_engineer import ( + FEATURES, + MIN_ROWS_FOR_FEATURES, + compute_features, +) +from features.compute import ( + DEFAULT_BUCKET, + _SKIP_TICKERS, + _is_sector_etf, + _load_parquet_from_s3, + _load_sector_map, + _load_cached_fundamentals, + _load_cached_alternative, +) +from store.arctic_store import get_universe_lib, get_macro_lib + +log = logging.getLogger(__name__) + +# OHLCV columns to keep alongside features in ArcticDB +OHLCV_COLS = ["Open", "High", "Low", "Close", "Volume"] + + +def _load_full_cache(s3, bucket: str, prefix: str = "predictor/price_cache/") -> dict[str, pd.DataFrame]: + """Load all 10-year price cache parquets from S3 (concurrent).""" + keys = [] + paginator = s3.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + if obj["Key"].endswith(".parquet"): + keys.append(obj["Key"]) + + if not keys: + log.error("No parquets found in s3://%s/%s", bucket, prefix) + return {} + + log.info("Downloading %d full cache parquets from s3://%s/%s ...", len(keys), bucket, prefix) + + price_data: dict[str, pd.DataFrame] = {} + errors = 0 + + def _download(key: str) -> tuple[str, pd.DataFrame | None]: + ticker = key.split("/")[-1].replace(".parquet", "") + try: + df = _load_parquet_from_s3(s3, bucket, key) + if df.empty: + return ticker, None + return ticker, df + except Exception: + return ticker, None + + with ThreadPoolExecutor(max_workers=20) as pool: + futures = {pool.submit(_download, k): k for k in keys} + for fut in as_completed(futures): + ticker, df = fut.result() + if df is not None: + price_data[ticker] = df + else: + errors += 1 + + log.info("Full cache loaded: %d tickers OK, %d errors", len(price_data), errors) + return price_data + + +def _extract_macro_series(price_data: dict[str, pd.DataFrame]) -> dict[str, pd.Series]: + """Extract macro/ETF Close series from price data.""" + macro_keys = { + "SPY": "SPY", "VIX": "VIX", "VIX3M": "VIX3M", + "TNX": "TNX", "IRX": "IRX", "GLD": "GLD", "USO": "USO", + } + macro: dict[str, pd.Series] = {} + for key, stem in macro_keys.items(): + df = price_data.get(stem) + if df is not None and "Close" in df.columns: + macro[key] = df["Close"].dropna() + + # Sector ETFs + for stem, df in price_data.items(): + if stem.startswith("XL") and len(stem) <= 4 and "Close" in df.columns: + macro[stem] = df["Close"].dropna() + + return macro + + +def _build_macro_features_df(macro: dict[str, pd.Series]) -> pd.DataFrame: + """Build a DataFrame of macro features (one row per date) for the macro library.""" + vix = macro.get("VIX") + tnx = macro.get("TNX") + irx = macro.get("IRX") + gld = macro.get("GLD") + uso = macro.get("USO") + vix3m = macro.get("VIX3M") + spy = macro.get("SPY") + + if vix is None or spy is None: + log.warning("Missing VIX or SPY — macro features will be incomplete") + return pd.DataFrame() + + # Build on the VIX index (available for all trading dates) + idx = vix.index + df = pd.DataFrame(index=idx) + + df["vix_level"] = (vix.reindex(idx) / 20.0).astype("float32") + if tnx is not None: + df["yield_10y"] = (tnx.reindex(idx) / 10.0).astype("float32") + if tnx is not None and irx is not None: + df["yield_curve_slope"] = ((tnx.reindex(idx) - irx.reindex(idx)) / 10.0).astype("float32") + if gld is not None: + df["gold_mom_5d"] = gld.reindex(idx).pct_change(5).astype("float32") + if uso is not None: + df["oil_mom_5d"] = uso.reindex(idx).pct_change(5).astype("float32") + if vix3m is not None: + vix_r = vix.reindex(idx) + vix3m_r = vix3m.reindex(idx) + df["vix_term_slope"] = ((vix3m_r - vix_r) / vix_r.clip(lower=1.0)).astype("float32") + + # Cross-sectional dispersion placeholder (requires per-ticker returns, set to 0) + df["xsect_dispersion"] = np.float32(0.0) + + df = df.dropna(subset=["vix_level"]) + df.index.name = "date" + return df + + +def backfill( + bucket: str = DEFAULT_BUCKET, + dry_run: bool = False, + ticker_filter: str | None = None, + validate: bool = False, +) -> dict: + """ + Run the full historical backfill: load 10y prices, compute features, write to ArcticDB. + + Args: + bucket: S3 bucket name + dry_run: compute but skip ArcticDB writes + ticker_filter: if set, only process this single ticker (for testing) + validate: if True, run spot-check validation after backfill + + Returns: + Summary dict with counts and timing. + """ + s3 = boto3.client("s3") + t0 = time.time() + + # ── 1. Load data ───────────────────────────────────────────────────────── + log.info("Loading full 10-year price cache...") + price_data = _load_full_cache(s3, bucket) + if not price_data: + return {"status": "error", "error": "no_price_data"} + + macro = _extract_macro_series(price_data) + sector_map = _load_sector_map(s3, bucket) + + # Use today's date for fundamentals/alt data lookup + today_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + fundamentals = _load_cached_fundamentals(s3, bucket, today_str) + alt_data = _load_cached_alternative(s3, bucket) + + t_load = time.time() - t0 + log.info( + "Data loaded in %.1fs: %d tickers, %d macro series, %d sector mappings", + t_load, len(price_data), len(macro), len(sector_map), + ) + + # ── 2. Filter to stock tickers ─────────────────────────────────────────── + universe_tickers = [ + t for t in price_data + if t not in _SKIP_TICKERS + and not _is_sector_etf(t) + and price_data[t] is not None + and len(price_data[t]) >= MIN_ROWS_FOR_FEATURES + ] + + if ticker_filter: + if ticker_filter not in universe_tickers: + log.error("Ticker %s not found in universe (or insufficient history)", ticker_filter) + return {"status": "error", "error": f"ticker_not_found: {ticker_filter}"} + universe_tickers = [ticker_filter] + + log.info("Computing features for %d tickers...", len(universe_tickers)) + + # ── 3. Extract macro series ────────────────────────────────────────────── + spy_series = macro.get("SPY") + vix_series = macro.get("VIX") + tnx_series = macro.get("TNX") + irx_series = macro.get("IRX") + gld_series = macro.get("GLD") + uso_series = macro.get("USO") + vix3m_series = macro.get("VIX3M") + + # ── 4. Compute features and write to ArcticDB ──────────────────────────── + if not dry_run: + universe_lib = get_universe_lib(bucket) + macro_lib = get_macro_lib(bucket) + + n_ok = 0 + n_skip = 0 + n_err = 0 + t_compute_start = time.time() + + for i, ticker in enumerate(universe_tickers): + try: + df = price_data[ticker] + sector_etf_sym = sector_map.get(ticker) + sector_etf_series = macro.get(sector_etf_sym) if sector_etf_sym else None + + ticker_alt = alt_data.get(ticker, {}) + + featured_df = compute_features( + df, + spy_series=spy_series, + vix_series=vix_series, + sector_etf_series=sector_etf_series, + tnx_series=tnx_series, + irx_series=irx_series, + gld_series=gld_series, + uso_series=uso_series, + vix3m_series=vix3m_series, + earnings_data=ticker_alt.get("earnings"), + revision_data=ticker_alt.get("revisions"), + options_data=ticker_alt.get("options"), + fundamental_data=fundamentals.get(ticker), + ) + + if featured_df.empty: + n_skip += 1 + continue + + # Keep OHLCV + all feature columns + keep_cols = [c for c in OHLCV_COLS if c in featured_df.columns] + \ + [f for f in FEATURES if f in featured_df.columns] + symbol_df = featured_df[keep_cols].copy() + + # Ensure float32 for feature columns (OHLCV stays float64/int64) + for f in FEATURES: + if f in symbol_df.columns: + symbol_df[f] = symbol_df[f].astype("float32") + + symbol_df.index.name = "date" + + if not dry_run: + universe_lib.write(ticker, symbol_df) + + n_ok += 1 + + if (i + 1) % 100 == 0: + log.info("Progress: %d / %d tickers processed (%d OK)", i + 1, len(universe_tickers), n_ok) + + except Exception as exc: + log.warning("Failed to compute features for %s: %s", ticker, exc) + n_err += 1 + + t_compute = time.time() - t_compute_start + + # ── 5. Write macro features ────────────────────────────────────────────── + macro_df = _build_macro_features_df(macro) + if not macro_df.empty and not dry_run: + macro_lib.write("features", macro_df) + log.info("Wrote macro features: %d dates", len(macro_df)) + + # Write raw macro series (SPY, VIX, etc.) for consumers that need them + if not dry_run: + for key in ["SPY", "VIX", "VIX3M", "TNX", "IRX", "GLD", "USO"]: + series = macro.get(key) + if series is not None: + macro_series_df = pd.DataFrame({"Close": series}, index=series.index) + macro_series_df.index.name = "date" + macro_lib.write(key, macro_series_df) + + # Write sector ETFs + for key in macro: + if key.startswith("XL"): + sector_df = pd.DataFrame({"Close": macro[key]}, index=macro[key].index) + sector_df.index.name = "date" + macro_lib.write(key, sector_df) + + # ── 6. Snapshot ────────────────────────────────────────────────────────── + if not dry_run: + snapshot_name = f"backfill-{today_str}" + try: + universe_lib.snapshot(snapshot_name) + log.info("Created snapshot: %s", snapshot_name) + except Exception as exc: + log.warning("Snapshot creation failed (non-fatal): %s", exc) + + t_total = time.time() - t0 + + result = { + "status": "ok", + "tickers_written": n_ok, + "tickers_skipped": n_skip, + "tickers_errored": n_err, + "macro_dates": len(macro_df) if not macro_df.empty else 0, + "load_seconds": round(t_load, 1), + "compute_seconds": round(t_compute, 1), + "total_seconds": round(t_total, 1), + "dry_run": dry_run, + } + + log.info("Backfill complete: %s", json.dumps(result, default=str)) + + # ── 7. Validation (optional) ───────────────────────────────────────────── + if validate and not dry_run: + _run_validation(universe_lib, price_data, macro, sector_map, fundamentals, alt_data) + + return result + + +def _run_validation( + universe_lib, + price_data: dict[str, pd.DataFrame], + macro: dict[str, pd.Series], + sector_map: dict[str, str], + fundamentals: dict[str, dict], + alt_data: dict[str, dict], +): + """Spot-check: recompute features inline for 10 tickers and compare to ArcticDB.""" + symbols = universe_lib.list_symbols() + check_tickers = sorted(symbols)[:10] + + log.info("Running validation on %d tickers: %s", len(check_tickers), check_tickers) + + spy_series = macro.get("SPY") + vix_series = macro.get("VIX") + tnx_series = macro.get("TNX") + irx_series = macro.get("IRX") + gld_series = macro.get("GLD") + uso_series = macro.get("USO") + vix3m_series = macro.get("VIX3M") + + passed = 0 + failed = 0 + + for ticker in check_tickers: + try: + stored = universe_lib.read(ticker).data + + df = price_data[ticker] + sector_etf_sym = sector_map.get(ticker) + sector_etf_series = macro.get(sector_etf_sym) if sector_etf_sym else None + ticker_alt = alt_data.get(ticker, {}) + + recomputed = compute_features( + df, + spy_series=spy_series, + vix_series=vix_series, + sector_etf_series=sector_etf_series, + tnx_series=tnx_series, + irx_series=irx_series, + gld_series=gld_series, + uso_series=uso_series, + vix3m_series=vix3m_series, + earnings_data=ticker_alt.get("earnings"), + revision_data=ticker_alt.get("revisions"), + options_data=ticker_alt.get("options"), + fundamental_data=fundamentals.get(ticker), + ) + + # Compare row counts + if len(stored) != len(recomputed): + log.warning( + "FAIL %s: row count mismatch (stored=%d, recomputed=%d)", + ticker, len(stored), len(recomputed), + ) + failed += 1 + continue + + # Compare feature values on last 10 rows + feature_cols = [f for f in FEATURES if f in stored.columns and f in recomputed.columns] + tail_stored = stored[feature_cols].tail(10).values + tail_recomputed = recomputed[feature_cols].tail(10).values.astype("float32") + + if np.allclose(tail_stored, tail_recomputed, atol=1e-5, equal_nan=True): + log.info("PASS %s: features match (%d rows, %d features)", ticker, len(stored), len(feature_cols)) + passed += 1 + else: + max_diff = np.nanmax(np.abs(tail_stored - tail_recomputed)) + log.warning("FAIL %s: max feature diff = %.6f", ticker, max_diff) + failed += 1 + + except Exception as exc: + log.warning("FAIL %s: validation error: %s", ticker, exc) + failed += 1 + + log.info("Validation complete: %d passed, %d failed", passed, failed) + + +def main(): + parser = argparse.ArgumentParser(description="Backfill ArcticDB universe from S3 price cache") + parser.add_argument("--dry-run", action="store_true", help="Compute but skip ArcticDB writes") + parser.add_argument("--ticker", default=None, help="Process single ticker (for testing)") + parser.add_argument("--validate", action="store_true", help="Run spot-check validation after backfill") + parser.add_argument("--bucket", default=DEFAULT_BUCKET, help=f"S3 bucket (default: {DEFAULT_BUCKET})") + parser.add_argument("--verbose", "-v", action="store_true", help="Enable debug logging") + + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", + datefmt="%H:%M:%S", + ) + + result = backfill( + bucket=args.bucket, + dry_run=args.dry_run, + ticker_filter=args.ticker, + validate=args.validate, + ) + + if result["status"] != "ok": + log.error("Backfill failed: %s", result.get("error")) + sys.exit(1) + + print(json.dumps(result, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index 915e510..de20666 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ edgartools>=2.0 psycopg2-binary>=2.9 voyageai>=0.3 jsonschema>=4.20 +arcticdb>=6.11 diff --git a/store/__init__.py b/store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/store/arctic_store.py b/store/arctic_store.py new file mode 100644 index 0000000..9f8e9bd --- /dev/null +++ b/store/arctic_store.py @@ -0,0 +1,63 @@ +""" +store/arctic_store.py — ArcticDB connection manager. + +Thin wrapper over ArcticDB that provides library access for all modules. +Uses S3 backend — no additional infrastructure beyond the existing bucket. + +Usage: + from store.arctic_store import get_universe_lib, get_macro_lib + + universe = get_universe_lib() + df = universe.read("AAPL").data + +Libraries: + universe — per-ticker time series (OHLCV + 53 computed features) + macro — market-wide time series (VIX, yields, commodities, macro features) +""" + +from __future__ import annotations + +import logging +import os + +import arcticdb as adb + +log = logging.getLogger(__name__) + +DEFAULT_BUCKET = "alpha-engine-research" +ARCTIC_PREFIX = "arcticdb" + +_arctic_instance: adb.Arctic | None = None + + +def _get_arctic(bucket: str | None = None) -> adb.Arctic: + """Get or create the ArcticDB connection singleton.""" + global _arctic_instance + if _arctic_instance is not None: + return _arctic_instance + + bucket = bucket or os.environ.get("ARCTIC_BUCKET", DEFAULT_BUCKET) + region = os.environ.get("AWS_REGION", "us-east-1") + uri = f"s3s://s3.{region}.amazonaws.com:{bucket}?path_prefix={ARCTIC_PREFIX}&aws_auth=true" + + log.info("Connecting to ArcticDB: s3://%s/%s (region=%s)", bucket, ARCTIC_PREFIX, region) + _arctic_instance = adb.Arctic(uri) + return _arctic_instance + + +def get_universe_lib(bucket: str | None = None) -> adb.library.Library: + """Get the universe library (per-ticker OHLCV + features).""" + arctic = _get_arctic(bucket) + return arctic.get_library("universe", create_if_missing=True) + + +def get_macro_lib(bucket: str | None = None) -> adb.library.Library: + """Get the macro library (market-wide time series).""" + arctic = _get_arctic(bucket) + return arctic.get_library("macro", create_if_missing=True) + + +def reset_connection(): + """Reset the singleton (useful for testing or credential rotation).""" + global _arctic_instance + _arctic_instance = None From ea4c55a5ce4a2bc4d120df8c6fb3f237f8c1a53b Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 7 Apr 2026 12:11:30 -0700 Subject: [PATCH 3/4] Fix float(None) TypeError in alternative/fundamental feature computation --- features/feature_engineer.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/features/feature_engineer.py b/features/feature_engineer.py index 994bc0b..63d7a00 100644 --- a/features/feature_engineer.py +++ b/features/feature_engineer.py @@ -459,10 +459,14 @@ def compute_features( # ── v2.0 features — alternative data signals (O10-O12) ──────────────────── + def _safe_float(val, default: float = 0.0) -> float: + """Coerce to float, treating None as the default.""" + return float(val) if val is not None else default + # O10: PEAD if earnings_data: - df["earnings_surprise_pct"] = float(earnings_data.get("surprise_pct", 0.0)) - days_since = float(earnings_data.get("days_since_earnings", 90)) + df["earnings_surprise_pct"] = _safe_float(earnings_data.get("surprise_pct"), 0.0) + days_since = _safe_float(earnings_data.get("days_since_earnings"), 90.0) df["days_since_earnings"] = days_since / 90.0 else: df["earnings_surprise_pct"] = 0.0 @@ -470,17 +474,17 @@ def compute_features( # O11: EPS revision momentum if revision_data: - df["eps_revision_4w"] = float(revision_data.get("eps_revision_4w", 0.0)) - df["revision_streak"] = float(revision_data.get("revision_streak", 0)) + df["eps_revision_4w"] = _safe_float(revision_data.get("eps_revision_4w"), 0.0) + df["revision_streak"] = _safe_float(revision_data.get("revision_streak"), 0.0) else: df["eps_revision_4w"] = 0.0 df["revision_streak"] = 0.0 # O12: Options-derived signals if options_data: - df["put_call_ratio"] = float(options_data.get("put_call_ratio", 0.0)) - df["iv_rank"] = float(options_data.get("iv_rank", 0.5)) - atm_iv = float(options_data.get("atm_iv", 0.0)) + df["put_call_ratio"] = _safe_float(options_data.get("put_call_ratio"), 0.0) + df["iv_rank"] = _safe_float(options_data.get("iv_rank"), 0.5) + atm_iv = _safe_float(options_data.get("atm_iv"), 0.0) realized_vol = df["realized_vol_20d"].iloc[-1] if "realized_vol_20d" in df.columns else 0.0 if realized_vol > 0 and atm_iv > 0: df["iv_vs_rv"] = atm_iv / realized_vol @@ -493,14 +497,14 @@ def compute_features( # ── v3.0 features — fundamental ratios (quarterly, from FMP) ───────────── if fundamental_data: - df["pe_ratio"] = float(fundamental_data.get("pe_ratio", 0.0)) - df["pb_ratio"] = float(fundamental_data.get("pb_ratio", 0.0)) - df["debt_to_equity"] = float(fundamental_data.get("debt_to_equity", 0.0)) - df["revenue_growth_yoy"] = float(fundamental_data.get("revenue_growth_yoy", 0.0)) - df["fcf_yield"] = float(fundamental_data.get("fcf_yield", 0.0)) - df["gross_margin"] = float(fundamental_data.get("gross_margin", 0.0)) - df["roe"] = float(fundamental_data.get("roe", 0.0)) - df["current_ratio"] = float(fundamental_data.get("current_ratio", 0.0)) + df["pe_ratio"] = _safe_float(fundamental_data.get("pe_ratio"), 0.0) + df["pb_ratio"] = _safe_float(fundamental_data.get("pb_ratio"), 0.0) + df["debt_to_equity"] = _safe_float(fundamental_data.get("debt_to_equity"), 0.0) + df["revenue_growth_yoy"] = _safe_float(fundamental_data.get("revenue_growth_yoy"), 0.0) + df["fcf_yield"] = _safe_float(fundamental_data.get("fcf_yield"), 0.0) + df["gross_margin"] = _safe_float(fundamental_data.get("gross_margin"), 0.0) + df["roe"] = _safe_float(fundamental_data.get("roe"), 0.0) + df["current_ratio"] = _safe_float(fundamental_data.get("current_ratio"), 0.0) else: df["pe_ratio"] = 0.0 df["pb_ratio"] = 0.0 From ee4977c56329e68fb18fd23502e9333fe6e6bbf7 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 7 Apr 2026 13:13:03 -0700 Subject: [PATCH 4/4] Phase 2: wire ArcticDB weekly rebuild + daily append into pipeline --- builders/daily_append.py | 328 +++++++++++++++++++++++++++++++++++++++ weekly_collector.py | 31 +++- 2 files changed, 358 insertions(+), 1 deletion(-) create mode 100644 builders/daily_append.py diff --git a/builders/daily_append.py b/builders/daily_append.py new file mode 100644 index 0000000..f5287be --- /dev/null +++ b/builders/daily_append.py @@ -0,0 +1,328 @@ +""" +builders/daily_append.py — Append today's OHLCV + features to ArcticDB universe. + +Reads today's daily_closes from S3 (already written by daily_closes.py), +loads recent history from ArcticDB for feature warmup, computes today's +features, and appends a single row per ticker to the universe library. + +Usage: + python -m builders.daily_append # today + python -m builders.daily_append --date 2026-04-07 # specific date + python -m builders.daily_append --dry-run # compute but skip write +""" + +from __future__ import annotations + +import argparse +import io +import json +import logging +import sys +import time +from datetime import datetime, timezone + +import boto3 +import numpy as np +import pandas as pd + +from features.feature_engineer import ( + FEATURES, + MIN_ROWS_FOR_FEATURES, + compute_features, +) +from features.compute import ( + DEFAULT_BUCKET, + _SKIP_TICKERS, + _is_sector_etf, + _load_sector_map, + _load_cached_fundamentals, + _load_cached_alternative, +) +from store.arctic_store import get_universe_lib, get_macro_lib + +log = logging.getLogger(__name__) + +OHLCV_COLS = ["Open", "High", "Low", "Close", "Volume"] + + +def _load_daily_closes(s3, bucket: str, date_str: str) -> dict[str, dict]: + """Load today's daily_closes parquet from S3. Returns {ticker: {Open, High, Low, Close, Volume}}.""" + key = f"predictor/daily_closes/{date_str}.parquet" + try: + obj = s3.get_object(Bucket=bucket, Key=key) + buf = io.BytesIO(obj["Body"].read()) + df = pd.read_parquet(buf, engine="pyarrow") + + records = {} + for ticker, row in df.iterrows(): + records[str(ticker)] = { + "Open": float(row.get("Open", np.nan)), + "High": float(row.get("High", np.nan)), + "Low": float(row.get("Low", np.nan)), + "Close": float(row.get("Close", np.nan)), + "Volume": int(row.get("Volume", 0)), + } + log.info("Loaded daily closes for %s: %d tickers", date_str, len(records)) + return records + except Exception as exc: + log.error("Failed to load daily_closes/%s.parquet: %s", date_str, exc) + return {} + + +def daily_append( + date_str: str | None = None, + bucket: str = DEFAULT_BUCKET, + dry_run: bool = False, +) -> dict: + """ + Append today's features to ArcticDB universe. + + For each ticker: + 1. Read recent history from ArcticDB (tail ~300 rows for feature warmup) + 2. Append today's OHLCV row + 3. Compute features for the combined series + 4. Extract the last row (today) and append to ArcticDB + + Returns summary dict. + """ + s3 = boto3.client("s3") + date_str = date_str or datetime.now(timezone.utc).strftime("%Y-%m-%d") + today_ts = pd.Timestamp(date_str) + t0 = time.time() + + # ── 1. Load today's OHLCV ──────────────────────────────────────────────── + closes = _load_daily_closes(s3, bucket, date_str) + if not closes: + return {"status": "error", "error": "no daily_closes for date"} + + # ── 2. Load supporting data ────────────────────────────────────────────── + sector_map = _load_sector_map(s3, bucket) + fundamentals = _load_cached_fundamentals(s3, bucket, date_str) + alt_data = _load_cached_alternative(s3, bucket) + + if not dry_run: + universe_lib = get_universe_lib(bucket) + macro_lib = get_macro_lib(bucket) + else: + universe_lib = None + macro_lib = None + + # ── 3. Load macro series from ArcticDB ─────────────────────────────────── + macro: dict[str, pd.Series] = {} + macro_keys = ["SPY", "VIX", "VIX3M", "TNX", "IRX", "GLD", "USO"] + + if not dry_run: + for key in macro_keys: + try: + mdf = macro_lib.read(key).data + if "Close" in mdf.columns: + series = mdf["Close"].dropna() + # Append today's close if available + ticker_close = closes.get(key) + if ticker_close and not np.isnan(ticker_close["Close"]): + new_row = pd.Series( + {"Close": ticker_close["Close"]}, + name=today_ts, + ) + series = pd.concat([series, pd.Series([ticker_close["Close"]], index=[today_ts])]) + series = series[~series.index.duplicated(keep="last")] + macro[key] = series + except Exception: + log.debug("Macro series %s not in ArcticDB", key) + + # Sector ETFs + for sym in macro_lib.list_symbols(): + if sym.startswith("XL"): + try: + mdf = macro_lib.read(sym).data + if "Close" in mdf.columns: + series = mdf["Close"].dropna() + ticker_close = closes.get(sym) + if ticker_close and not np.isnan(ticker_close["Close"]): + series = pd.concat([series, pd.Series([ticker_close["Close"]], index=[today_ts])]) + series = series[~series.index.duplicated(keep="last")] + macro[sym] = series + except Exception: + pass + + t_load = time.time() - t0 + log.info("Data loaded in %.1fs: %d closes, %d macro series", t_load, len(closes), len(macro)) + + # ── 4. Compute features and append ─────────────────────────────────────── + spy_series = macro.get("SPY") + vix_series = macro.get("VIX") + tnx_series = macro.get("TNX") + irx_series = macro.get("IRX") + gld_series = macro.get("GLD") + uso_series = macro.get("USO") + vix3m_series = macro.get("VIX3M") + + # Filter to stock tickers only + stock_tickers = [ + t for t in closes + if t not in _SKIP_TICKERS and not _is_sector_etf(t) + ] + + n_ok = 0 + n_skip = 0 + n_err = 0 + + for ticker in stock_tickers: + try: + # Read recent history from ArcticDB (need ~265 rows for feature warmup) + if dry_run: + n_skip += 1 + continue + + try: + hist = universe_lib.read(ticker).data + except Exception: + log.debug("Ticker %s not in ArcticDB — skipping", ticker) + n_skip += 1 + continue + + if len(hist) < MIN_ROWS_FOR_FEATURES: + n_skip += 1 + continue + + # Check if today already exists + if today_ts in hist.index: + n_skip += 1 + continue + + # Build today's OHLCV row + bar = closes[ticker] + if np.isnan(bar["Close"]): + n_skip += 1 + continue + + new_row = pd.DataFrame( + [{col: bar.get(col, np.nan) for col in OHLCV_COLS}], + index=pd.DatetimeIndex([today_ts]), + ) + + # Combine history OHLCV + today's bar for feature computation + hist_ohlcv = hist[[c for c in OHLCV_COLS if c in hist.columns]] + combined = pd.concat([hist_ohlcv, new_row]) + combined = combined[~combined.index.duplicated(keep="last")].sort_index() + + # Compute features on the combined series + sector_etf_sym = sector_map.get(ticker) + sector_etf_series = macro.get(sector_etf_sym) if sector_etf_sym else None + ticker_alt = alt_data.get(ticker, {}) + + featured = compute_features( + combined, + spy_series=spy_series, + vix_series=vix_series, + sector_etf_series=sector_etf_series, + tnx_series=tnx_series, + irx_series=irx_series, + gld_series=gld_series, + uso_series=uso_series, + vix3m_series=vix3m_series, + earnings_data=ticker_alt.get("earnings"), + revision_data=ticker_alt.get("revisions"), + options_data=ticker_alt.get("options"), + fundamental_data=fundamentals.get(ticker), + ) + + if featured.empty or today_ts not in featured.index: + n_skip += 1 + continue + + # Extract today's row with OHLCV + features + keep_cols = [c for c in OHLCV_COLS if c in featured.columns] + \ + [f for f in FEATURES if f in featured.columns] + today_row = featured.loc[[today_ts], keep_cols].copy() + + for f in FEATURES: + if f in today_row.columns: + today_row[f] = today_row[f].astype("float32") + + today_row.index.name = "date" + + universe_lib.append(ticker, today_row) + n_ok += 1 + + except Exception as exc: + log.debug("Failed to append %s: %s", ticker, exc) + n_err += 1 + + # ── 5. Update macro series ─────────────────────────────────────────────── + if not dry_run: + for key in macro_keys: + bar = closes.get(key) + if bar and not np.isnan(bar.get("Close", np.nan)): + try: + new_row = pd.DataFrame( + [{"Close": bar["Close"]}], + index=pd.DatetimeIndex([today_ts]), + ) + new_row.index.name = "date" + macro_lib.append(key, new_row) + except Exception as exc: + log.debug("Failed to append macro %s: %s", key, exc) + + # Sector ETFs + for sym in closes: + if sym.startswith("XL") and len(sym) <= 4: + bar = closes[sym] + if not np.isnan(bar.get("Close", np.nan)): + try: + new_row = pd.DataFrame( + [{"Close": bar["Close"]}], + index=pd.DatetimeIndex([today_ts]), + ) + new_row.index.name = "date" + macro_lib.append(sym, new_row) + except Exception as exc: + log.debug("Failed to append macro %s: %s", sym, exc) + + t_total = time.time() - t0 + + result = { + "status": "ok", + "date": date_str, + "tickers_appended": n_ok, + "tickers_skipped": n_skip, + "tickers_errored": n_err, + "load_seconds": round(t_load, 1), + "total_seconds": round(t_total, 1), + "dry_run": dry_run, + } + + log.info("Daily append complete: %s", json.dumps(result, default=str)) + return result + + +def main(): + parser = argparse.ArgumentParser(description="Append daily features to ArcticDB universe") + parser.add_argument("--date", default=None, help="Target date (YYYY-MM-DD, default: today UTC)") + parser.add_argument("--dry-run", action="store_true", help="Compute but skip ArcticDB writes") + parser.add_argument("--bucket", default=DEFAULT_BUCKET, help=f"S3 bucket (default: {DEFAULT_BUCKET})") + parser.add_argument("--verbose", "-v", action="store_true", help="Enable debug logging") + + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", + datefmt="%H:%M:%S", + ) + + result = daily_append( + date_str=args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d"), + bucket=args.bucket, + dry_run=args.dry_run, + ) + + if result["status"] != "ok": + log.error("Daily append failed: %s", result.get("error")) + sys.exit(1) + + print(json.dumps(result, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/weekly_collector.py b/weekly_collector.py index d1afe52..a06f907 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -257,6 +257,19 @@ def _run_phase1(config: dict, args: argparse.Namespace) -> dict: logger.error("Feature store compute failed: %s", e) results["collectors"]["features"] = {"status": "error", "error": str(e)} + # ── 8. ArcticDB universe rebuild ───────────────────────────────────────── + if only in (None, "arcticdb"): + logger.info("=" * 60) + logger.info("REBUILDING: ArcticDB universe (full backfill)") + logger.info("=" * 60) + try: + from builders.backfill import backfill + arctic_result = backfill(bucket=bucket, dry_run=dry_run) + results["collectors"]["arcticdb"] = arctic_result + except Exception as e: + logger.error("ArcticDB backfill failed: %s", e) + results["collectors"]["arcticdb"] = {"status": "error", "error": str(e)} + # ── Finalize ───────────────────────────────────────────────────────────── results["completed_at"] = datetime.now(timezone.utc).isoformat() _finalize(results, bucket, market_prefix, run_date, dry_run, only) @@ -365,6 +378,22 @@ def _run_daily(config: dict, args: argparse.Namespace) -> dict: logger.error("Feature store compute failed: %s", e) results["collectors"]["features"] = {"status": "error", "error": str(e)} + # ── ArcticDB daily append ──────────────────────────────────────────────── + logger.info("=" * 60) + logger.info("APPENDING: ArcticDB universe (daily)") + logger.info("=" * 60) + try: + from builders.daily_append import daily_append + arctic_result = daily_append( + date_str=run_date, + bucket=bucket, + dry_run=dry_run, + ) + results["collectors"]["arcticdb"] = arctic_result + except Exception as e: + logger.error("ArcticDB daily append failed: %s", e) + results["collectors"]["arcticdb"] = {"status": "error", "error": str(e)} + results["completed_at"] = datetime.now(timezone.utc).isoformat() # Status @@ -544,7 +573,7 @@ def _parse_args() -> argparse.Namespace: ) parser.add_argument( "--only", - choices=["constituents", "prices", "slim", "macro", "universe_returns", "alternative", "daily_closes"], + choices=["constituents", "prices", "slim", "macro", "universe_returns", "alternative", "daily_closes", "features", "arcticdb"], help="Run a single collector instead of all", ) parser.add_argument(