diff --git a/collectors/fred_history.py b/collectors/fred_history.py new file mode 100644 index 0000000..de5a2f1 --- /dev/null +++ b/collectors/fred_history.py @@ -0,0 +1,312 @@ +""" +collectors/fred_history.py — FRED date-range time-series fetcher. + +Stage 2.5b of the regime-conditioning rebuild (plan doc: +alpha-engine-docs/private/regime-conditioning-260510.md). Provides the +historical-fetch counterpart to ``collectors/daily_closes._fetch_fred_closes`` +(which is single-latest-only) so FRED-only macro symbols (TWO, HYOAS, +and any future FRED series) can populate the 10-year price-cache +parquets the predictor reads. + +The yfinance refresh path in ``collectors/prices.py`` covers symbols +yfinance carries (^VIX/^VIX3M/^TNX/^IRX); this module covers the FRED- +only ones (DGS2 → TWO, BAMLH0A0HYM2 → HYOAS). + +Output schema matches what ``predictor`` reads from ``predictor/price_cache/``: +DatetimeIndex, columns = [Open, High, Low, Close, Adj_Close, Volume, +VWAP, source]. FRED publishes a single value per date; we replicate it +to OHLC and emit Volume=0 + VWAP=None to match the existing per-day +parquet contract. +""" + +from __future__ import annotations + +import logging +import os +import tempfile +import time +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Optional + +import boto3 +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + +_FRED_BASE = "https://api.stlouisfed.org/fred/series/observations" +_FRED_TIMEOUT = 30 # longer than _fred_latest's 15s — date-range responses are bigger + +# FRED-only symbols that need historical backfill via this module. Map +# our parquet/ArcticDB ticker key → FRED series id. Mirrors the +# Stage 2.5 entries in ``collectors/daily_closes._FRED_INDEX_MAP`` for +# the symbols not on yfinance. Kept separate so changes here don't +# impact daily_closes' single-latest fetch behaviour. +FRED_HISTORY_MAP: dict[str, str] = { + "TWO": "DGS2", + "HYOAS": "BAMLH0A0HYM2", +} + + +def fetch_fred_history( + series_id: str, + period_years: int = 10, + api_key: str | None = None, +) -> pd.DataFrame: + """Fetch a multi-year date-range time series from FRED. + + Args: + series_id: FRED series identifier (e.g., ``"DGS2"`` or + ``"BAMLH0A0HYM2"``). + period_years: trailing window in years. Default 10 to match the + yfinance refresh ``period="10y"``. + api_key: optional override; defaults to ``FRED_API_KEY`` env var. + + Returns: + DataFrame indexed by date (DatetimeIndex, ascending) with a + single ``value`` column of floats. Missing values (FRED's "." + marker) are dropped. Raises ``RuntimeError`` if no API key is + available or the request fails after retries. + """ + if api_key is None: + api_key = os.environ.get("FRED_API_KEY", "") + if not api_key: + raise RuntimeError( + "FRED_API_KEY not set — cannot fetch historical FRED series. " + "Set the env var or pass api_key explicitly." + ) + + end_date = datetime.now(timezone.utc).date() + start_date = end_date - timedelta(days=int(period_years * 365.25) + 7) + + params = { + "series_id": series_id, + "api_key": api_key, + "file_type": "json", + "observation_start": start_date.isoformat(), + "observation_end": end_date.isoformat(), + "sort_order": "asc", + } + + last_err: Exception | None = None + for attempt in range(1, 4): + try: + resp = requests.get(_FRED_BASE, params=params, timeout=_FRED_TIMEOUT) + resp.raise_for_status() + payload = resp.json() + obs = payload.get("observations", []) + if not obs: + raise RuntimeError( + f"FRED returned no observations for {series_id} " + f"in [{start_date}, {end_date}]" + ) + + rows = [] + for o in obs: + val = o.get("value", ".") + if val == "." or val is None: + continue + try: + rows.append((pd.Timestamp(o["date"]), float(val))) + except (KeyError, ValueError): + continue + + if not rows: + raise RuntimeError( + f"FRED {series_id}: every observation in window was " + f"missing or unparseable" + ) + + df = pd.DataFrame(rows, columns=["date", "value"]).set_index("date") + df = df.sort_index() + logger.info( + "FRED history %s: %d observations from %s to %s", + series_id, len(df), df.index.min().date(), df.index.max().date(), + ) + return df + + except requests.exceptions.RequestException as e: + last_err = e + if attempt < 3: + logger.warning( + "FRED %s history attempt %d failed: %s — retrying in %ds", + series_id, attempt, e, attempt * 3, + ) + time.sleep(attempt * 3) + else: + logger.error( + "FRED %s history failed after 3 attempts: %s", + series_id, e, + ) + raise RuntimeError( + f"FRED history fetch failed for {series_id} after retries: {last_err}" + ) + + +def fred_history_to_ohlcv( + df_fred: pd.DataFrame, +) -> pd.DataFrame: + """Convert a FRED single-value time series to the OHLCV-shape parquet. + + FRED publishes a single value per date; the predictor's ``compute_features`` + + downstream readers expect parquets with ``[Open, High, Low, Close, + Adj_Close, Volume, VWAP, source]`` columns matching what yfinance produces. + This helper replicates the value to OHLC and emits Volume=0 + VWAP=None + so the schema matches. + + Args: + df_fred: output of ``fetch_fred_history`` — DatetimeIndex with a + single ``value`` column. + + Returns: + OHLCV-shape DataFrame with the same DatetimeIndex. + """ + if "value" not in df_fred.columns: + raise ValueError( + f"Expected 'value' column in FRED DataFrame, got {list(df_fred.columns)}" + ) + val = df_fred["value"].astype(float) + out = pd.DataFrame( + { + "Open": val, + "High": val, + "Low": val, + "Close": val, + "Adj_Close": val, + "Volume": 0, + "VWAP": None, + "source": "fred", + }, + index=df_fred.index, + ) + return out + + +def backfill_to_s3( + bucket: str, + s3_prefix: str = "predictor/price_cache/", + tickers: list[str] | None = None, + period_years: int = 10, + dry_run: bool = False, +) -> dict: + """Backfill TWO + HYOAS (or any subset of ``FRED_HISTORY_MAP``) to S3. + + One-shot operator step. Run after Stage 2.5 ships and before Stage 2c-full + consumes the new parquets. Idempotent — full rewrite each call (matches + the yfinance ``auto_adjust=True`` rewrite semantics). + + Args: + bucket: S3 bucket name (typically ``alpha-engine-research``). + s3_prefix: S3 key prefix; default matches yfinance refresh path. + tickers: subset of ``FRED_HISTORY_MAP`` keys; default = all. + period_years: trailing history window. + dry_run: if True, fetch but skip S3 upload. + + Returns: + dict with status, refreshed count, and per-ticker row counts. + """ + if tickers is None: + tickers = sorted(FRED_HISTORY_MAP.keys()) + + unknown = [t for t in tickers if t not in FRED_HISTORY_MAP] + if unknown: + raise ValueError( + f"Unknown FRED-history tickers {unknown}. " + f"Known: {sorted(FRED_HISTORY_MAP.keys())}" + ) + + s3 = boto3.client("s3") if not dry_run else None + results: dict[str, dict] = {} + + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = Path(tmpdir) + + for ticker in tickers: + series_id = FRED_HISTORY_MAP[ticker] + try: + fred_df = fetch_fred_history(series_id, period_years=period_years) + ohlcv = fred_history_to_ohlcv(fred_df) + parquet_path = local_dir / f"{ticker}.parquet" + ohlcv.to_parquet(parquet_path, engine="pyarrow", compression="snappy") + results[ticker] = { + "status": "ok", + "rows": len(ohlcv), + "first_date": ohlcv.index.min().date().isoformat(), + "last_date": ohlcv.index.max().date().isoformat(), + } + if not dry_run: + s3.upload_file( + str(parquet_path), + bucket, + f"{s3_prefix}{ticker}.parquet", + ) + logger.info( + "Uploaded s3://%s/%s%s.parquet (%d rows, %s → %s)", + bucket, s3_prefix, ticker, len(ohlcv), + results[ticker]["first_date"], results[ticker]["last_date"], + ) + except Exception as e: + logger.error("Backfill failed for %s (%s): %s", ticker, series_id, e) + results[ticker] = {"status": "error", "error": str(e)} + + n_ok = sum(1 for r in results.values() if r["status"] == "ok") + return { + "status": "ok" if n_ok == len(tickers) else "partial", + "refreshed": n_ok, + "total": len(tickers), + "per_ticker": results, + "dry_run": dry_run, + } + + +def main(): + """CLI entry point — one-shot backfill of all FRED_HISTORY_MAP tickers. + + Usage: + python -m collectors.fred_history --bucket alpha-engine-research + python -m collectors.fred_history --dry-run + python -m collectors.fred_history --tickers TWO HYOAS --period-years 10 + """ + import argparse + import json + import sys + + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") + + parser = argparse.ArgumentParser(description=__doc__.split("\n\n")[0]) + parser.add_argument( + "--bucket", default="alpha-engine-research", + help="S3 bucket name. Default: alpha-engine-research", + ) + parser.add_argument( + "--prefix", default="predictor/price_cache/", + help="S3 prefix. Default: predictor/price_cache/", + ) + parser.add_argument( + "--tickers", nargs="+", default=None, + help="Subset of FRED_HISTORY_MAP keys. Default: all.", + ) + parser.add_argument( + "--period-years", type=int, default=10, + help="Trailing history window. Default 10 (matches yfinance).", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Fetch but skip S3 upload.", + ) + args = parser.parse_args() + + result = backfill_to_s3( + bucket=args.bucket, + s3_prefix=args.prefix, + tickers=args.tickers, + period_years=args.period_years, + dry_run=args.dry_run, + ) + print(json.dumps(result, indent=2, default=str)) + sys.exit(0 if result["status"] == "ok" else 2) + + +if __name__ == "__main__": + main() diff --git a/tests/test_fred_history_fetcher.py b/tests/test_fred_history_fetcher.py new file mode 100644 index 0000000..1c04158 --- /dev/null +++ b/tests/test_fred_history_fetcher.py @@ -0,0 +1,283 @@ +"""Tests for ``collectors/fred_history.py`` — Stage 2.5b of regime- +conditioning rebuild. + +Validates: +- ``fetch_fred_history`` retry + parse contract (mocked HTTP) +- ``fred_history_to_ohlcv`` schema invariant matches yfinance parquet shape +- ``backfill_to_s3`` orchestration (mocked S3 + FRED) +- ``FRED_HISTORY_MAP`` covers TWO + HYOAS + +Plan doc: ~/Development/alpha-engine-docs/private/regime-conditioning-260510.md +""" +from __future__ import annotations + +import json +import os +import sys +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from collectors.fred_history import ( + FRED_HISTORY_MAP, + backfill_to_s3, + fetch_fred_history, + fred_history_to_ohlcv, +) + + +# ── FRED_HISTORY_MAP contract ────────────────────────────────────────── + + +class TestFredHistoryMap: + + def test_two_maps_to_dgs2(self): + assert FRED_HISTORY_MAP["TWO"] == "DGS2" + + def test_hyoas_maps_to_bamlh0a0hym2(self): + assert FRED_HISTORY_MAP["HYOAS"] == "BAMLH0A0HYM2" + + def test_no_unintended_entries(self): + # Stage 2.5b adds exactly TWO + HYOAS. Lock so a future drive-by + # addition doesn't slip through unreviewed. + assert set(FRED_HISTORY_MAP.keys()) == {"TWO", "HYOAS"} + + +# ── fetch_fred_history ────────────────────────────────────────────────── + + +def _mock_fred_response(observations: list[dict]) -> MagicMock: + """Build a mocked requests.Response carrying a FRED API payload.""" + resp = MagicMock() + resp.status_code = 200 + resp.raise_for_status = MagicMock() + resp.json = MagicMock(return_value={"observations": observations}) + return resp + + +class TestFetchFredHistory: + + def test_basic_parse(self): + observations = [ + {"date": "2018-01-02", "value": "1.95"}, + {"date": "2018-01-03", "value": "2.01"}, + {"date": "2018-01-04", "value": "2.10"}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + df = fetch_fred_history("DGS2", api_key="fake") + assert len(df) == 3 + assert list(df.columns) == ["value"] + assert df.index[0] == pd.Timestamp("2018-01-02") + assert df["value"].iloc[0] == 1.95 + assert df["value"].iloc[-1] == 2.10 + + def test_drops_missing_observations(self): + # FRED uses "." as the missing-value marker. + observations = [ + {"date": "2018-01-02", "value": "1.95"}, + {"date": "2018-01-03", "value": "."}, + {"date": "2018-01-04", "value": "2.10"}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + df = fetch_fred_history("DGS2", api_key="fake") + assert len(df) == 2 + assert df.index.tolist() == [pd.Timestamp("2018-01-02"), pd.Timestamp("2018-01-04")] + + def test_sorts_ascending(self): + # FRED returns asc by request param, but lock the post-condition + # in case server returns out-of-order. + observations = [ + {"date": "2018-01-04", "value": "2.10"}, + {"date": "2018-01-02", "value": "1.95"}, + {"date": "2018-01-03", "value": "2.01"}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + df = fetch_fred_history("DGS2", api_key="fake") + assert df.index.is_monotonic_increasing + + def test_raises_when_no_api_key(self): + original = os.environ.pop("FRED_API_KEY", None) + try: + with pytest.raises(RuntimeError, match="FRED_API_KEY not set"): + fetch_fred_history("DGS2") + finally: + if original is not None: + os.environ["FRED_API_KEY"] = original + + def test_raises_when_no_observations(self): + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response([])): + with pytest.raises(RuntimeError, match="no observations"): + fetch_fred_history("DGS2", api_key="fake") + + def test_raises_when_all_missing(self): + observations = [ + {"date": "2018-01-02", "value": "."}, + {"date": "2018-01-03", "value": "."}, + ] + with patch("collectors.fred_history.requests.get", + return_value=_mock_fred_response(observations)): + with pytest.raises(RuntimeError, match="every observation"): + fetch_fred_history("DGS2", api_key="fake") + + def test_retries_on_request_exception(self): + import requests as _requests + call_count = [0] + + def flaky_get(*args, **kwargs): + call_count[0] += 1 + if call_count[0] < 3: + raise _requests.exceptions.ConnectionError("transient") + return _mock_fred_response([{"date": "2018-01-02", "value": "1.95"}]) + + with patch("collectors.fred_history.requests.get", side_effect=flaky_get), \ + patch("collectors.fred_history.time.sleep"): + df = fetch_fred_history("DGS2", api_key="fake") + assert len(df) == 1 + assert call_count[0] == 3 + + +# ── fred_history_to_ohlcv ────────────────────────────────────────────── + + +class TestFredHistoryToOhlcv: + + def test_schema_matches_yfinance_parquet_shape(self): + # Predictor reads parquets with these columns. The OHLCV reshape + # MUST emit them so cfg.MOMENTUM_FEATURES + cfg.VOLATILITY_FEATURES + # downstream readers don't trip on missing columns. + df_fred = pd.DataFrame( + {"value": [1.95, 2.01, 2.10]}, + index=pd.DatetimeIndex(["2018-01-02", "2018-01-03", "2018-01-04"]), + ) + out = fred_history_to_ohlcv(df_fred) + for col in ("Open", "High", "Low", "Close", "Adj_Close", "Volume", "VWAP", "source"): + assert col in out.columns + + def test_ohlc_replicate_value(self): + df_fred = pd.DataFrame( + {"value": [1.95, 2.01]}, + index=pd.DatetimeIndex(["2018-01-02", "2018-01-03"]), + ) + out = fred_history_to_ohlcv(df_fred) + # FRED single-value → all OHLC the same, no intra-day range. + assert (out["Open"] == out["Close"]).all() + assert (out["High"] == out["Close"]).all() + assert (out["Low"] == out["Close"]).all() + assert out["Close"].iloc[0] == 1.95 + + def test_volume_is_zero_vwap_is_none(self): + df_fred = pd.DataFrame( + {"value": [1.95]}, + index=pd.DatetimeIndex(["2018-01-02"]), + ) + out = fred_history_to_ohlcv(df_fred) + assert out["Volume"].iloc[0] == 0 + assert out["VWAP"].iloc[0] is None + + def test_source_is_fred(self): + df_fred = pd.DataFrame( + {"value": [1.95]}, + index=pd.DatetimeIndex(["2018-01-02"]), + ) + out = fred_history_to_ohlcv(df_fred) + assert out["source"].iloc[0] == "fred" + + def test_raises_when_value_column_missing(self): + df = pd.DataFrame({"foo": [1.0]}, index=pd.DatetimeIndex(["2018-01-02"])) + with pytest.raises(ValueError, match="Expected 'value' column"): + fred_history_to_ohlcv(df) + + +# ── backfill_to_s3 ───────────────────────────────────────────────────── + + +class TestBackfillToS3: + + def _patch_fetch_returning(self, value: float = 1.95, n_rows: int = 100): + df = pd.DataFrame( + {"value": [value] * n_rows}, + index=pd.date_range("2018-01-02", periods=n_rows, freq="B"), + ) + return patch( + "collectors.fred_history.fetch_fred_history", + return_value=df, + ) + + def test_dry_run_skips_s3_upload(self): + with self._patch_fetch_returning(): + result = backfill_to_s3( + bucket="test-bucket", + tickers=["TWO"], + dry_run=True, + ) + assert result["status"] == "ok" + assert result["dry_run"] is True + assert result["refreshed"] == 1 + assert result["per_ticker"]["TWO"]["status"] == "ok" + + def test_unknown_ticker_raises(self): + with pytest.raises(ValueError, match="Unknown FRED-history tickers"): + backfill_to_s3( + bucket="test-bucket", + tickers=["UNKNOWN"], + dry_run=True, + ) + + def test_default_tickers_is_all_known(self): + with self._patch_fetch_returning(): + result = backfill_to_s3( + bucket="test-bucket", + tickers=None, # → all known + dry_run=True, + ) + assert result["total"] == len(FRED_HISTORY_MAP) + for ticker in FRED_HISTORY_MAP: + assert ticker in result["per_ticker"] + + def test_per_ticker_error_does_not_abort_others(self): + # First ticker fails; second succeeds. backfill should report + # partial status, not raise. + df_ok = pd.DataFrame( + {"value": [1.95] * 10}, + index=pd.date_range("2018-01-02", periods=10, freq="B"), + ) + + def maybe_fail(series_id, period_years=10, api_key=None): + if series_id == "DGS2": + raise RuntimeError("synthetic failure for TWO") + return df_ok + + with patch("collectors.fred_history.fetch_fred_history", side_effect=maybe_fail): + result = backfill_to_s3( + bucket="test-bucket", + tickers=["TWO", "HYOAS"], + dry_run=True, + ) + assert result["status"] == "partial" + assert result["refreshed"] == 1 + assert result["per_ticker"]["TWO"]["status"] == "error" + assert result["per_ticker"]["HYOAS"]["status"] == "ok" + + def test_uploads_to_s3_when_not_dry_run(self): + with self._patch_fetch_returning(), \ + patch("collectors.fred_history.boto3.client") as mock_boto: + mock_s3 = MagicMock() + mock_boto.return_value = mock_s3 + result = backfill_to_s3( + bucket="test-bucket", + tickers=["TWO"], + dry_run=False, + ) + assert result["dry_run"] is False + # upload_file called with (local_path, bucket, key) + assert mock_s3.upload_file.called + call = mock_s3.upload_file.call_args + assert call[0][1] == "test-bucket" + assert call[0][2] == "predictor/price_cache/TWO.parquet"