feat(rag): Gate A — wire Wave 1 producers into Saturday SF (single-PR scope)#233
Merged
Gate A of the Wave 1 institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md).
Cadence-corrected: research runs weekly so daily ingestion would
write to nobody. Everything plugs into existing Saturday SF
RAGIngestion orchestration. No new schedules, no new Lambda, no new
EventBridge — per feedback_dont_add_redundant_scheduled_infra.
New modules:
rag/pipelines/_signals_universe.py
Shared --from-signals helper. Lifts the duplicated inline loader
from ingest_8k_filings + ingest_sec_filings + ingest_earnings_finnhub
into one module. Tolerates universe-of-dicts AND universe-of-strings
shapes; uppercases tickers; picks lexicographically-last (most
recent) signals/ prefix.
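The behaviour described for the shared helper can be sketched roughly as follows. This is an illustration only, not the code in _signals_universe.py: the function names, the "ticker" dict key, and the date-stamped prefix layout are assumptions made for the example.

```python
from typing import Iterable, List, Optional, Union


def latest_prefix(prefixes: Iterable[str]) -> Optional[str]:
    """Pick the lexicographically last prefix, or None when there are none.

    Assumes prefixes embed a sortable date (e.g. signals/2026-05-09/), so the
    lexicographically last one is also the most recent.
    """
    return max(prefixes, default=None)


def normalize_universe(universe: Iterable[Union[str, dict]]) -> List[str]:
    """Accept universe-of-dicts or universe-of-strings; return uppercase tickers."""
    tickers: List[str] = []
    for entry in universe:
        # "ticker" is a hypothetical key name for the dict shape.
        raw = entry.get("ticker") if isinstance(entry, dict) else entry
        if raw:
            tickers.append(str(raw).strip().upper())
    return tickers
```

For example, normalize_universe([{"ticker": "aapl"}, "msft"]) yields ["AAPL", "MSFT"], so callers do not need to know which shape the signals file used.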
rag/pipelines/run_news_pipeline.py
Orchestrator CLI for the 4-step news producer chain:
1. NewsAggregator fetch (Polygon + GDELT + Yahoo RSS, dedup +
trust weighting)
2. NewsNLPPipeline (Loughran-McDonald sentiment + Anthropic-
Haiku event extraction)
3. data.derived.news_aggregates.aggregate_and_write
→ s3://.../data/news_aggregates/{date}.parquet
4. rag.pipelines.ingest_news.ingest_articles
→ RAG corpus (one doc per (ticker, article); idempotent)
Flags: --from-signals | --tickers | --hours (default 168 = 7d)
| --aggregate-date | --skip-rag | --skip-nlp | --dry-run
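A minimal argparse sketch of the flag surface listed above. The flag names and the 168-hour default come from this commit message; everything else is an assumption, in particular that --tickers takes space-separated symbols and that --from-signals / --tickers form a required mutually exclusive group (inferred from the "required args mutually exclusive" test).

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Sketch of the run_news_pipeline CLI flags (not the shipped parser)."""
    parser = argparse.ArgumentParser(prog="run_news_pipeline")

    # Exactly one ticker source must be chosen.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--from-signals", action="store_true",
                        help="load tickers from the most recent signals/ prefix")
    source.add_argument("--tickers", nargs="+",
                        help="explicit ticker list (value format is an assumption)")

    parser.add_argument("--hours", type=int, default=168,
                        help="look-back window in hours (168 = 7 days)")
    parser.add_argument("--aggregate-date", default=None)
    parser.add_argument("--skip-rag", action="store_true")
    parser.add_argument("--skip-nlp", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    return parser
```

With this shape, passing both --from-signals and --tickers makes argparse exit with status 2 before any pipeline step runs.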
rag/pipelines/run_analyst_pipeline.py
Orchestrator CLI for the analyst chain:
1. snapshot_universe(tickers, [yfinance, finnhub], ...)
→ s3://.../data/analyst_snapshots/{ticker}/{date}.json
2. compute_and_write_revisions(tickers, as_of_date, ...)
→ s3://.../data/analyst_revisions/{date}.parquet
Flags: --from-signals | --tickers | --snapshot-date
| --skip-revisions (first run before time series) | --dry-run
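The control flow of the analyst chain might look something like the sketch below. The two callables are stand-ins for snapshot_universe and compute_and_write_revisions, whose real signatures are not shown in this PR; the (tickers, date, dry_run) parameter order is an assumption. The exit code of 1 on an empty ticker list follows the unit-test list further down.

```python
from typing import Callable, List

# Hypothetical step signature: (tickers, date, dry_run) -> None
Step = Callable[[List[str], str, bool], None]


def run_analyst_chain(
    tickers: List[str],
    snapshot_date: str,
    snapshot_fn: Step,
    revisions_fn: Step,
    skip_revisions: bool = False,
    dry_run: bool = False,
) -> int:
    """Run snapshot then revisions; return a process-style exit code."""
    if not tickers:
        return 1  # nothing to do is treated as a failure
    snapshot_fn(tickers, snapshot_date, dry_run)
    if not skip_revisions:
        # Revisions need a snapshot time series, so the first run can skip them.
        revisions_fn(tickers, snapshot_date, dry_run)
    return 0
```

Injecting the steps as callables keeps the orchestrator testable without touching S3, which is presumably why the listed tests can check that dry_run is passed down.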
Modified:
rag/pipelines/ingest_form4.py
Added --from-signals flag (mutually exclusive with --tickers) via
the shared _signals_universe helper. Underlying ingest_for_tickers
already shipped in PR B.
rag/pipelines/run_weekly_ingestion.sh
Step header comment block updated 5→9 steps. Steps 0-4
renumbered to /9. New steps 5-7 between earnings (step 4) and
filing-change-detection (now step 8) + manifest emit (now step 9).
LM dict just-in-time bootstrap step before step 5 (idempotent —
skips if collectors/nlp/data/lm_master_dict.csv already exists).
Completion email collectors dict extended with news_pipeline /
form4_insider / analyst_pipeline keys.
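The idempotent LM dict bootstrap lives in the shell script; the same guard can be sketched in Python as below. The ensure_file name and the fetch callable are hypothetical, and the bytes written here are a placeholder rather than real dictionary content.

```python
import tempfile
from pathlib import Path
from typing import Callable


def ensure_file(path: Path, fetch: Callable[[], bytes]) -> bool:
    """Create the file from fetch() only if it is missing.

    Returns True when the file was created, False when the step was skipped.
    """
    if path.exists():
        return False
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(fetch())
    return True


# Usage: the second call is a no-op because the file already exists.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "collectors" / "nlp" / "data" / "lm_master_dict.csv"
    first = ensure_file(target, lambda: b"placeholder\n")
    second = ensure_file(target, lambda: b"never written\n")
    content = target.read_bytes()
```

Because the check is on file existence only, a partially written file would be treated as present; a production version might write to a temporary name and rename.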
+16 unit tests (tests/test_gate_a_orchestrators.py):
- _signals_universe: loads dict shape / flat shape backward compat /
uppercases / picks most recent prefix / no signals → empty +
error log / S3 read failure → empty
- run_news_pipeline: explicit tickers path / dry-run skips writes /
skip-rag runs aggregates only / empty tickers → exit 1 /
required args mutually exclusive
- run_analyst_pipeline: explicit tickers path / skip-revisions
runs snapshot only / dry-run passes dry_run=True down /
empty tickers → exit 1
- ingest_form4 --from-signals: loads tickers then runs
Suite: 1019 passing (1 skipped).
Acceptance (per ROADMAP Gate A):
Next Saturday SF firing produces all 4 artifact prefixes (three parquet,
one JSON) for that Saturday's date:
s3://alpha-engine-research/data/news_aggregates/{YYYY-MM-DD}.parquet
s3://alpha-engine-research/data/insider_transactions/{YYYY-MM-DD}.parquet
s3://alpha-engine-research/data/analyst_snapshots/{TICKER}/{YYYY-MM-DD}.json
s3://alpha-engine-research/data/analyst_revisions/{YYYY-MM-DD}.parquet
AND the RAG corpus contains new news + form-4 entries.
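One way to turn the acceptance list above into a checkable set of keys, as a rough sketch. The function name is hypothetical; the bucket and key templates are copied from the four paths listed in this commit message.

```python
from datetime import date
from typing import Iterable, List

BUCKET = "alpha-engine-research"


def expected_artifact_keys(run_date: date, tickers: Iterable[str]) -> List[str]:
    """Build the S3 URIs the acceptance criteria expect for one Saturday."""
    day = run_date.isoformat()  # YYYY-MM-DD
    keys = [
        f"data/news_aggregates/{day}.parquet",
        f"data/insider_transactions/{day}.parquet",
        f"data/analyst_revisions/{day}.parquet",
    ]
    # analyst_snapshots is per-ticker JSON rather than one parquet per date.
    keys += [f"data/analyst_snapshots/{t}/{day}.json" for t in tickers]
    return [f"s3://{BUCKET}/{key}" for key in keys]
```

A post-run check could compare this list against what actually exists in the bucket and report any missing artifact.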
Gate B starts the 4-Saturday soak clock automatically once the
first Saturday SF firing succeeds end-to-end.
Composes with:
- alpha-engine-config PR #164 (ROADMAP cadence correction)
- Wave 1 PRs A.2 / A.3 / B / C / D (the producer modules this
orchestrates)
- feedback_dont_add_redundant_scheduled_infra (memory)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cipher813 added a commit that referenced this pull request on May 13, 2026:
….json shape (#234)
Per architectural review, all Wave 1 parquet/JSON producers now use the
canonical alpha_engine_lib.eval_artifacts shape:
artifact: {prefix}/{YYMMDDHHMM}.json (or {YYMMDDHHMM}_result.parquet)
latest: {prefix}/latest.json (sidecar with run_id pointer + metadata)
Why: research runs weekly; consumer accesses "the latest run" by default.
YYMMDDHHMM run_id encodes the date directly so calendar-date
sub-partitioning is redundant (per memory id 2013: YYMMDDHHMM-encoded
run_ids make {calendar_date}/ partition redundant). Re-runs preserve audit
trail. Composes with eval-judge YYMMDDHHMM artifacts for consistent
system-wide partition shape.
Migrated:
- data/derived/news_aggregates.py: write_news_aggregates_parquet
- data/derived/analyst_revisions.py: write_revisions_parquet +
load_snapshot_time_series (lists prefix + parses body payload's
snapshot_date)
- data/snapshotter/analyst_daily.py: write_snapshot_document +
read_snapshot_document (lists per-ticker prefix + filters by YYMMDD
run_id prefix)
- rag/pipelines/ingest_form4.py: write_form4_parquet + ingest_for_tickers
(one consolidated parquet per run, not one per filed_date; filed_date
preserved as row column)
Backward-compat shim during transition: readers try canonical key/sidecar
first, fall back to legacy {date}.parquet shape if missing. After 1
Saturday SF firing under canonical shape, shim can be removed (separate
cleanup PR).
Removed deprecated helpers:
- news_aggregates.s3_key_for_date
- analyst_revisions.s3_key_for_date
- analyst_daily.s3_key_for
- ingest_form4.s3_key_for_filed_date
Tests updated to pin the canonical shape (35 + 16 + 20 + 16 = 87 adjusted;
no behavior changes outside the artifact-key shape).
Suite: 1018 passing (1 skipped).
Pre-Saturday-SF migration: since no production data has been written under
either shape yet, this PR migrates the schema cleanly with no data backfill.
Composes with:
- alpha-engine-config PR #164 (cadence correction)
- alpha-engine-data PR #233 (Gate A Saturday SF wiring)
- alpha-engine-research PR #174 (SubstrateReader; migration follows in PR 2)
- alpha_engine_lib.eval_artifacts module (v0.8.0, institutional canonical
for artifact partitioning)
PR 2 (alpha-engine-research SubstrateReader migration) follows.
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
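The canonical shape and the backward-compat read path described in this commit can be illustrated with a small sketch. It does not use alpha_engine_lib.eval_artifacts; the function names, the "run_id" JSON field in the sidecar, and the in-memory dict standing in for S3 are all assumptions made for the example.

```python
import json
from datetime import datetime
from typing import Mapping, Optional


def make_run_id(now: datetime) -> str:
    """Encode a timestamp as a YYMMDDHHMM run_id."""
    return now.strftime("%y%m%d%H%M")


def latest_key(prefix: str) -> str:
    """Sidecar that points at the most recent run for a prefix."""
    return f"{prefix}/latest.json"


def resolve_key(store: Mapping[str, bytes], prefix: str,
                legacy_date: str) -> Optional[str]:
    """Try the canonical sidecar first, then fall back to {date}.parquet."""
    sidecar = store.get(latest_key(prefix))
    if sidecar is not None:
        run_id = json.loads(sidecar)["run_id"]  # field name is an assumption
        for candidate in (f"{prefix}/{run_id}_result.parquet",
                          f"{prefix}/{run_id}.json"):
            if candidate in store:
                return candidate
    legacy = f"{prefix}/{legacy_date}.parquet"
    return legacy if legacy in store else None
```

Once every prefix has a latest.json written by a canonical run, the legacy branch is never reached, which is what lets the shim be deleted in a later cleanup.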
Summary
Gate A of the Wave 1 institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md). Single-PR wiring of the new producer modules into the existing Saturday SF RAGIngestion orchestration.
Cadence-corrected: research runs weekly, so daily ingestion would write to nobody. Everything plugs into the existing Saturday SF. No new schedules, no new Lambda, no new EventBridge, per feedback_dont_add_redundant_scheduled_infra.
What's in
New modules
- _signals_universe.py: shared --from-signals helper (lifts the duplicated inline loader from 3 existing pipelines; tolerates universe-of-dicts AND universe-of-strings; uppercases; picks most recent prefix)
- run_news_pipeline.py: orchestrator CLI for the 4-step news producer chain (NewsAggregator → NLP pipeline → news_aggregates parquet → RAG corpus ingest)
- run_analyst_pipeline.py: orchestrator CLI for the analyst chain (snapshot_universe → compute_and_write_revisions)
Modified
- ingest_form4.py: added --from-signals flag via the shared helper
- run_weekly_ingestion.sh: extended from 5 steps to 9. New steps 5 (news) / 6 (Form 4) / 7 (analyst) inserted between earnings (step 4) and filing-change-detection (now step 8) + manifest emit (now step 9). LM dict just-in-time bootstrap step before step 5 (idempotent skip on subsequent runs). Completion email collectors dict extended.
Acceptance (per ROADMAP Gate A)
Next Saturday SF firing produces all 4 artifact prefixes (three parquet, one JSON) for that Saturday's date:
s3://alpha-engine-research/data/news_aggregates/{YYYY-MM-DD}.parquet
s3://alpha-engine-research/data/insider_transactions/{YYYY-MM-DD}.parquet
s3://alpha-engine-research/data/analyst_snapshots/{TICKER}/{YYYY-MM-DD}.json
s3://alpha-engine-research/data/analyst_revisions/{YYYY-MM-DD}.parquet
AND the RAG corpus contains new news + Form 4 entries.
Gate B (4-Saturday soak) starts automatically once the first Saturday SF firing succeeds end-to-end.
Test plan
Composes with
- feedback_dont_add_redundant_scheduled_infra (memory)
🤖 Generated with Claude Code