diff --git a/data/derived/analyst_revisions.py b/data/derived/analyst_revisions.py index c9fefc8..0398067 100644 --- a/data/derived/analyst_revisions.py +++ b/data/derived/analyst_revisions.py @@ -268,8 +268,7 @@ def load_snapshot_time_series( Canonical shape: list ``{prefix}/{ticker}/`` once + parse each artifact's body to extract ``snapshot_date``; index by that date. Cheaper than per-date GETs since one LIST + small body reads - cover the entire window. Tolerates the legacy - ``{ticker}/{date}.json`` shape during transition. + cover the entire window. """ import json as _json out: dict[Date, dict] = {} diff --git a/data/derived/news_aggregates.py b/data/derived/news_aggregates.py index 2a1a64d..d9ca86d 100644 --- a/data/derived/news_aggregates.py +++ b/data/derived/news_aggregates.py @@ -390,29 +390,23 @@ def write_news_aggregates_parquet( def read_news_aggregates_parquet( - aggregate_date: Date | None = None, *, s3_client: Any, bucket: str = DEFAULT_S3_BUCKET, prefix: str = DEFAULT_S3_PREFIX, ) -> pd.DataFrame: - """Consumer-side read. Resolves the canonical artifact via - ``latest.json`` sidecar; falls back to the legacy - ``{aggregate_date}.parquet`` key shape during the transition - window. + """Consumer-side read. Resolves the canonical artifact via the + ``latest.json`` sidecar. - ``aggregate_date`` is only used for the legacy-shape fallback. - Under the canonical shape, ``latest.json`` always points at the - most recent run regardless of date; the parquet itself carries - ``aggregate_date`` per row so filtering happens at the DataFrame - layer. + ``latest.json`` always points at the most recent run regardless of + date; the parquet itself carries ``aggregate_date`` per row so any + date filtering happens at the DataFrame layer. Returns an empty DataFrame with the canonical schema when no artifact exists. """ from alpha_engine_lib.eval_artifacts import eval_latest_key - # Canonical path: read latest.json → resolve key latest_key = eval_latest_key(prefix) try: obj = s3_client.get_object(Bucket=bucket, Key=latest_key) @@ -423,24 +417,10 @@ def read_news_aggregates_parquet( return pd.read_parquet(BytesIO(body["Body"].read()), engine="pyarrow") except Exception as e: logger.info( - "[news_aggregates] canonical sidecar read failed for %s (%s) — " - "trying legacy date-key fallback", + "[news_aggregates] canonical sidecar read failed for %s (%s)", latest_key, type(e).__name__, ) - # Legacy fallback during transition - if aggregate_date is not None: - legacy_key = f"{prefix}/{aggregate_date.isoformat()}.parquet" - try: - obj = s3_client.get_object(Bucket=bucket, Key=legacy_key) - logger.info( - "[news_aggregates] read via legacy key %s — " - "will be removed after canonical-shape soak", legacy_key, - ) - return pd.read_parquet(BytesIO(obj["Body"].read()), engine="pyarrow") - except Exception: - pass - return _empty_df() diff --git a/data/snapshotter/analyst_daily.py b/data/snapshotter/analyst_daily.py index 06b106f..6d853a0 100644 --- a/data/snapshotter/analyst_daily.py +++ b/data/snapshotter/analyst_daily.py @@ -205,13 +205,6 @@ def snapshot_universe( # ── S3 key + serialization helpers ──────────────────────────────────── -def s3_key_for( - ticker: str, snapshot_date: Date, *, prefix: str = DEFAULT_S3_PREFIX, -) -> str: - """Canonical key for one (ticker, date) snapshot document.""" - return f"{prefix}/{ticker.upper()}/{snapshot_date.isoformat()}.json" - - def _serialize_snapshot(snap: AnalystSnapshot) -> dict: """Pydantic AnalystSnapshot → JSON-safe dict. @@ -236,9 +229,6 @@ def read_snapshot_document( whose YYMMDDHHMM run_id starts with the date's YYMMDD prefix. Picks the most recent intra-day run when multiple exist. - Legacy fallback: tries the old ``{prefix}/{ticker}/{date}.json`` - key shape during the transition. - Returns the raw JSON dict or None if no document exists. """ # Canonical: list prefix + find by run_id date prefix. @@ -261,15 +251,7 @@ def read_snapshot_document( return json.loads(obj["Body"].read()) except Exception as e: logger.debug( - "[analyst_snapshotter] canonical list failed for %s/%s (%s) — " - "trying legacy date-key fallback", + "[analyst_snapshotter] canonical list failed for %s/%s (%s)", ticker, snapshot_date, type(e).__name__, ) - - # Legacy fallback: {prefix}/{ticker}/{date}.json - legacy_key = f"{prefix}/{ticker.upper()}/{snapshot_date.isoformat()}.json" - try: - obj = s3_client.get_object(Bucket=bucket, Key=legacy_key) - return json.loads(obj["Body"].read()) - except Exception: - return None + return None diff --git a/tests/test_analyst_substrate.py b/tests/test_analyst_substrate.py index 781b20d..23f45b3 100644 --- a/tests/test_analyst_substrate.py +++ b/tests/test_analyst_substrate.py @@ -303,6 +303,32 @@ def test_read_missing_document_returns_none(self): s3 = _InMemoryS3() assert read_snapshot_document("X", date(2026, 1, 1), s3_client=s3) is None + def test_legacy_date_keyed_json_is_ignored(self): + """Regression guard: post-#234 canonical-key migration, a + bare ``{prefix}/{ticker}/{date}.json`` file (the pre-migration + shape) must NOT be read by the canonical lister. The list + prefix is ``{ticker}/{YYMMDD}`` so an ``YYYY-MM-DD.json`` + file does not match it. + """ + s3 = _InMemoryS3() + legacy_body = json.dumps({ + "ticker": "AAPL", + "snapshot_date": "2026-05-13", + "schema_version": SNAPSHOT_SCHEMA_VERSION, + "snapshots_by_source": {}, + }).encode("utf-8") + s3.put_object( + Bucket="alpha-engine-research", + Key="data/analyst_snapshots/AAPL/2026-05-13.json", + Body=legacy_body, + ) + # Canonical reader lists by YYMMDD prefix; legacy YYYY-MM-DD key + # does not match → None. + doc = read_snapshot_document( + "AAPL", date(2026, 5, 13), s3_client=s3, + ) + assert doc is None + def test_universe_orchestrator(self): s3 = _InMemoryS3() sources = [_StaticSource("yfinance", _make_snapshot())] diff --git a/tests/test_news_aggregates.py b/tests/test_news_aggregates.py index 87a6ae3..63e8bdd 100644 --- a/tests/test_news_aggregates.py +++ b/tests/test_news_aggregates.py @@ -362,22 +362,43 @@ def test_write_then_read_preserves_rows(self): # latest.json sidecar points at it assert ("alpha-engine-research", "data/news_aggregates/latest.json") in s3._store - df_out = read_news_aggregates_parquet( - aggregate_date=date(2026, 5, 13), s3_client=s3, - ) + df_out = read_news_aggregates_parquet(s3_client=s3) assert len(df_out) == 1 assert df_out.iloc[0]["ticker"] == "AAPL" assert df_out.iloc[0]["lm_sentiment_mean"] == pytest.approx(0.42) def test_missing_parquet_returns_empty_schema_df(self): s3 = _InMemoryS3() - df = read_news_aggregates_parquet( - aggregate_date=date(2026, 1, 1), s3_client=s3, - ) + df = read_news_aggregates_parquet(s3_client=s3) assert len(df) == 0 for col in NewsTickerDailyAggregate.__dataclass_fields__: assert col in df.columns + def test_legacy_date_keyed_parquet_is_ignored(self): + """Regression guard: post-#234 canonical-key migration, a + bare ``{prefix}/{date}.parquet`` file with NO ``latest.json`` + sidecar must NOT be read. Canonical-only contract.""" + s3 = _InMemoryS3() + articles = [_make_aggregated(fingerprint="a", tickers=("AAPL",))] + df_in = build_news_aggregates_df( + articles=articles, + nlp_output=NewsNLPOutput(sentiment_scores=[ + _lm_score("a", composite=0.42, pos=2, neg=0, total=10), + ]), + aggregate_date=date(2026, 5, 13), + ) + # Plant a legacy-shape parquet that the OLD fallback would have read. + buf = BytesIO() + df_in.to_parquet(buf, engine="pyarrow", index=False) + s3.put_object( + Bucket="alpha-engine-research", + Key="data/news_aggregates/2026-05-13.parquet", + Body=buf.getvalue(), + ) + # No latest.json sidecar present → canonical read finds nothing. + df = read_news_aggregates_parquet(s3_client=s3) + assert len(df) == 0 + def test_overwrite_existing_parquet(self): s3 = _InMemoryS3() # v1: 1 row @@ -401,9 +422,7 @@ def test_overwrite_existing_parquet(self): write_news_aggregates_parquet( df_v2, aggregate_date=date(2026, 5, 13), s3_client=s3, ) - df_read = read_news_aggregates_parquet( - aggregate_date=date(2026, 5, 13), s3_client=s3, - ) + df_read = read_news_aggregates_parquet(s3_client=s3) assert len(df_read) == 2 def test_canonical_artifact_and_latest_keys(self):