Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 63 additions & 25 deletions data/derived/analyst_revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,40 +265,57 @@ def load_snapshot_time_series(
"""Load all snapshot documents for ``ticker`` in the window
[as_of - days_back, as_of]. Returns a mapping Date → JSON document.

Missing dates are simply absent from the mapping — caller logic in
``build_revision_row`` tolerates this.

Uses S3 GetObject per-date (small JSON docs, cheap). For very long
windows (>365 days) a list-prefix + batch read would be cheaper;
not needed for 30-day windows.
Canonical shape: list ``{prefix}/{ticker}/`` once + parse each
artifact's body to extract ``snapshot_date``; index by that date.
Cheaper than per-date GETs since one LIST + small body reads
cover the entire window. Tolerates the legacy
``{ticker}/{date}.json`` shape during transition.
"""
import json as _json
out: dict[Date, dict] = {}
for offset in range(days_back + 7 + 1):
d = as_of_date - timedelta(days=offset)
key = f"{snapshot_prefix}/{ticker.upper()}/{d.isoformat()}.json"
cutoff_earliest = as_of_date - timedelta(days=days_back + 7)

# Canonical: list the per-ticker prefix + filter to the window
per_ticker_prefix = f"{snapshot_prefix}/{ticker.upper()}/"
try:
resp = s3_client.list_objects_v2(
Bucket=bucket, Prefix=per_ticker_prefix,
)
keys = [
obj["Key"] for obj in (resp.get("Contents") or [])
if obj["Key"].endswith(".json")
and not obj["Key"].endswith("/latest.json")
]
except Exception:
keys = []

for key in keys:
try:
obj = s3_client.get_object(Bucket=bucket, Key=key)
except Exception:
continue
import json as _json
try:
out[d] = _json.loads(obj["Body"].read())
doc = _json.loads(obj["Body"].read())
except Exception as e:
logger.warning(
"[analyst_revisions] failed to parse %s: %s", key, e,
)
continue
try:
snap_date = Date.fromisoformat(doc.get("snapshot_date", "")[:10])
except (ValueError, TypeError):
continue
if cutoff_earliest <= snap_date <= as_of_date:
# If multiple runs landed for the same date, keep the
# one with the most-recent fetched_at timestamp.
existing = out.get(snap_date)
if existing is None or (
doc.get("fetched_at", "") > existing.get("fetched_at", "")
):
out[snap_date] = doc
return out


# ── Parquet writer ─────────────────────────────────────────────────────


def s3_key_for_date(
as_of_date: Date, *, prefix: str = DEFAULT_S3_PREFIX,
) -> str:
return f"{prefix}/{as_of_date.isoformat()}.parquet"


def rows_to_dataframe(rows: Iterable[AnalystRevisionRow]) -> pd.DataFrame:
rows_list = list(rows)
if not rows_list:
Expand All @@ -314,21 +331,42 @@ def write_revisions_parquet(
s3_client: Any,
bucket: str = DEFAULT_S3_BUCKET,
prefix: str = DEFAULT_S3_PREFIX,
run_id: str | None = None,
) -> str:
"""Canonical eval-artifacts shape (YYMMDDHHMM + latest.json sidecar)."""
import json as _json
from alpha_engine_lib.eval_artifacts import (
eval_artifact_key, eval_latest_key, new_eval_run_id,
)

df = rows_to_dataframe(rows)
key = s3_key_for_date(as_of_date, prefix=prefix)
run_id = run_id or new_eval_run_id()
artifact_key = eval_artifact_key(prefix, run_id, basename="result.parquet")
latest_key = eval_latest_key(prefix)

buf = BytesIO()
df.to_parquet(buf, engine="pyarrow", index=False)
buf.seek(0)
s3_client.put_object(
Bucket=bucket, Key=key, Body=buf.getvalue(),
Bucket=bucket, Key=artifact_key, Body=buf.getvalue(),
ContentType="application/octet-stream",
)
s3_client.put_object(
Bucket=bucket, Key=latest_key,
Body=_json.dumps({
"run_id": run_id,
"artifact_key": artifact_key,
"as_of_date": as_of_date.isoformat(),
"schema_version": SCHEMA_VERSION,
"row_count": int(len(df)),
"written_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}).encode("utf-8"),
ContentType="application/json",
)
logger.info(
"[analyst_revisions] wrote %d rows to s3://%s/%s",
len(df), bucket, key,
len(df), bucket, artifact_key,
)
return key
return artifact_key


# ── End-to-end orchestrator ────────────────────────────────────────────
Expand Down
122 changes: 87 additions & 35 deletions data/derived/news_aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import logging
from dataclasses import dataclass
from datetime import date as Date
from datetime import datetime
from datetime import datetime, timezone
from io import BytesIO
from typing import Any, Iterable, Sequence

Expand Down Expand Up @@ -324,72 +324,124 @@ def _empty_df() -> pd.DataFrame:
DEFAULT_S3_PREFIX = "data/news_aggregates"


def s3_key_for_date(aggregate_date: Date, *, prefix: str = DEFAULT_S3_PREFIX) -> str:
"""Canonical S3 key for one date's aggregates parquet.

Format: ``{prefix}/{YYYY-MM-DD}.parquet``. Consumers read by
composing date → key with this same function.
"""
return f"{prefix}/{aggregate_date.isoformat()}.parquet"


def write_news_aggregates_parquet(
df: pd.DataFrame,
*,
aggregate_date: Date,
s3_client: Any,
bucket: str = DEFAULT_S3_BUCKET,
prefix: str = DEFAULT_S3_PREFIX,
run_id: str | None = None,
) -> str:
"""Write the aggregates DataFrame to S3 as parquet. Returns the key.
"""Write the aggregates DataFrame to S3 as parquet using the
canonical ``alpha_engine_lib.eval_artifacts`` shape: flat layout +
YYMMDDHHMM-encoded run_id + ``latest.json`` sidecar.

Returns the artifact S3 key.

Overwrites any existing object at the key (idempotent re-run).
Uses pyarrow engine — no compression argument so we get default
snappy; safe across pyarrow/pandas versions.
Re-runs on the same calendar day produce distinct artifacts
(different YYMMDDHHMM run_ids) and update ``latest.json`` to
point at the newest one. Audit-friendly: every Saturday SF
invocation preserved + latest is always discoverable.

``aggregate_date`` is stamped onto every row of the parquet
(already a column in ``NewsTickerDailyAggregate``) so consumers
can filter by the canonical date without parsing the run_id.
"""
key = s3_key_for_date(aggregate_date, prefix=prefix)
from alpha_engine_lib.eval_artifacts import (
eval_artifact_key,
eval_latest_key,
new_eval_run_id,
)

run_id = run_id or new_eval_run_id()
artifact_key = eval_artifact_key(prefix, run_id, basename="result.parquet")
latest_key = eval_latest_key(prefix)

# Write parquet body
buf = BytesIO()
df.to_parquet(buf, engine="pyarrow", index=False)
buf.seek(0)
s3_client.put_object(
Bucket=bucket,
Key=key,
Key=artifact_key,
Body=buf.getvalue(),
ContentType="application/octet-stream",
)
# Write latest.json sidecar pointing to the artifact run_id
latest_payload = {
"run_id": run_id,
"artifact_key": artifact_key,
"aggregate_date": aggregate_date.isoformat(),
"schema_version": SCHEMA_VERSION,
"row_count": int(len(df)),
"written_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}
s3_client.put_object(
Bucket=bucket,
Key=latest_key,
Body=json.dumps(latest_payload).encode("utf-8"),
ContentType="application/json",
)
logger.info(
"[news_aggregates] wrote %d rows to s3://%s/%s",
len(df), bucket, key,
"[news_aggregates] wrote %d rows to s3://%s/%s (latest=%s)",
len(df), bucket, artifact_key, latest_key,
)
return key
return artifact_key


def read_news_aggregates_parquet(
aggregate_date: Date,
aggregate_date: Date | None = None,
*,
s3_client: Any,
bucket: str = DEFAULT_S3_BUCKET,
prefix: str = DEFAULT_S3_PREFIX,
) -> pd.DataFrame:
"""Consumer-side read. Returns the parquet's DataFrame, or an
empty DataFrame with the canonical schema if no parquet exists for
the date.

Schema-version-aware: callers should check the ``schema_version``
column and apply forward-compat shims if needed.
"""Consumer-side read. Resolves the canonical artifact via
``latest.json`` sidecar; falls back to the legacy
``{aggregate_date}.parquet`` key shape during the transition
window.

``aggregate_date`` is only used for the legacy-shape fallback.
Under the canonical shape, ``latest.json`` always points at the
most recent run regardless of date; the parquet itself carries
``aggregate_date`` per row so filtering happens at the DataFrame
layer.

Returns an empty DataFrame with the canonical schema when no
artifact exists.
"""
key = s3_key_for_date(aggregate_date, prefix=prefix)
from alpha_engine_lib.eval_artifacts import eval_latest_key

# Canonical path: read latest.json → resolve key
latest_key = eval_latest_key(prefix)
try:
obj = s3_client.get_object(Bucket=bucket, Key=key)
obj = s3_client.get_object(Bucket=bucket, Key=latest_key)
sidecar = json.loads(obj["Body"].read())
artifact_key = sidecar.get("artifact_key")
if artifact_key:
body = s3_client.get_object(Bucket=bucket, Key=artifact_key)
return pd.read_parquet(BytesIO(body["Body"].read()), engine="pyarrow")
except Exception as e:
logger.info(
"[news_aggregates] no parquet at s3://%s/%s (%s) — "
"returning empty DataFrame",
bucket, key, type(e).__name__,
"[news_aggregates] canonical sidecar read failed for %s (%s) — "
"trying legacy date-key fallback",
latest_key, type(e).__name__,
)
return _empty_df()
buf = BytesIO(obj["Body"].read())
return pd.read_parquet(buf, engine="pyarrow")

# Legacy fallback during transition
if aggregate_date is not None:
legacy_key = f"{prefix}/{aggregate_date.isoformat()}.parquet"
try:
obj = s3_client.get_object(Bucket=bucket, Key=legacy_key)
logger.info(
"[news_aggregates] read via legacy key %s — "
"will be removed after canonical-shape soak", legacy_key,
)
return pd.read_parquet(BytesIO(obj["Body"].read()), engine="pyarrow")
except Exception:
pass

return _empty_df()


# ── Orchestrator-friendly helper ──────────────────────────────────────
Expand Down
Loading
Loading