From c432635b76a21d6fb8078a23a97e842acbd91d4b Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Sun, 10 May 2026 08:43:16 -0700 Subject: [PATCH 1/2] feat(daily_closes): TWO + HYOAS forward-only daily ingestion (Stage 2.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 2.5 of the regime-conditioning rebuild (plan doc: alpha-engine-docs/private/regime-conditioning-260510.md). Adds two FRED series to the daily_closes ingestion map: - TWO → DGS2 (2Y constant-maturity treasury, percent) - HYOAS → BAMLH0A0HYM2 (ICE BofA US HY Index Option-Adjusted Spread, percent) Why this matters: the predictor's macro feature set currently includes yield curve slope as 10Y-3M (TNX-IRX), which is the cyclical curve. Adding DGS2 enables the 10Y-2Y curve (recession-focused canonical) as a parallel macro. HY OAS is a major regime indicator that VIX misses — credit typically widens before vol spikes in macro-driven cycles, and stays wide during recoveries when vol has already calmed. Real institutional risk-factor models (AQR, Two Sigma, BlackRock Aladdin) include credit spreads in their regime taxonomy. Scope discipline: this PR is forward-only daily collection. Historical backfill (which requires a new FRED history fetcher — current ``_fetch_fred_closes`` is single-latest-only) is gated as Stage 2.5b. Predictor-side wiring (extending compute_features + regime_predictor + MACRO_NORM_FEATURES with the new symbols) is gated as Stage 2c. What ships: - _FRED_INDEX_MAP: TWO + HYOAS entries - config.yaml.example: TWO + HYOAS in always_download - 5 tests locking the new mappings + the no-caret-for-FRED-only contract Operator follow-up: - Update production config.yaml to include TWO + HYOAS in always_download (config.yaml is gitignored; this PR only updates the example file). - Stage 2.5b PR will add FRED history fetcher + run historical backfill before Stage 2c (predictor consumption) lands. Without that, predictor training has no historical TWO/HYOAS data to learn from — only forward-collected rows from this PR's deploy date onward. 633 tests pass + 1 skipped (5 new + 628 existing). Co-Authored-By: Claude Opus 4.7 (1M context) --- collectors/daily_closes.py | 17 ++++++++- config.yaml.example | 5 +++ tests/test_dgs2_hyoas_ingestion.py | 56 ++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 tests/test_dgs2_hyoas_ingestion.py diff --git a/collectors/daily_closes.py b/collectors/daily_closes.py index d5651c1..5e0d891 100644 --- a/collectors/daily_closes.py +++ b/collectors/daily_closes.py @@ -60,12 +60,27 @@ # Map our ArcticDB ticker key (after stripping ^) to FRED series id. # Both yfinance (^VIX, ^TNX, ...) and FRED (VIXCLS, DGS10, ...) publish # these in the same scale (raw index level for VIX/VIX3M, percent for -# TNX/IRX), so no conversion is needed before appending to ArcticDB. +# TNX/IRX/TWO/HYOAS), so no conversion is needed before appending to +# ArcticDB. +# +# TWO + HYOAS added 2026-05-10 (Stage 2.5 of regime-conditioning rebuild +# — plan doc: alpha-engine-docs/private/regime-conditioning-260510.md). +# Both are FRED-only (no yfinance proxy), so they only flow through the +# FRED fallback path. Historical backfill is gated on a follow-up PR +# adding a FRED history fetcher; this PR begins forward-only collection. _FRED_INDEX_MAP = { "VIX": "VIXCLS", "VIX3M": "VXVCLS", "TNX": "DGS10", "IRX": "DTB3", + # 2Y treasury — enables 10Y-2Y curve slope (recession-focused canonical) + # alongside the existing 10Y-3M (TNX-IRX cyclical). + "TWO": "DGS2", + # ICE BofA US High Yield Index Option-Adjusted Spread, percent. + # Major regime indicator that VIX misses — credit widens before vol + # spikes in many cycles, and stays wide during recoveries when vol + # has already calmed. Institutional risk-factor models include it. + "HYOAS": "BAMLH0A0HYM2", } diff --git a/config.yaml.example b/config.yaml.example index 604eeac..d4d33c9 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -93,12 +93,17 @@ caret_symbols: - IRX # Always-download tickers (benchmarks, macro, sector ETFs) +# TWO + HYOAS added 2026-05-10 (Stage 2.5 regime-conditioning rebuild) — +# FRED-only, no yfinance caret. Forward-only collection from this date; +# historical backfill is a follow-up PR. always_download: - SPY - VIX - VIX3M - TNX - IRX + - TWO + - HYOAS - GLD - USO - XLK diff --git a/tests/test_dgs2_hyoas_ingestion.py b/tests/test_dgs2_hyoas_ingestion.py new file mode 100644 index 0000000..7d6b22a --- /dev/null +++ b/tests/test_dgs2_hyoas_ingestion.py @@ -0,0 +1,56 @@ +"""Tests for Stage 2.5 of the regime-conditioning rebuild — DGS2 (2Y +treasury) and HY OAS credit spread added to daily_closes._FRED_INDEX_MAP +for forward-only daily ingestion. Historical backfill is a follow-up PR. + +Plan doc: ~/Development/alpha-engine-docs/private/regime-conditioning-260510.md + +These tests lock the contract that: +- TWO maps to FRED series DGS2 (2Y constant maturity treasury) +- HYOAS maps to FRED series BAMLH0A0HYM2 (HY OAS, percent) +- Both are exposed via the same FRED-fallback path the existing index + tickers use, so daily_closes will write parquet records when polygon + doesn't carry the symbol. +""" +from __future__ import annotations + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from collectors.daily_closes import _FRED_INDEX_MAP + + +class TestFredIndexMapAdditions: + + def test_two_maps_to_dgs2(self): + # 2Y treasury — enables 10Y-2Y curve slope (recession-canonical) + # alongside the existing 10Y-3M (TNX-IRX cyclical). + assert _FRED_INDEX_MAP.get("TWO") == "DGS2" + + def test_hyoas_maps_to_bamlh0a0hym2(self): + # ICE BofA US High Yield Index Option-Adjusted Spread, percent. + # Major regime indicator that VIX misses — credit widens before + # vol spikes in many cycles. + assert _FRED_INDEX_MAP.get("HYOAS") == "BAMLH0A0HYM2" + + def test_existing_mappings_unchanged(self): + # Regression: ensure the Stage 2.5 additions didn't perturb the + # existing mappings. + assert _FRED_INDEX_MAP["VIX"] == "VIXCLS" + assert _FRED_INDEX_MAP["VIX3M"] == "VXVCLS" + assert _FRED_INDEX_MAP["TNX"] == "DGS10" + assert _FRED_INDEX_MAP["IRX"] == "DTB3" + + def test_no_yfinance_caret_for_fred_only_symbols(self): + # TWO and HYOAS are FRED-only — no yfinance caret prefix should + # be configured for them. Verifies the integration path + # routes them through FRED fallback only. + from collectors.prices import _CARET_SYMBOLS + assert "TWO" not in _CARET_SYMBOLS + assert "HYOAS" not in _CARET_SYMBOLS + + def test_index_map_total_size(self): + # Stage 2.5 adds exactly 2 entries — TWO + HYOAS. Lock so a + # future drive-by addition doesn't slip through unreviewed. + assert len(_FRED_INDEX_MAP) == 6 From a2160152853437e3ea3d32fc327c8ce175abdbdc Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Sun, 10 May 2026 11:09:08 -0700 Subject: [PATCH 2/2] feat(collectors): FRED date-range history fetcher (Stage 2.5b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 2.5b of the regime-conditioning rebuild (plan doc: alpha-engine-docs/private/regime-conditioning-260510.md). Adds the historical-fetch counterpart to ``daily_closes._fetch_fred_closes`` (single-latest-only) so FRED-only macro symbols (TWO, HYOAS, and any future FRED series) can populate the 10-year price-cache parquets the predictor reads. Without this, Stage 2.5's daily ingestion of TWO + HYOAS only collects data going forward — predictor training has no historical data to learn from. This module unblocks Stage 2c-full (10Y-2Y curve + HY OAS macro features in regime_predictor.build_features). What ships: - ``collectors/fred_history.py``: - ``FRED_HISTORY_MAP``: TWO → DGS2, HYOAS → BAMLH0A0HYM2 - ``fetch_fred_history(series_id, period_years=10)``: date-range FRED API call with retries + missing-value handling - ``fred_history_to_ohlcv(df_fred)``: schema reshape so output matches the parquet contract yfinance produces (Open/High/Low/Close /Adj_Close/Volume/VWAP/source). FRED single-value → OHLC replicate, Volume=0, VWAP=None, source="fred" - ``backfill_to_s3(bucket, tickers, dry_run)``: orchestrator — fetches each ticker, reshapes, writes parquet, uploads to S3. Per-ticker error isolation; partial completion reported via ``status="partial"`` rather than raising - CLI entry point: ``python -m collectors.fred_history --bucket ... --dry-run`` Tests: - 20 new tests covering: FRED_HISTORY_MAP contract, fetch parse + retries + missing handling + no-API-key error, OHLCV schema invariant, backfill orchestration including dry-run, unknown-ticker validation, per-ticker error isolation, S3 upload path. Operator follow-up (post-merge, after Stage 2.5 PR #203 also lands): - Run ``python -m collectors.fred_history --bucket alpha-engine-research`` to seed historical TWO.parquet + HYOAS.parquet in s3://alpha-engine-research/predictor/price_cache/. - Verify with ``aws s3 ls s3://alpha-engine-research/predictor/price_cache/ | grep -E "(TWO|HYOAS)"``. - After backfill completes, Stage 2c-full can ship the predictor-side consumption (yield_curve_10y_2y, hy_oas_level, hy_oas_change_21d). Stacks on Stage 2.5 (alpha-engine-data PR #203). Merge order: #203 first, then this PR. Both can also merge in either order — they touch different files. 653 tests pass + 1 skipped (20 new + 633 existing). Co-Authored-By: Claude Opus 4.7 (1M context) --- collectors/fred_history.py | 312 +++++++++++++++++++++++++++++ tests/test_fred_history_fetcher.py | 283 ++++++++++++++++++++++++++ 2 files changed, 595 insertions(+) create mode 100644 collectors/fred_history.py create mode 100644 tests/test_fred_history_fetcher.py diff --git a/collectors/fred_history.py b/collectors/fred_history.py new file mode 100644 index 0000000..de5a2f1 --- /dev/null +++ b/collectors/fred_history.py @@ -0,0 +1,312 @@ +""" +collectors/fred_history.py — FRED date-range time-series fetcher. + +Stage 2.5b of the regime-conditioning rebuild (plan doc: +alpha-engine-docs/private/regime-conditioning-260510.md). Provides the +historical-fetch counterpart to ``collectors/daily_closes._fetch_fred_closes`` +(which is single-latest-only) so FRED-only macro symbols (TWO, HYOAS, +and any future FRED series) can populate the 10-year price-cache +parquets the predictor reads. + +The yfinance refresh path in ``collectors/prices.py`` covers symbols +yfinance carries (^VIX/^VIX3M/^TNX/^IRX); this module covers the FRED- +only ones (DGS2 → TWO, BAMLH0A0HYM2 → HYOAS). + +Output schema matches what ``predictor`` reads from ``predictor/price_cache/``: +DatetimeIndex, columns = [Open, High, Low, Close, Adj_Close, Volume, +VWAP, source]. FRED publishes a single value per date; we replicate it +to OHLC and emit Volume=0 + VWAP=None to match the existing per-day +parquet contract. +""" + +from __future__ import annotations + +import logging +import os +import tempfile +import time +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Optional + +import boto3 +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + +_FRED_BASE = "https://api.stlouisfed.org/fred/series/observations" +_FRED_TIMEOUT = 30 # longer than _fred_latest's 15s — date-range responses are bigger + +# FRED-only symbols that need historical backfill via this module. Map +# our parquet/ArcticDB ticker key → FRED series id. Mirrors the +# Stage 2.5 entries in ``collectors/daily_closes._FRED_INDEX_MAP`` for +# the symbols not on yfinance. Kept separate so changes here don't +# impact daily_closes' single-latest fetch behaviour. +FRED_HISTORY_MAP: dict[str, str] = { + "TWO": "DGS2", + "HYOAS": "BAMLH0A0HYM2", +} + + +def fetch_fred_history( + series_id: str, + period_years: int = 10, + api_key: str | None = None, +) -> pd.DataFrame: + """Fetch a multi-year date-range time series from FRED. + + Args: + series_id: FRED series identifier (e.g., ``"DGS2"`` or + ``"BAMLH0A0HYM2"``). + period_years: trailing window in years. Default 10 to match the + yfinance refresh ``period="10y"``. + api_key: optional override; defaults to ``FRED_API_KEY`` env var. + + Returns: + DataFrame indexed by date (DatetimeIndex, ascending) with a + single ``value`` column of floats. Missing values (FRED's "." + marker) are dropped. Raises ``RuntimeError`` if no API key is + available or the request fails after retries. + """ + if api_key is None: + api_key = os.environ.get("FRED_API_KEY", "") + if not api_key: + raise RuntimeError( + "FRED_API_KEY not set — cannot fetch historical FRED series. " + "Set the env var or pass api_key explicitly." + ) + + end_date = datetime.now(timezone.utc).date() + start_date = end_date - timedelta(days=int(period_years * 365.25) + 7) + + params = { + "series_id": series_id, + "api_key": api_key, + "file_type": "json", + "observation_start": start_date.isoformat(), + "observation_end": end_date.isoformat(), + "sort_order": "asc", + } + + last_err: Exception | None = None + for attempt in range(1, 4): + try: + resp = requests.get(_FRED_BASE, params=params, timeout=_FRED_TIMEOUT) + resp.raise_for_status() + payload = resp.json() + obs = payload.get("observations", []) + if not obs: + raise RuntimeError( + f"FRED returned no observations for {series_id} " + f"in [{start_date}, {end_date}]" + ) + + rows = [] + for o in obs: + val = o.get("value", ".") + if val == "." or val is None: + continue + try: + rows.append((pd.Timestamp(o["date"]), float(val))) + except (KeyError, ValueError): + continue + + if not rows: + raise RuntimeError( + f"FRED {series_id}: every observation in window was " + f"missing or unparseable" + ) + + df = pd.DataFrame(rows, columns=["date", "value"]).set_index("date") + df = df.sort_index() + logger.info( + "FRED history %s: %d observations from %s to %s", + series_id, len(df), df.index.min().date(), df.index.max().date(), + ) + return df + + except requests.exceptions.RequestException as e: + last_err = e + if attempt < 3: + logger.warning( + "FRED %s history attempt %d failed: %s — retrying in %ds", + series_id, attempt, e, attempt * 3, + ) + time.sleep(attempt * 3) + else: + logger.error( + "FRED %s history failed after 3 attempts: %s", + series_id, e, + ) + raise RuntimeError( + f"FRED history fetch failed for {series_id} after retries: {last_err}" + ) + + +def fred_history_to_ohlcv( + df_fred: pd.DataFrame, +) -> pd.DataFrame: + """Convert a FRED single-value time series to the OHLCV-shape parquet. + + FRED publishes a single value per date; the predictor's ``compute_features`` + + downstream readers expect parquets with ``[Open, High, Low, Close, + Adj_Close, Volume, VWAP, source]`` columns matching what yfinance produces. + This helper replicates the value to OHLC and emits Volume=0 + VWAP=None + so the schema matches. + + Args: + df_fred: output of ``fetch_fred_history`` — DatetimeIndex with a + single ``value`` column. + + Returns: + OHLCV-shape DataFrame with the same DatetimeIndex. + """ + if "value" not in df_fred.columns: + raise ValueError( + f"Expected 'value' column in FRED DataFrame, got {list(df_fred.columns)}" + ) + val = df_fred["value"].astype(float) + out = pd.DataFrame( + { + "Open": val, + "High": val, + "Low": val, + "Close": val, + "Adj_Close": val, + "Volume": 0, + "VWAP": None, + "source": "fred", + }, + index=df_fred.index, + ) + return out + + +def backfill_to_s3( + bucket: str, + s3_prefix: str = "predictor/price_cache/", + tickers: list[str] | None = None, + period_years: int = 10, + dry_run: bool = False, +) -> dict: + """Backfill TWO + HYOAS (or any subset of ``FRED_HISTORY_MAP``) to S3. + + One-shot operator step. Run after Stage 2.5 ships and before Stage 2c-full + consumes the new parquets. Idempotent — full rewrite each call (matches + the yfinance ``auto_adjust=True`` rewrite semantics). + + Args: + bucket: S3 bucket name (typically ``alpha-engine-research``). + s3_prefix: S3 key prefix; default matches yfinance refresh path. + tickers: subset of ``FRED_HISTORY_MAP`` keys; default = all. + period_years: trailing history window. + dry_run: if True, fetch but skip S3 upload. + + Returns: + dict with status, refreshed count, and per-ticker row counts. + """ + if tickers is None: + tickers = sorted(FRED_HISTORY_MAP.keys()) + + unknown = [t for t in tickers if t not in FRED_HISTORY_MAP] + if unknown: + raise ValueError( + f"Unknown FRED-history tickers {unknown}. " + f"Known: {sorted(FRED_HISTORY_MAP.keys())}" + ) + + s3 = boto3.client("s3") if not dry_run else None + results: dict[str, dict] = {} + + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = Path(tmpdir) + + for ticker in tickers: + series_id = FRED_HISTORY_MAP[ticker] + try: + fred_df = fetch_fred_history(series_id, period_years=period_years) + ohlcv = fred_history_to_ohlcv(fred_df) + parquet_path = local_dir / f"{ticker}.parquet" + ohlcv.to_parquet(parquet_path, engine="pyarrow", compression="snappy") + results[ticker] = { + "status": "ok", + "rows": len(ohlcv), + "first_date": ohlcv.index.min().date().isoformat(), + "last_date": ohlcv.index.max().date().isoformat(), + } + if not dry_run: + s3.upload_file( + str(parquet_path), + bucket, + f"{s3_prefix}{ticker}.parquet", + ) + logger.info( + "Uploaded s3://%s/%s%s.parquet (%d rows, %s → %s)", + bucket, s3_prefix, ticker, len(ohlcv), + results[ticker]["first_date"], results[ticker]["last_date"], + ) + except Exception as e: + logger.error("Backfill failed for %s (%s): %s", ticker, series_id, e) + results[ticker] = {"status": "error", "error": str(e)} + + n_ok = sum(1 for r in results.values() if r["status"] == "ok") + return { + "status": "ok" if n_ok == len(tickers) else "partial", + "refreshed": n_ok, + "total": len(tickers), + "per_ticker": results, + "dry_run": dry_run, + } + + +def main(): + """CLI entry point — one-shot backfill of all FRED_HISTORY_MAP tickers. + + Usage: + python -m collectors.fred_history --bucket alpha-engine-research + python -m collectors.fred_history --dry-run + python -m collectors.fred_history --tickers TWO HYOAS --period-years 10 + """ + import argparse + import json + import sys + + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") + + parser = argparse.ArgumentParser(description=__doc__.split("\n\n")[0]) + parser.add_argument( + "--bucket", default="alpha-engine-research", + help="S3 bucket name. Default: alpha-engine-research", + ) + parser.add_argument( + "--prefix", default="predictor/price_cache/", + help="S3 prefix. Default: predictor/price_cache/", + ) + parser.add_argument( + "--tickers", nargs="+", default=None, + help="Subset of FRED_HISTORY_MAP keys. Default: all.", + ) + parser.add_argument( + "--period-years", type=int, default=10, + help="Trailing history window. Default 10 (matches yfinance).", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Fetch but skip S3 upload.", + ) + args = parser.parse_args() + + result = backfill_to_s3( + bucket=args.bucket, + s3_prefix=args.prefix, + tickers=args.tickers, + period_years=args.period_years, + dry_run=args.dry_run, + ) + print(json.dumps(result, indent=2, default=str)) + sys.exit(0 if result["status"] == "ok" else 2) + + +if __name__ == "__main__": + main() diff --git a/tests/test_fred_history_fetcher.py b/tests/test_fred_history_fetcher.py new file mode 100644 index 0000000..1c04158 --- /dev/null +++ b/tests/test_fred_history_fetcher.py @@ -0,0 +1,283 @@ +"""Tests for ``collectors/fred_history.py`` — Stage 2.5b of regime- +conditioning rebuild. + +Validates: +- ``fetch_fred_history`` retry + parse contract (mocked HTTP) +- ``fred_history_to_ohlcv`` schema invariant matches yfinance parquet shape +- ``backfill_to_s3`` orchestration (mocked S3 + FRED) +- ``FRED_HISTORY_MAP`` covers TWO + HYOAS + +Plan doc: ~/Development/alpha-engine-docs/private/regime-conditioning-260510.md +""" +from __future__ import annotations + +import json +import os +import sys +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from collectors.fred_history import ( + FRED_HISTORY_MAP, + backfill_to_s3, + fetch_fred_history, + fred_history_to_ohlcv, +) + + +# ── FRED_HISTORY_MAP contract ────────────────────────────────────────── + + +class TestFredHistoryMap: + + def test_two_maps_to_dgs2(self): + assert FRED_HISTORY_MAP["TWO"] == "DGS2" + + def test_hyoas_maps_to_bamlh0a0hym2(self): + assert FRED_HISTORY_MAP["HYOAS"] == "BAMLH0A0HYM2" + + def test_no_unintended_entries(self): + # Stage 2.5b adds exactly TWO + HYOAS. Lock so a future drive-by + # addition doesn't slip through unreviewed. + assert set(FRED_HISTORY_MAP.keys()) == {"TWO", "HYOAS"} + + +# ── fetch_fred_history ────────────────────────────────────────────────── + + +def _mock_fred_response(observations: list[dict]) -> MagicMock: + """Build a mocked requests.Response carrying a FRED API payload.""" + resp = MagicMock() + resp.status_code = 200 + resp.raise_for_status = MagicMock() + resp.json = MagicMock(return_value={"observations": observations}) + return resp + + +class TestFetchFredHistory: + + def test_basic_parse(self): + observations = [ + {"date": "2018-01-02", "value": "1.95"}, + {"date": "2018-01-03", "value": "2.01"}, + {"date": "2018-01-04", "value": "2.10"}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + df = fetch_fred_history("DGS2", api_key="fake") + assert len(df) == 3 + assert list(df.columns) == ["value"] + assert df.index[0] == pd.Timestamp("2018-01-02") + assert df["value"].iloc[0] == 1.95 + assert df["value"].iloc[-1] == 2.10 + + def test_drops_missing_observations(self): + # FRED uses "." as the missing-value marker. + observations = [ + {"date": "2018-01-02", "value": "1.95"}, + {"date": "2018-01-03", "value": "."}, + {"date": "2018-01-04", "value": "2.10"}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + df = fetch_fred_history("DGS2", api_key="fake") + assert len(df) == 2 + assert df.index.tolist() == [pd.Timestamp("2018-01-02"), pd.Timestamp("2018-01-04")] + + def test_sorts_ascending(self): + # FRED returns asc by request param, but lock the post-condition + # in case server returns out-of-order. + observations = [ + {"date": "2018-01-04", "value": "2.10"}, + {"date": "2018-01-02", "value": "1.95"}, + {"date": "2018-01-03", "value": "2.01"}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + df = fetch_fred_history("DGS2", api_key="fake") + assert df.index.is_monotonic_increasing + + def test_raises_when_no_api_key(self): + original = os.environ.pop("FRED_API_KEY", None) + try: + with pytest.raises(RuntimeError, match="FRED_API_KEY not set"): + fetch_fred_history("DGS2") + finally: + if original is not None: + os.environ["FRED_API_KEY"] = original + + def test_raises_when_no_observations(self): + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response([])): + with pytest.raises(RuntimeError, match="no observations"): + fetch_fred_history("DGS2", api_key="fake") + + def test_raises_when_all_missing(self): + observations = [ + {"date": "2018-01-02", "value": "."}, + {"date": "2018-01-03", "value": "."}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + with pytest.raises(RuntimeError, match="every observation"): + fetch_fred_history("DGS2", api_key="fake") + + def test_retries_on_request_exception(self): + import requests as _requests + call_count = [0] + + def flaky_get(*args, **kwargs): + call_count[0] += 1 + if call_count[0] < 3: + raise _requests.exceptions.ConnectionError("transient") + return _mock_fred_response([{"date": "2018-01-02", "value": "1.95"}]) + + with patch("collectors.fred_history.requests.get", side_effect=flaky_get), \ + patch("collectors.fred_history.time.sleep"): + df = fetch_fred_history("DGS2", api_key="fake") + assert len(df) == 1 + assert call_count[0] == 3 + + +# ── fred_history_to_ohlcv ────────────────────────────────────────────── + + +class TestFredHistoryToOhlcv: + + def test_schema_matches_yfinance_parquet_shape(self): + # Predictor reads parquets with these columns. The OHLCV reshape + # MUST emit them so cfg.MOMENTUM_FEATURES + cfg.VOLATILITY_FEATURES + # downstream readers don't trip on missing columns. + df_fred = pd.DataFrame( + {"value": [1.95, 2.01, 2.10]}, + index=pd.DatetimeIndex(["2018-01-02", "2018-01-03", "2018-01-04"]), + ) + out = fred_history_to_ohlcv(df_fred) + for col in ("Open", "High", "Low", "Close", "Adj_Close", "Volume", "VWAP", "source"): + assert col in out.columns + + def test_ohlc_replicate_value(self): + df_fred = pd.DataFrame( + {"value": [1.95, 2.01]}, + index=pd.DatetimeIndex(["2018-01-02", "2018-01-03"]), + ) + out = fred_history_to_ohlcv(df_fred) + # FRED single-value → all OHLC the same, no intra-day range. + assert (out["Open"] == out["Close"]).all() + assert (out["High"] == out["Close"]).all() + assert (out["Low"] == out["Close"]).all() + assert out["Close"].iloc[0] == 1.95 + + def test_volume_is_zero_vwap_is_none(self): + df_fred = pd.DataFrame( + {"value": [1.95]}, + index=pd.DatetimeIndex(["2018-01-02"]), + ) + out = fred_history_to_ohlcv(df_fred) + assert out["Volume"].iloc[0] == 0 + assert out["VWAP"].iloc[0] is None + + def test_source_is_fred(self): + df_fred = pd.DataFrame( + {"value": [1.95]}, + index=pd.DatetimeIndex(["2018-01-02"]), + ) + out = fred_history_to_ohlcv(df_fred) + assert out["source"].iloc[0] == "fred" + + def test_raises_when_value_column_missing(self): + df = pd.DataFrame({"foo": [1.0]}, index=pd.DatetimeIndex(["2018-01-02"])) + with pytest.raises(ValueError, match="Expected 'value' column"): + fred_history_to_ohlcv(df) + + +# ── backfill_to_s3 ───────────────────────────────────────────────────── + + +class TestBackfillToS3: + + def _patch_fetch_returning(self, value: float = 1.95, n_rows: int = 100): + df = pd.DataFrame( + {"value": [value] * n_rows}, + index=pd.date_range("2018-01-02", periods=n_rows, freq="B"), + ) + return patch( + "collectors.fred_history.fetch_fred_history", + return_value=df, + ) + + def test_dry_run_skips_s3_upload(self): + with self._patch_fetch_returning(): + result = backfill_to_s3( + bucket="test-bucket", + tickers=["TWO"], + dry_run=True, + ) + assert result["status"] == "ok" + assert result["dry_run"] is True + assert result["refreshed"] == 1 + assert result["per_ticker"]["TWO"]["status"] == "ok" + + def test_unknown_ticker_raises(self): + with pytest.raises(ValueError, match="Unknown FRED-history tickers"): + backfill_to_s3( + bucket="test-bucket", + tickers=["UNKNOWN"], + dry_run=True, + ) + + def test_default_tickers_is_all_known(self): + with self._patch_fetch_returning(): + result = backfill_to_s3( + bucket="test-bucket", + tickers=None, # → all known + dry_run=True, + ) + assert result["total"] == len(FRED_HISTORY_MAP) + for ticker in FRED_HISTORY_MAP: + assert ticker in result["per_ticker"] + + def test_per_ticker_error_does_not_abort_others(self): + # First ticker fails; second succeeds. backfill should report + # partial status, not raise. + df_ok = pd.DataFrame( + {"value": [1.95] * 10}, + index=pd.date_range("2018-01-02", periods=10, freq="B"), + ) + + def maybe_fail(series_id, period_years=10, api_key=None): + if series_id == "DGS2": + raise RuntimeError("synthetic failure for TWO") + return df_ok + + with patch("collectors.fred_history.fetch_fred_history", side_effect=maybe_fail): + result = backfill_to_s3( + bucket="test-bucket", + tickers=["TWO", "HYOAS"], + dry_run=True, + ) + assert result["status"] == "partial" + assert result["refreshed"] == 1 + assert result["per_ticker"]["TWO"]["status"] == "error" + assert result["per_ticker"]["HYOAS"]["status"] == "ok" + + def test_uploads_to_s3_when_not_dry_run(self): + with self._patch_fetch_returning(), \ + patch("collectors.fred_history.boto3.client") as mock_boto: + mock_s3 = MagicMock() + mock_boto.return_value = mock_s3 + result = backfill_to_s3( + bucket="test-bucket", + tickers=["TWO"], + dry_run=False, + ) + assert result["dry_run"] is False + # upload_file called with (local_path, bucket, key) + assert mock_s3.upload_file.called + call = mock_s3.upload_file.call_args + assert call[0][1] == "test-bucket" + assert call[0][2] == "predictor/price_cache/TWO.parquet"