diff --git a/infrastructure/spot_data_weekly.sh b/infrastructure/spot_data_weekly.sh index b29cf66..7e1b74e 100755 --- a/infrastructure/spot_data_weekly.sh +++ b/infrastructure/spot_data_weekly.sh @@ -66,13 +66,32 @@ SUBNET_ID="subnet-e07166ec" IAM_PROFILE="alpha-engine-executor-profile" # ── Parse flags ────────────────────────────────────────────────────────────── -RUN_MODE="full" # full | smoke-only | rag-smoke-only | rag-only | data-only +# RUN_MODE values: +# full — morning-enrich + phase1 + prune + rag (legacy bundled, manual/adhoc) +# smoke-only — imports + --phase 1 --dry-run, then terminate +# rag-smoke-only — RAG-via-SSM dry-run, then terminate +# rag-only — only RAG ingestion (DataPhase1 ran earlier) +# data-only — morning-enrich + phase1 + prune (legacy bundled, +# manual/adhoc backward-compat — RAG separate) +# morning-enrich-only — ONLY weekly_collector.py --morning-enrich, then +# terminate (Saturday SF MorningEnrich state) +# phase1-only — ONLY weekly_collector.py --phase 1 + prune, then +# terminate (Saturday SF DataPhase1 state) +# +# The preflight-task-split (2026-05-16, plan +# alpha-engine-docs/private/preflight-task-split-260516.md) introduced +# morning-enrich-only / phase1-only so the Saturday SF runs each +# preflight-bearing action as its own SF task: a phase1 failure no longer +# re-pays the ~28-min morning-enrich. data-only stays for manual reruns. 
+RUN_MODE="full" while [[ $# -gt 0 ]]; do case "$1" in --smoke-only) RUN_MODE="smoke-only"; shift ;; --rag-smoke-only) RUN_MODE="rag-smoke-only"; shift ;; --rag-only) RUN_MODE="rag-only"; shift ;; --data-only) RUN_MODE="data-only"; shift ;; + --morning-enrich-only) RUN_MODE="morning-enrich-only"; shift ;; + --phase1-only) RUN_MODE="phase1-only"; shift ;; --instance-type) INSTANCE_TYPE="$2"; shift 2 ;; --branch) BRANCH="$2"; shift 2 ;; *) echo "Unknown flag: $1"; exit 1 ;; @@ -402,17 +421,37 @@ RAG_ONLY exit 0 fi -# ── Full run: DataPhase1 + RAGIngestion sequentially ──────────────────────── -# data-only mode skips the RAG block (used when the SF state for RAG -# ingestion runs separately on its own spot — see step_function.json -# DataPhase1 vs RAGIngestion states). -if [ "$RUN_MODE" = "data-only" ]; then - HEADER_LABEL="DATA-ONLY RUN: DataPhase1 (RAG runs separately)" - SKIP_RAG_BLOCK=1 -else - HEADER_LABEL="FULL RUN: DataPhase1 + RAGIngestion" - SKIP_RAG_BLOCK=0 -fi +# ── Full / data-only / morning-enrich-only / phase1-only run ──────────────── +# Each of morning-enrich and phase1+prune is independently gated via the +# DO_MORNING_ENRICH / DO_PHASE1 shell flags derived from RUN_MODE so that +# the Saturday SF can run each preflight-bearing action as its own SF task +# (preflight-task-split 2026-05-16): +# +# full — morning-enrich + phase1 + prune + RAG +# data-only — morning-enrich + phase1 + prune (RAG separate) +# morning-enrich-only — morning-enrich ONLY (RAG separate) +# phase1-only — phase1 + prune ONLY (RAG separate) +# +# MODE_LABEL feeds the spot-side S3 log key + the heartbeat dimension so a +# morning-enrich-only run is not mislabeled "data-phase1". 
+case "$RUN_MODE" in + data-only) + HEADER_LABEL="DATA-ONLY RUN: MorningEnrich + DataPhase1 (RAG runs separately)" + DO_MORNING_ENRICH=1; DO_PHASE1=1; SKIP_RAG_BLOCK=1 + MODE_LABEL="data-phase1" ;; + morning-enrich-only) + HEADER_LABEL="MORNING-ENRICH-ONLY RUN (phase1 + RAG run separately)" + DO_MORNING_ENRICH=1; DO_PHASE1=0; SKIP_RAG_BLOCK=1 + MODE_LABEL="morning-enrich" ;; + phase1-only) + HEADER_LABEL="PHASE1-ONLY RUN (morning-enrich + RAG run separately)" + DO_MORNING_ENRICH=0; DO_PHASE1=1; SKIP_RAG_BLOCK=1 + MODE_LABEL="data-phase1" ;; + *) + HEADER_LABEL="FULL RUN: MorningEnrich + DataPhase1 + RAGIngestion" + DO_MORNING_ENRICH=1; DO_PHASE1=1; SKIP_RAG_BLOCK=0 + MODE_LABEL="data-phase1" ;; +esac echo "" echo "═══════════════════════════════════════════════════════════════" @@ -428,16 +467,19 @@ ${ENV_SOURCE} # SSM RunCommand truncates StandardOutputContent at 24KB and the spot # terminates before the dispatcher can scp logs back, so post-mortem # debugging requires the full log to land somewhere durable. Tee -# everything below into /tmp/data-phase1.log + upload to S3 on ANY exit +# everything below into /tmp/{MODE_LABEL}.log + upload to S3 on ANY exit # path (success, hard-fail, signal). Origin: 2026-05-03 SF failure where # the postflight error message was past the SSM truncation cutoff and -# the spot was already gone by the time triage started. -LOG_FILE=/tmp/data-phase1.log +# the spot was already gone by the time triage started. The S3 key uses +# the per-mode label (preflight-task-split 2026-05-16) so a +# morning-enrich-only run's log does not land under data_phase1_log/. +MODE_LABEL="${MODE_LABEL}" +LOG_FILE=/tmp/\${MODE_LABEL}.log exec > >(tee -a "\$LOG_FILE") 2>&1 upload_log() { local exit_code=\$? 
- local s3_key="health/data_phase1_log/\$(date +%Y-%m-%d)/\$(date +%Y%m%dT%H%M%SZ -u)-exit\${exit_code}.log" + local s3_key="health/\${MODE_LABEL//-/_}_log/\$(date +%Y-%m-%d)/\$(date +%Y%m%dT%H%M%SZ -u)-exit\${exit_code}.log" aws s3 cp "\$LOG_FILE" "s3://${S3_BUCKET}/\$s3_key" --region "\${AWS_REGION:-us-east-1}" 2>/dev/null \\ && echo "[log-upload] s3://${S3_BUCKET}/\$s3_key" \\ || echo "[log-upload] WARNING: failed to upload \$LOG_FILE to S3" @@ -459,21 +501,33 @@ trap upload_log EXIT # # Order matters: must run BEFORE Phase 1 + builders.prune_delisted_tickers # so universe-state reflects the corrected Friday data. +# +# DO_MORNING_ENRICH / DO_PHASE1 (set on the dispatcher from RUN_MODE, +# interpolated below) gate each preflight-bearing action independently so +# a phase1 failure in its own SF task never re-runs a completed +# morning-enrich (preflight-task-split 2026-05-16). +if [ "${DO_MORNING_ENRICH}" = "1" ]; then echo "──────────────────────────────────────────────────────────────" echo "Starting weekly_collector.py --morning-enrich (Friday polygon-T+1 fill) at \$(date)" echo "──────────────────────────────────────────────────────────────" if ! $REMOTE_PYTHON weekly_collector.py --morning-enrich 2>&1; then - echo "ERROR: weekly_collector.py --morning-enrich failed — Friday's polygon-authoritative daily_closes not collected. Aborting Saturday pipeline so downstream consumers don't read stale data." >&2 + echo "ERROR: weekly_collector.py --morning-enrich failed — Friday's polygon-authoritative daily_closes not collected. Aborting so downstream consumers don't read stale data." 
>&2 exit 1 fi echo "MorningEnrich complete at \$(date)" +else +echo "──────────────────────────────────────────────────────────────" +echo "Skipping weekly_collector.py --morning-enrich (runs in separate SF state)" +echo "──────────────────────────────────────────────────────────────" +fi +if [ "${DO_PHASE1}" = "1" ]; then echo "" echo "──────────────────────────────────────────────────────────────" echo "Starting weekly_collector.py --phase 1 at \$(date)" echo "──────────────────────────────────────────────────────────────" if ! $REMOTE_PYTHON weekly_collector.py --phase 1 2>&1; then - echo "ERROR: weekly_collector.py --phase 1 failed — aborting bundle before RAG." >&2 + echo "ERROR: weekly_collector.py --phase 1 failed." >&2 exit 1 fi echo "DataPhase1 complete at \$(date)" @@ -490,10 +544,16 @@ echo "──────────────────────── # getting bumped or symbols manually deleted. Constituents.json was # just refreshed by Phase 1 above, so this read is fresh. if ! $REMOTE_PYTHON -m builders.prune_delisted_tickers --apply 2>&1; then - echo "ERROR: prune_delisted_tickers failed — aborting bundle before RAG." >&2 + echo "ERROR: prune_delisted_tickers failed." >&2 exit 1 fi echo "UniversePrune complete at \$(date)" +else +echo "" +echo "──────────────────────────────────────────────────────────────" +echo "Skipping weekly_collector.py --phase 1 + prune (runs in separate SF state)" +echo "──────────────────────────────────────────────────────────────" +fi if [ "${SKIP_RAG_BLOCK}" = "1" ]; then echo "" @@ -543,14 +603,19 @@ echo " Weekly data bundle complete. Instance will be terminated." echo "═══════════════════════════════════════════════════════════════" # Heartbeat — one metric per sub-workload so CloudWatch alarms can -# distinguish between a missed Phase 1, a missed prune, and a missed RAG. -# In data-only mode the rag-ingestion heartbeat is emitted by the -# separate RAG-only spot run, so don't double-emit here. 
-if [ "$RUN_MODE" = "data-only" ]; then - HEARTBEAT_PROCS=("data-phase1" "universe-prune") -else - HEARTBEAT_PROCS=("data-phase1" "universe-prune" "rag-ingestion") -fi +# distinguish between a missed MorningEnrich, a missed Phase 1, a missed +# prune, and a missed RAG. Per the preflight-task-split (2026-05-16) each +# mode emits only the heartbeats for the actions it actually ran so a +# morning-enrich-only run isn't credited with a data-phase1 heartbeat +# (and vice versa). In data-only / split modes the rag-ingestion +# heartbeat is emitted by the separate RAG-only spot run, so don't +# double-emit here. +case "$RUN_MODE" in + morning-enrich-only) HEARTBEAT_PROCS=("morning-enrich") ;; + phase1-only) HEARTBEAT_PROCS=("data-phase1" "universe-prune") ;; + data-only) HEARTBEAT_PROCS=("morning-enrich" "data-phase1" "universe-prune") ;; + *) HEARTBEAT_PROCS=("morning-enrich" "data-phase1" "universe-prune" "rag-ingestion") ;; +esac for proc in "${HEARTBEAT_PROCS[@]}"; do aws cloudwatch put-metric-data \ --namespace "AlphaEngine" \ diff --git a/infrastructure/step_function.json b/infrastructure/step_function.json index fb8c599..2a60b3b 100644 --- a/infrastructure/step_function.json +++ b/infrastructure/step_function.json @@ -1,5 +1,5 @@ { - "Comment": "Alpha Engine Saturday Pipeline — orchestrates weekly data collection, RAG ingestion, research, eval-judge + agent-justification triple, predictor training, backtesting, and evaluation sequentially with fail-fast error handling. DataPhase1 and RAGIngestion are independent SF states (split 2026-05-06 after a RAG-side import bug failed under the DataPhase1 label, masking that the data layer was healthy). Backtester and Evaluator are independent SF states (split 2026-05-07 — plan: alpha-engine-docs/private/evaluator-split-260507.md — for failure isolation, per-stage email, and independent CW heartbeats). 
The eval-judge chain (EvalJudge → EvalRollingMean → RationaleClustering → ReplayConcordance → Counterfactual) runs after DataPhase2 and BEFORE PredictorTraining (reorder shipped 2026-05-07 evening) so its persisted S3 artifacts (decision_artifacts/_eval/, _clustering/, _concordance/, _counterfactual/) are available when Evaluator generates the weekly email — pre-reorder the judge chain ran AFTER Evaluator, so its results were never visible in the operator's primary review surface. Each spot-bearing state runs on its own spot — accepts +1 bootstrap cycle (~7 min) per split in exchange for failure isolation, redrive granularity, and per-step health markers. RAG still runs before Research so the knowledge base is fresh. Judge chain only reads decision_artifacts/ written by Research, so it has no dependency on Predictor or Backtester output. Backtester runs AFTER predictor training (depends on model weights); Evaluator runs after Backtester (reads same-cohort backtest artifacts). Supports partial execution via boolean skip_* flags on input (e.g. {\"skip_data_phase1\": true, \"skip_rag_ingestion\": true, \"skip_evaluator\": true}); a missing flag is treated as false, so scheduled runs are unaffected.", + "Comment": "Alpha Engine Saturday Pipeline — orchestrates weekly data collection, RAG ingestion, research, eval-judge + agent-justification triple, predictor training, backtesting, and evaluation sequentially with fail-fast error handling. MorningEnrich, DataPhase1 and RAGIngestion are independent SF states (RAG split 2026-05-06 after a RAG-side import bug failed under the DataPhase1 label, masking that the data layer was healthy; MorningEnrich split out of DataPhase1 2026-05-16 by the preflight-task-split so a phase1 failure never re-runs the completed ~28-min morning-enrich — plan alpha-engine-docs/private/preflight-task-split-260516.md). 
Backtester and Evaluator are independent SF states (split 2026-05-07 — plan: alpha-engine-docs/private/evaluator-split-260507.md — for failure isolation, per-stage email, and independent CW heartbeats). The eval-judge chain (EvalJudge → EvalRollingMean → RationaleClustering → ReplayConcordance → Counterfactual) runs after DataPhase2 and BEFORE PredictorTraining (reorder shipped 2026-05-07 evening) so its persisted S3 artifacts (decision_artifacts/_eval/, _clustering/, _concordance/, _counterfactual/) are available when Evaluator generates the weekly email — pre-reorder the judge chain ran AFTER Evaluator, so its results were never visible in the operator's primary review surface. Each spot-bearing state runs on its own spot — accepts +1 bootstrap cycle (~7 min) per split in exchange for failure isolation, redrive granularity, and per-step health markers. RAG still runs before Research so the knowledge base is fresh. Judge chain only reads decision_artifacts/ written by Research, so it has no dependency on Predictor or Backtester output. Backtester runs AFTER predictor training (depends on model weights); Evaluator runs after Backtester (reads same-cohort backtest artifacts). Supports partial execution via boolean skip_* flags on input (e.g. {\"skip_data_phase1\": true, \"skip_rag_ingestion\": true, \"skip_evaluator\": true}); a missing flag is treated as false, so scheduled runs are unaffected.", "StartAt": "InitializeInput", "States": { @@ -10,7 +10,119 @@ "merged.$": "States.JsonMerge(States.StringToJson('{\"sns_topic_arn\":\"arn:aws:sns:us-east-1:711398986525:alpha-engine-alerts\"}'), $$.Execution.Input, false)" }, "OutputPath": "$.merged", - "Next": "CheckSkipDataPhase1" + "Next": "CheckSkipMorningEnrich" + }, + + "CheckSkipMorningEnrich": { + "Type": "Choice", + "Comment": "Skip-gate. Input like {\"skip_morning_enrich\": true} routes past the MorningEnrich state directly to the next skip-gate (CheckSkipDataPhase1). Missing flag = run as normal. 
MorningEnrich was split out of DataPhase1 by the preflight-task-split (2026-05-16, plan alpha-engine-docs/private/preflight-task-split-260516.md): morning-enrich (~28 min) and phase1 are independent preflight-bearing actions, so a phase1 failure must never re-run a completed morning-enrich. Useful for adhoc reruns where Friday's polygon-corrected daily_closes are already in ArcticDB but phase1 needs a re-run.", + "Choices": [ + { + "And": [ + {"Variable": "$.skip_morning_enrich", "IsPresent": true}, + {"Variable": "$.skip_morning_enrich", "BooleanEquals": true} + ], + "Next": "CheckSkipDataPhase1" + } + ], + "Default": "MorningEnrich" + }, + + "MorningEnrich": { + "Type": "Task", + "Comment": "Friday polygon-T+1 daily_closes fill (weekly_collector.py --morning-enrich) on a dedicated spot EC2. Split out of DataPhase1 by the preflight-task-split (2026-05-16): morning-enrich (~28 min) and phase1 are independent preflight-bearing actions, so a phase1 failure must never re-run this completed ~28-min step. Mirrors the RAGIngestion split precedent — its own spot, watchdog budget, per-state health marker, and a proper UNION entry preflight (preflight.py mode 'morning_enrich': polygon + FRED secrets + reachability + S3 writeable + ArcticDB libraries present; deliberately NO ArcticDB-freshness check — morning-enrich is part of what makes it fresh). 
~7 min extra bootstrap vs the prior bundled design — accepted in exchange for never re-paying the 28-min morning-enrich on a downstream redrive.", + "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", + "Parameters": { + "DocumentName": "AWS-RunShellScript", + "InstanceIds.$": "$.ec2_instance_id", + "Parameters": { + "commands": [ + "set -eo pipefail", + "sudo -u ec2-user git -C /home/ec2-user/alpha-engine-data pull --ff-only origin main", + "sudo -u ec2-user git -C /home/ec2-user/alpha-engine-config pull --ff-only origin main", + "cd /home/ec2-user/alpha-engine-data", + "export HOME=/home/ec2-user", + "set -a && source /home/ec2-user/.alpha-engine.env && set +a", + "trap 'aws s3 cp /var/log/morning-enrich.log \"s3://alpha-engine-research/_ssm_logs/morning-enrich/$(date -u +%Y-%m-%d)/$(hostname)-$(date -u +%H%M%SZ).log\" --only-show-errors || true' EXIT", + "bash infrastructure/spot_data_weekly.sh --morning-enrich-only 2>&1 | tee /var/log/morning-enrich.log" + ], + "executionTimeout": ["5400"] + }, + "TimeoutSeconds": 5400 + }, + "TimeoutSeconds": 5460, + "Retry": [ + { + "ErrorEquals": ["States.TaskFailed"], + "MaxAttempts": 1, + "IntervalSeconds": 60, + "BackoffRate": 1.0 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Next": "HandleFailure", + "ResultPath": "$.error" + } + ], + "ResultPath": "$.morning_enrich_result", + "Next": "WaitForMorningEnrich" + }, + + "WaitForMorningEnrich": { + "Type": "Task", + "Comment": "Poll SSM command until complete", + "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", + "Parameters": { + "CommandId.$": "$.morning_enrich_result.Command.CommandId", + "InstanceId.$": "$.ec2_instance_id[0]" + }, + "Retry": [ + { + "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], + "MaxAttempts": 10, + "IntervalSeconds": 30, + "BackoffRate": 1.5 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Next": "HandleFailure", + "ResultPath": "$.error" + } + ], + "ResultPath": "$.morning_enrich_poll", + 
"Next": "CheckMorningEnrichStatus" + }, + + "CheckMorningEnrichStatus": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.morning_enrich_poll.Status", + "StringEquals": "Success", + "Next": "CheckSkipDataPhase1" + }, + { + "Variable": "$.morning_enrich_poll.Status", + "StringEquals": "InProgress", + "Next": "MorningEnrichWait" + }, + { + "Variable": "$.morning_enrich_poll.Status", + "StringEquals": "Pending", + "Next": "MorningEnrichWait" + } + ], + "Default": "ExtractMorningEnrichError" + }, + + "MorningEnrichWait": { + "Type": "Wait", + "Seconds": 30, + "Next": "WaitForMorningEnrich" }, "CheckSkipDataPhase1": { @@ -30,7 +142,7 @@ "DataPhase1": { "Type": "Task", - "Comment": "Weekly data collection (morning-enrich + Phase 1 + universe-prune) on a dedicated spot EC2. RAG ingestion lives in the separate RAGIngestion state — split 2026-05-06 so a RAG failure no longer hides under the DataPhase1 label and so redrive granularity matches actual workloads.", + "Comment": "Weekly Phase 1 data collection + universe-prune (weekly_collector.py --phase 1 then builders.prune_delisted_tickers) on a dedicated spot EC2. MorningEnrich was split into its own preceding SF state by the preflight-task-split (2026-05-16): a failure here no longer re-runs the completed ~28-min morning-enrich. RAG ingestion lives in the separate RAGIngestion state (split 2026-05-06). 
Entry preflight is preflight.py mode 'phase1' (a strict superset of morning_enrich's dependency set — phase1 additionally builds the universe).", "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", "Parameters": { "DocumentName": "AWS-RunShellScript", @@ -44,7 +156,7 @@ "export HOME=/home/ec2-user", "set -a && source /home/ec2-user/.alpha-engine.env && set +a", "trap 'aws s3 cp /var/log/data-weekly.log \"s3://alpha-engine-research/_ssm_logs/data-weekly/$(date -u +%Y-%m-%d)/$(hostname)-$(date -u +%H%M%SZ).log\" --only-show-errors || true' EXIT", - "bash infrastructure/spot_data_weekly.sh --data-only 2>&1 | tee /var/log/data-weekly.log" + "bash infrastructure/spot_data_weekly.sh --phase1-only 2>&1 | tee /var/log/data-weekly.log" ], "executionTimeout": ["5400"] }, @@ -1344,6 +1456,18 @@ "End": true }, + "ExtractMorningEnrichError": { + "Type": "Pass", + "Comment": "Normalize MorningEnrich SSM non-Success poll into $.error. Mirrors ExtractDataPhase1Error — the soft-failure path from CheckMorningEnrichStatus.Default does not populate $.error, only Task Catch blocks do.", + "Parameters": { + "phase": "MorningEnrich", + "source": "CheckMorningEnrichStatus.Default", + "poll.$": "$.morning_enrich_poll" + }, + "ResultPath": "$.error", + "Next": "HandleFailure" + }, + "ExtractDataPhase1Error": { "Type": "Pass", "Comment": "Normalize DataPhase1 SSM non-Success poll into $.error so HandleFailure's States.Format template has something to serialize (the soft-failure path from CheckDataPhase1Status.Default does not populate $.error — only Task Catch blocks do).", diff --git a/preflight.py b/preflight.py index 9c9bf00..4da05b8 100644 --- a/preflight.py +++ b/preflight.py @@ -54,6 +54,17 @@ class DataPreflight(BasePreflight): - ``"daily"`` — weekday DailyData step. ArcticDB must be readable and SPY must be ≤4 days stale (covers Fri→Tue long weekends + 1 day of buffer). 
+ - ``"morning_enrich"`` — Saturday MorningEnrich state (split out of + DataPhase1 by the preflight-task-split 2026-05-16, plan + alpha-engine-docs/private/preflight-task-split-260516.md). Its + checks are the UNION of what ``_run_morning_enrich`` actually + needs: polygon + FRED secrets + reachability, S3 read/write, + ArcticDB libraries present. NO ArcticDB-freshness check — + morning-enrich is part of what *makes* ArcticDB fresh, so + asserting freshness at its own entry would be circular. This is a + strict subset of phase1's dependency set (phase1 additionally + builds the universe), so per-mode preflights are correct and + non-overlapping in responsibility. - ``"phase1"`` — Saturday DataPhase1. External APIs (FRED, polygon) needed; no ArcticDB freshness check (phase1 is what *populates* ArcticDB). @@ -61,9 +72,11 @@ class DataPreflight(BasePreflight): EDGAR needed. """ + _MODES = ("daily", "morning_enrich", "phase1", "phase2") + def __init__(self, bucket: str, mode: str): super().__init__(bucket) - if mode not in ("daily", "phase1", "phase2"): + if mode not in self._MODES: raise ValueError(f"DataPreflight: unknown mode {mode!r}") self.mode = mode @@ -84,26 +97,37 @@ def run(self) -> None: # 'POLYGON_API_KEY']" even though MorningEnrich (same collectors) # had just fetched polygon + FRED fine via get_secret(). self.check_env_vars("AWS_REGION") - if self.mode == "phase1": + if self.mode in ("morning_enrich", "phase1"): + # morning-enrich + phase1 both hit polygon + FRED. Today + # morning-enrich's polygon overwrite is the load-bearing call; + # FRED is included because the same macro collectors are in + # the morning-enrich code path and a drifted FRED key must + # fail in <1s here, not 28min into the spot run. 
self._check_secrets("FRED_API_KEY", "POLYGON_API_KEY") elif self.mode == "phase2": self._check_secrets("FMP_API_KEY", "FINNHUB_API_KEY", "EDGAR_IDENTITY") self.check_s3_bucket() - if self.mode == "phase1": - # Catch credential drift / upstream outages BEFORE 30 min of - # collector work. Net ~400ms across both probes. + if self.mode in ("morning_enrich", "phase1"): + # Catch credential drift / upstream outages BEFORE the + # expensive collector work (phase1 ~30min, morning-enrich + # ~28min). Net ~400ms across both probes. self._check_polygon_reachable() self._check_fred_reachable() # Bucket policies + IAM denies that HEAD doesn't catch: # PUT a sentinel + DELETE it. ~50ms. Caught the 2026-04-12 # IAM-deny class on the spot's executor-role inline policy. self._check_s3_writeable_sentinel() - # Phase 1 BUILDS ArcticDB universe + macro on first run, but - # subsequent runs need both libraries already present. Enforce - # that they exist so a typo in path_prefix fails in 100ms not - # 50min into the run. Catches the 2026-04-14 silent-skip class. + # phase1 BUILDS ArcticDB universe + macro on first run; + # morning-enrich APPENDS the polygon-corrected Friday row to + # the universe library. Both need the libraries already + # present, so enforce existence here so a typo in path_prefix + # fails in 100ms not 28-50min into the run (catches the + # 2026-04-14 silent-skip class). Deliberately NOT a freshness + # check for morning_enrich — it is part of what makes + # ArcticDB fresh, so a freshness gate at its own entry would + # be circular. 
self._check_arcticdb_libraries_present(("universe", "macro")) elif self.mode == "phase2": self._check_fmp_stable_reachable() diff --git a/tests/test_preflight.py b/tests/test_preflight.py index ea3ef73..16179bb 100644 --- a/tests/test_preflight.py +++ b/tests/test_preflight.py @@ -31,8 +31,8 @@ def test_unknown_mode_raises(self): with pytest.raises(ValueError, match="unknown mode"): DataPreflight(bucket=BUCKET, mode="bogus") - def test_three_known_modes_accepted(self): - for mode in ("phase1", "phase2", "daily"): + def test_known_modes_accepted(self): + for mode in ("phase1", "phase2", "daily", "morning_enrich"): DataPreflight(bucket=BUCKET, mode=mode) # no exception @@ -290,3 +290,141 @@ def test_phase2_requires_fmp_stable(self): status_code=200, text='[{"symbol":"AAPL"}]', json=lambda: [{"symbol": "AAPL"}] ) pf.run() + + +# ── morning_enrich mode (preflight-task-split 2026-05-16) ───────────────────── + + +class TestMorningEnrichMode: + """`morning_enrich` is the dedicated entry preflight for the + MorningEnrich SF state (split out of DataPhase1). Its checks are the + UNION of what _run_morning_enrich needs: polygon + FRED secrets + + reachability + S3 writeable + ArcticDB libraries present. It MUST NOT + perform an ArcticDB-freshness check — morning-enrich is part of what + makes ArcticDB fresh, so a freshness gate at its own entry would be + circular. 
Previously --morning-enrich mapped to mode "daily", which + only probed ArcticDB freshness and never validated polygon/FRED + reachability — so a drifted key failed ~28min into the spot run.""" + + def test_morning_enrich_all_pass(self): + pf = _make("morning_enrich") + mock_s3 = MagicMock() + mock_s3.head_bucket.return_value = {} + mock_s3.put_object.return_value = {} + mock_s3.delete_object.return_value = {} + mock_arctic = MagicMock() + mock_arctic.list_libraries.return_value = ["universe", "macro"] + + _secrets = {"POLYGON_API_KEY": "k1", "FRED_API_KEY": "k2"} + with patch.dict( + "os.environ", {"AWS_REGION": "us-east-1"}, clear=False + ), patch( + "preflight.get_secret", + side_effect=lambda n, **kw: _secrets.get(n, kw.get("default", "")), + ), patch("boto3.client", return_value=mock_s3), patch( + "arcticdb.Arctic", return_value=mock_arctic + ), patch("requests.get") as mock_http: + mock_http.return_value = MagicMock(status_code=200, text='{"ok": true}') + pf.run() + # Probed polygon + FRED (2 reachability HTTP calls minimum). 
+ assert mock_http.call_count >= 2 + + def test_morning_enrich_probes_polygon_and_fred(self): + """The two reachability probes must actually fire (regression for + the old 'daily' mapping that skipped them entirely).""" + pf = _make("morning_enrich") + mock_s3 = MagicMock() + mock_s3.head_bucket.return_value = {} + mock_s3.put_object.return_value = {} + mock_s3.delete_object.return_value = {} + mock_arctic = MagicMock() + mock_arctic.list_libraries.return_value = ["universe", "macro"] + + _secrets = {"POLYGON_API_KEY": "k1", "FRED_API_KEY": "k2"} + with patch.dict( + "os.environ", {"AWS_REGION": "us-east-1"}, clear=False + ), patch( + "preflight.get_secret", + side_effect=lambda n, **kw: _secrets.get(n, kw.get("default", "")), + ), patch("boto3.client", return_value=mock_s3), patch( + "arcticdb.Arctic", return_value=mock_arctic + ), patch("requests.get") as mock_http: + mock_http.return_value = MagicMock(status_code=200, text='{"ok": true}') + pf.run() + urls = [c.args[0] for c in mock_http.call_args_list] + assert any("polygon.io" in u for u in urls), urls + assert any("stlouisfed.org" in u for u in urls), urls + + def test_morning_enrich_no_arcticdb_freshness_check(self): + """morning_enrich must NOT call check_arcticdb_fresh (it is part + of what makes ArcticDB fresh — a freshness gate here is circular). 
+ Patch the freshness primitive and assert it's never invoked.""" + pf = _make("morning_enrich") + mock_s3 = MagicMock() + mock_s3.head_bucket.return_value = {} + mock_s3.put_object.return_value = {} + mock_s3.delete_object.return_value = {} + mock_arctic = MagicMock() + mock_arctic.list_libraries.return_value = ["universe", "macro"] + + _secrets = {"POLYGON_API_KEY": "k1", "FRED_API_KEY": "k2"} + with patch.dict( + "os.environ", {"AWS_REGION": "us-east-1"}, clear=False + ), patch( + "preflight.get_secret", + side_effect=lambda n, **kw: _secrets.get(n, kw.get("default", "")), + ), patch("boto3.client", return_value=mock_s3), patch( + "arcticdb.Arctic", return_value=mock_arctic + ), patch("requests.get") as mock_http, patch.object( + DataPreflight, "check_arcticdb_fresh" + ) as mock_fresh: + mock_http.return_value = MagicMock(status_code=200, text='{"ok": true}') + pf.run() + mock_fresh.assert_not_called() + + def test_morning_enrich_missing_polygon_secret_short_circuits(self): + """Missing API-key SECRET fails fast in <1s, before HTTP / S3 / + ArcticDB are touched — same fail-fast contract as phase1.""" + pf = _make("morning_enrich") + _secrets = {"AWS_REGION": "us-east-1", "FRED_API_KEY": "k2"} + with patch.dict( + "os.environ", {"AWS_REGION": "us-east-1"}, clear=True + ), patch( + "preflight.get_secret", + side_effect=lambda n, **kw: _secrets.get(n, kw.get("default", "")), + ), patch("requests.get") as mock_http, patch( + "boto3.client" + ) as mock_boto, patch("arcticdb.Arctic") as mock_arctic: + with pytest.raises( + RuntimeError, match="secrets missing.*POLYGON_API_KEY" + ): + pf.run() + mock_http.assert_not_called() + mock_boto.assert_not_called() + mock_arctic.assert_not_called() + + def test_morning_enrich_requires_arcticdb_libraries_present(self): + """Libraries-present gate (not freshness) must still fire — a + path_prefix typo must fail in 100ms not 28min.""" + pf = _make("morning_enrich") + mock_s3 = MagicMock() + mock_s3.head_bucket.return_value = {} + 
mock_s3.put_object.return_value = {} + mock_s3.delete_object.return_value = {} + mock_arctic = MagicMock() + mock_arctic.list_libraries.return_value = ["macro"] # universe missing + + _secrets = {"POLYGON_API_KEY": "k1", "FRED_API_KEY": "k2"} + with patch.dict( + "os.environ", {"AWS_REGION": "us-east-1"}, clear=False + ), patch( + "preflight.get_secret", + side_effect=lambda n, **kw: _secrets.get(n, kw.get("default", "")), + ), patch("boto3.client", return_value=mock_s3), patch( + "arcticdb.Arctic", return_value=mock_arctic + ), patch("requests.get") as mock_http: + mock_http.return_value = MagicMock(status_code=200, text='{"ok": true}') + with pytest.raises( + RuntimeError, match=r"missing expected libraries.*universe" + ): + pf.run() diff --git a/tests/test_sf_morning_enrich_split_wiring.py b/tests/test_sf_morning_enrich_split_wiring.py new file mode 100644 index 0000000..23d52f3 --- /dev/null +++ b/tests/test_sf_morning_enrich_split_wiring.py @@ -0,0 +1,232 @@ +"""Pins the MorningEnrich → DataPhase1 split in the Saturday SF. + +Origin: the preflight-task-split (2026-05-16, plan +alpha-engine-docs/private/preflight-task-split-260516.md). The standing +rule — every preflight-bearing action is its own SF task; a downstream +failure must never re-run a completed upstream task — was violated by +the old `DataPhase1` state, which ran `spot_data_weekly.sh --data-only` += morning-enrich (~28 min) THEN phase1 on one spot. Every phase1 +recovery re-paid the 28-min morning-enrich because its preflight was +buried 28 minutes deep. + +This test catches regressions like: +- Someone reroutes InitializeInput back past CheckSkipMorningEnrich and + silently drops the MorningEnrich state. +- Someone wires MorningEnrich AFTER DataPhase1 (re-introduces the + re-run-the-28-min-step-on-phase1-failure bug). +- Someone reverts DataPhase1's SSM command back to `--data-only` (which + re-bundles morning-enrich into phase1). +- Someone drops the HandleFailure Catch on the new states. 
+""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SF_PATH = _REPO_ROOT / "infrastructure" / "step_function.json" + + +@pytest.fixture(scope="module") +def sf() -> dict: + return json.loads(_SF_PATH.read_text()) + + +@pytest.fixture(scope="module") +def states(sf) -> dict: + return sf["States"] + + +class TestQuartetPresence: + """The MorningEnrich quartet (+ Wait/Extract helpers) must exist, + mirroring the RAGIngestion / DataPhase1 quartets.""" + + @pytest.mark.parametrize( + "name", + [ + "CheckSkipMorningEnrich", + "MorningEnrich", + "WaitForMorningEnrich", + "CheckMorningEnrichStatus", + "MorningEnrichWait", + "ExtractMorningEnrichError", + ], + ) + def test_state_exists(self, states, name): + assert name in states, f"{name} missing from Saturday SF States" + + +class TestChainOrdering: + """InitializeInput → CheckSkipMorningEnrich → MorningEnrich → + WaitForMorningEnrich → CheckMorningEnrichStatus(success) → + CheckSkipDataPhase1 → DataPhase1 (existing downstream unchanged).""" + + def test_initialize_input_routes_to_morning_enrich_skipgate(self, states): + assert states["InitializeInput"]["Next"] == "CheckSkipMorningEnrich", ( + "InitializeInput must hand off to the MorningEnrich skip-gate " + "first — MorningEnrich precedes DataPhase1 post-split." 
+ ) + + def test_skip_morning_enrich_default_runs_morning_enrich(self, states): + assert states["CheckSkipMorningEnrich"]["Default"] == "MorningEnrich" + + def test_skip_morning_enrich_honors_skip_flag(self, states): + """{"skip_morning_enrich": true} must route to CheckSkipDataPhase1 + (mirrors the skip_data_phase1 / skip_rag_ingestion shape).""" + choices = states["CheckSkipMorningEnrich"]["Choices"] + assert len(choices) == 1 + c = choices[0] + # And[ IsPresent, BooleanEquals true ] on $.skip_morning_enrich + variables = {cond["Variable"] for cond in c["And"]} + assert variables == {"$.skip_morning_enrich"} + assert c["Next"] == "CheckSkipDataPhase1" + + def test_morning_enrich_routes_to_wait_state(self, states): + assert states["MorningEnrich"]["Next"] == "WaitForMorningEnrich" + + def test_wait_routes_to_status_check(self, states): + assert states["WaitForMorningEnrich"]["Next"] == "CheckMorningEnrichStatus" + + def test_status_success_routes_to_data_phase1_skipgate(self, states): + success = [ + c["Next"] + for c in states["CheckMorningEnrichStatus"]["Choices"] + if c.get("StringEquals") == "Success" + ] + assert success == ["CheckSkipDataPhase1"], ( + "MorningEnrich success must hand off to CheckSkipDataPhase1 — " + "DataPhase1 runs AFTER a completed MorningEnrich." 
+ ) + + def test_status_inprogress_and_pending_loop_via_wait(self, states): + nexts = { + c["StringEquals"]: c["Next"] + for c in states["CheckMorningEnrichStatus"]["Choices"] + } + assert nexts["InProgress"] == "MorningEnrichWait" + assert nexts["Pending"] == "MorningEnrichWait" + assert states["MorningEnrichWait"]["Next"] == "WaitForMorningEnrich" + + def test_status_default_extracts_error(self, states): + assert ( + states["CheckMorningEnrichStatus"]["Default"] + == "ExtractMorningEnrichError" + ) + + def test_morning_enrich_is_reachable_before_data_phase1(self, sf, states): + """Walk the HAPPY path from StartAt (skip-gates take Default = run + the action; status checks take the Success choice) and assert + MorningEnrich is visited strictly before DataPhase1.""" + order: list[str] = [] + seen: set[str] = set() + cur = sf["StartAt"] + while cur and cur in states and cur not in seen: + seen.add(cur) + order.append(cur) + st = states[cur] + if st.get("Type") == "Choice": + # Status checks: follow the Success edge (the real + # forward path). Skip-gates have no Success edge → fall + # back to Default (= run the action, the no-skip path). + success = [ + c["Next"] + for c in st.get("Choices", []) + if c.get("StringEquals") == "Success" + ] + cur = success[0] if success else st.get("Default") + else: + cur = st.get("Next") + if cur == "DataPhase1": + order.append(cur) + break + assert "MorningEnrich" in order, order + assert "DataPhase1" in order, order + assert order.index("MorningEnrich") < order.index("DataPhase1"), ( + "MorningEnrich must precede DataPhase1 — the whole point of " + "the split is that a phase1 failure never re-runs morning-enrich." 
+ ) + + +class TestSsmCommandShape: + """MorningEnrich invokes --morning-enrich-only; DataPhase1 switched + from --data-only to --phase1-only.""" + + def _commands(self, states, name): + return states[name]["Parameters"]["Parameters"]["commands"] + + def test_morning_enrich_invokes_morning_enrich_only(self, states): + joined = " ".join(self._commands(states, "MorningEnrich")) + assert "spot_data_weekly.sh --morning-enrich-only" in joined + assert "--data-only" not in joined + assert "--phase1-only" not in joined + + def test_data_phase1_invokes_phase1_only(self, states): + joined = " ".join(self._commands(states, "DataPhase1")) + assert "spot_data_weekly.sh --phase1-only" in joined, ( + "DataPhase1 must run --phase1-only post-split — --data-only " + "re-bundles the 28-min morning-enrich into the phase1 task." + ) + assert "--data-only" not in joined + + def test_morning_enrich_command_starts_with_pipefail(self, states): + # Same invariant test_sf_ssm_pipefail_wiring.py pins globally; + # asserted here too so a MorningEnrich-specific regression is + # self-documenting. 
+ cmds = self._commands(states, "MorningEnrich") + assert cmds[0].startswith("set ") and "pipefail" in cmds[0] + + def test_morning_enrich_has_s3_log_trap_before_work(self, states): + cmds = self._commands(states, "MorningEnrich") + trap_idx = next( + i + for i, c in enumerate(cmds) + if c.startswith("trap ") + and "_ssm_logs" in c + and "morning-enrich.log" in c + ) + work_idx = next( + i for i, c in enumerate(cmds) if "| tee /var/log/morning-enrich.log" in c + ) + assert trap_idx < work_idx + assert "|| true" in cmds[trap_idx] + + +class TestCatchSemantics: + """Both new Task states must Catch States.ALL → HandleFailure with + ResultPath $.error, exactly like the DataPhase1 / RAGIngestion + quartets (the SF halts on infra failure of these states).""" + + @pytest.mark.parametrize("name", ["MorningEnrich", "WaitForMorningEnrich"]) + def test_catch_routes_to_handle_failure(self, states, name): + catches = states[name]["Catch"] + assert len(catches) >= 1 + for c in catches: + assert c["ErrorEquals"] == ["States.ALL"] + assert c["Next"] == "HandleFailure" + assert c["ResultPath"] == "$.error" + + def test_extract_error_routes_to_handle_failure(self, states): + st = states["ExtractMorningEnrichError"] + assert st["Type"] == "Pass" + assert st["ResultPath"] == "$.error" + assert st["Next"] == "HandleFailure" + assert st["Parameters"]["phase"] == "MorningEnrich" + + +class TestResultPathIsolation: + """MorningEnrich must not stomp on DataPhase1's SSM result path.""" + + def test_distinct_result_paths(self, states): + assert ( + states["MorningEnrich"]["ResultPath"] + != states["DataPhase1"]["ResultPath"] + ) + assert states["MorningEnrich"]["ResultPath"] == "$.morning_enrich_result" + + def test_wait_reads_morning_enrich_command_id(self, states): + cmd_id = states["WaitForMorningEnrich"]["Parameters"]["CommandId.$"] + assert "morning_enrich_result" in cmd_id diff --git a/tests/test_spot_data_weekly_run_modes.py b/tests/test_spot_data_weekly_run_modes.py new file mode 
100644 index 0000000..c0b07c0 --- /dev/null +++ b/tests/test_spot_data_weekly_run_modes.py @@ -0,0 +1,156 @@ +"""Pins the morning-enrich-only / phase1-only run-mode split in +infrastructure/spot_data_weekly.sh. + +Origin: the preflight-task-split (2026-05-16, plan +alpha-engine-docs/private/preflight-task-split-260516.md). The Saturday +SF MorningEnrich state runs `--morning-enrich-only` and the DataPhase1 +state runs `--phase1-only`; each must run ONLY its own action so a +phase1 failure never re-pays the completed ~28-min morning-enrich. +`--data-only` is preserved (runs both) for manual/adhoc backward-compat. + +These are static greps (the script only runs end-to-end on a real spot) +mirroring tests/test_spot_env_source_aws_region.py — they catch the +regression class where the modes are removed, un-gated, or re-bundled. +""" + +from __future__ import annotations + +import re +from pathlib import Path + +import pytest + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SCRIPT = _REPO_ROOT / "infrastructure" / "spot_data_weekly.sh" + + +@pytest.fixture(scope="module") +def text() -> str: + return _SCRIPT.read_text() + + +class TestFlagParsing: + """Both new flags must be parsed into their RUN_MODE values, and + --data-only must be preserved.""" + + @pytest.mark.parametrize( + "flag,run_mode", + [ + ("--morning-enrich-only", "morning-enrich-only"), + ("--phase1-only", "phase1-only"), + ("--data-only", "data-only"), + ], + ) + def test_flag_sets_run_mode(self, text, flag, run_mode): + # case branch shape: `--flag) RUN_MODE="value"; shift ;;` + pat = re.compile( + re.escape(flag) + r'\)\s*RUN_MODE="' + re.escape(run_mode) + r'"' + ) + assert pat.search(text), ( + f"{_SCRIPT.name}: flag {flag} must set RUN_MODE={run_mode!r} " + f"in the arg-parse case block." + ) + + +def _dispatch_arm(text: str, mode: str) -> str: + """Return the body of the `case "$RUN_MODE"` dispatch arm for `mode`. 
+ + There are TWO `<mode>)` arms in the script: the flag-parse arm + (`--<mode>) RUN_MODE="..."; shift ;;`) and the RUN_MODE dispatch arm + (`<mode>) ...DO_MORNING_ENRICH=... ;;`). Anchor on a bare-word + (non-`--`) arm head at line start so we only match the dispatch arm. + """ + m = re.search( + r"^\s*" + re.escape(mode) + r"\)(.*?);;", + text, + re.DOTALL | re.MULTILINE, + ) + assert m, f"{_SCRIPT.name}: no RUN_MODE dispatch arm for {mode!r}" + return m.group(1) + + +class TestModeDispatch: + """RUN_MODE must derive independent DO_MORNING_ENRICH / DO_PHASE1 + gates so each action runs in isolation per the split.""" + + @pytest.mark.parametrize( + "mode,do_me,do_p1", + [ + ("morning-enrich-only", "1", "0"), + ("phase1-only", "0", "1"), + ("data-only", "1", "1"), + ], + ) + def test_mode_sets_independent_gates(self, text, mode, do_me, do_p1): + # The dispatch arm for the mode must set DO_MORNING_ENRICH=<do_me> + # and DO_PHASE1=<do_p1>. + arm = _dispatch_arm(text, mode) + assert f"DO_MORNING_ENRICH={do_me}" in arm, ( + f"{mode}: must set DO_MORNING_ENRICH={do_me} (got arm: {arm!r})" + ) + assert f"DO_PHASE1={do_p1}" in arm, ( + f"{mode}: must set DO_PHASE1={do_p1} (got arm: {arm!r})" + ) + + def test_morning_enrich_block_independently_gated(self, text): + """The --morning-enrich invocation must be wrapped in a + DO_MORNING_ENRICH gate so phase1-only does NOT run it.""" + assert re.search( + r'if \[ "\$\{DO_MORNING_ENRICH\}" = "1" \]; then', + text, + ), "morning-enrich must be gated by DO_MORNING_ENRICH" + # The actual invocation lives inside the gated block.
+ assert "weekly_collector.py --morning-enrich 2>&1" in text + + def test_phase1_block_independently_gated(self, text): + """The --phase 1 + prune invocations must be wrapped in a + DO_PHASE1 gate so morning-enrich-only does NOT run them.""" + assert re.search( + r'if \[ "\$\{DO_PHASE1\}" = "1" \]; then', + text, + ), "phase1 + prune must be gated by DO_PHASE1" + assert "weekly_collector.py --phase 1 2>&1" in text + assert "builders.prune_delisted_tickers --apply" in text + + def test_split_modes_skip_rag_block(self, text): + """morning-enrich-only / phase1-only / data-only all run RAG + separately (SKIP_RAG_BLOCK=1) — RAG is its own SF state.""" + for mode in ("morning-enrich-only", "phase1-only", "data-only"): + assert "SKIP_RAG_BLOCK=1" in _dispatch_arm(text, mode), ( + f"{mode}: must set SKIP_RAG_BLOCK=1 (RAG is a separate SF state)" + ) + + +class TestPerModeLabel: + """The S3 log key + heartbeat dimension must reflect which action + ran — a morning-enrich-only run must NOT be labeled data-phase1.""" + + def test_mode_label_assigned_per_mode(self, text): + assert 'MODE_LABEL="morning-enrich"' in _dispatch_arm( + text, "morning-enrich-only" + ), ( + "morning-enrich-only must set MODE_LABEL=morning-enrich so its " + "S3 log key is not health/data_phase1_log/..." + ) + + def test_log_key_uses_mode_label(self, text): + # s3_key built from ${MODE_LABEL...} not a hardcoded data_phase1. + assert "MODE_LABEL" in text + m = re.search(r"s3_key=.*?MODE_LABEL", text) + assert m, ( + f"{_SCRIPT.name}: S3 log key must be derived from MODE_LABEL, " + "not a hardcoded data-phase1 path." 
+ ) + + def test_heartbeat_per_mode(self, text): + """morning-enrich-only emits only the morning-enrich heartbeat; + phase1-only emits data-phase1 + universe-prune; neither + double-credits the other's action.""" + assert re.search( + r'morning-enrich-only\)\s*HEARTBEAT_PROCS=\("morning-enrich"\)', + text, + ) + assert re.search( + r'phase1-only\)\s*HEARTBEAT_PROCS=\("data-phase1" "universe-prune"\)', + text, + ) diff --git a/tests/test_weekly_collector_preflight_mode_mapping.py b/tests/test_weekly_collector_preflight_mode_mapping.py new file mode 100644 index 0000000..f914310 --- /dev/null +++ b/tests/test_weekly_collector_preflight_mode_mapping.py @@ -0,0 +1,76 @@ +"""Pins weekly_collector.main()'s --morning-enrich → preflight-mode mapping. + +Origin: the preflight-task-split (2026-05-16, plan +alpha-engine-docs/private/preflight-task-split-260516.md). Before the +split, `--morning-enrich` mapped to DataPreflight mode "daily" — which +only probes ArcticDB freshness and does NOT validate polygon/FRED +reachability, even though _run_morning_enrich hits polygon. A drifted +polygon key therefore failed ~28min into the spot run instead of in +<1s at the entry. The fix maps `--morning-enrich` → mode +"morning_enrich" (a dedicated UNION entry preflight). + +main() reads argv via _parse_args() with no DI, so this is an AST/source +assertion (the convention used by other static-wiring tests in this +repo) rather than a behavioral test — the behavioral coverage for the +morning_enrich mode itself lives in tests/test_preflight.py +::TestMorningEnrichMode. 
+""" + +from __future__ import annotations + +import ast +from pathlib import Path + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_COLLECTOR = _REPO_ROOT / "weekly_collector.py" + + +def _main_source() -> str: + """Return the source of weekly_collector.main().""" + tree = ast.parse(_COLLECTOR.read_text()) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == "main": + return ast.get_source_segment(_COLLECTOR.read_text(), node) + raise AssertionError("weekly_collector.main() not found") + + +def test_morning_enrich_maps_to_morning_enrich_mode(): + src = _main_source() + assert 'getattr(args, "morning_enrich", False)' in src, ( + "main() must branch on the --morning-enrich arg before choosing " + "the preflight mode." + ) + # The morning-enrich branch must assign mode = "morning_enrich", + # NOT the old mode = "daily". + assert 'mode = "morning_enrich"' in src, ( + "main() must map --morning-enrich to DataPreflight mode " + '"morning_enrich" (the dedicated UNION entry preflight). The old ' + '"daily" mapping skipped polygon/FRED reachability — a drifted ' + "key failed 28min into the spot run. See preflight-task-split " + "2026-05-16." + ) + + +def test_morning_enrich_not_mapped_to_daily(): + """Regression guard: the --morning-enrich branch must NOT fall back + to the "daily" mode (the pre-split behavior).""" + src = _main_source() + # Isolate the morning_enrich branch body up to the `elif args.daily`. + marker = 'getattr(args, "morning_enrich", False)' + start = src.index(marker) + rest = src[start:] + elif_idx = rest.index("elif args.daily") + branch = rest[:elif_idx] + assert 'mode = "daily"' not in branch, ( + "The --morning-enrich branch must not map to mode 'daily' — that " + "is exactly the pre-split bug the task split fixed." 
+ ) + + +def test_daily_mode_mapping_unchanged(): + """The genuine --daily weekday path still maps to 'daily' (we only + repointed --morning-enrich).""" + src = _main_source() + assert "elif args.daily:" in src + after = src[src.index("elif args.daily:"):] + assert 'mode = "daily"' in after.split("else:")[0] diff --git a/weekly_collector.py b/weekly_collector.py index 39707f1..1e221d4 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -1589,7 +1589,15 @@ def main() -> None: # the real collection work. See alpha-engine-lib/README.md. from preflight import DataPreflight if getattr(args, "morning_enrich", False): - mode = "daily" # same dep footprint: constituents + polygon + arcticdb + # Dedicated morning_enrich mode (preflight-task-split 2026-05-16): + # morning-enrich is its own Saturday SF task and needs a proper + # UNION entry preflight (polygon + FRED secrets + reachability + + # S3 writeable + ArcticDB libraries present). The previous + # "daily" mapping only probed ArcticDB freshness and did NOT + # validate polygon/FRED reachability, even though + # _run_morning_enrich hits polygon — so a drifted key failed + # 28min into the spot run instead of in <1s at the entry. + mode = "morning_enrich" elif args.daily: mode = "daily" else: