From 6b3d85fad76b205220ee9a533bb609556068d220 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Thu, 16 Apr 2026 09:46:15 -0700 Subject: [PATCH] Delete trading_calendar + health_checker (moved to alpha-engine-dashboard) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part 3/3 of ae-dashboard de-bloat. Both scripts were copied verbatim to alpha-engine-dashboard in cipher813/alpha-engine-dashboard#18, and the Step Function SSM commands were repointed in cipher813/alpha-engine-data#42 (merged 2026-04-16) to run from /home/ec2-user/alpha-engine-dashboard. This PR removes the original copies from alpha-engine-data now that nothing in the weekday Step Function or Saturday pipeline references them anymore. The data repo is now scoped purely to data-production code (collectors, builders, features, weekly_collector) — matches the producer-vs-observability seam documented in the earlier commits. Pre-merge requirements (MERGE ORDER IS IMPORTANT): 1. Friday 2026-04-17 weekday Step Function run must complete successfully using the new dashboard paths (verify CheckTradingDay logs `cd /home/ec2-user/alpha-engine-dashboard`, HealthCheck writes /var/log/health-check.log normally) 2. Update ae-dashboard crontab — there's a `0 */6 * * *` entry still running `cd /home/ec2-user/alpha-engine-data && .venv/bin/python health_checker.py --alert`. Operator must crontab -e on ae-dashboard and swap the path to /home/ec2-user/alpha-engine-dashboard before this PR merges, else the cron breaks until that edit happens Pairs with cipher813/alpha-engine-dashboard#19 (removes alpha-engine-data from the dashboard's boot-pull.sh REPOS list). Tests: full suite 49 passed (was 71 before moves — the 22 delta is 26 tests deleted with the moved files + 5 new test_module_health tests added earlier + others). Co-Authored-By: Claude Opus 4.6 (1M context) --- health_checker.py | 291 --------------------------------- tests/test_health_checker.py | 196 ---------------------- tests/test_trading_calendar.py | 66 -------- trading_calendar.py | 134 --------------- 4 files changed, 687 deletions(-) delete mode 100644 health_checker.py delete mode 100644 tests/test_health_checker.py delete mode 100644 tests/test_trading_calendar.py delete mode 100644 trading_calendar.py diff --git a/health_checker.py b/health_checker.py deleted file mode 100644 index b3293d9..0000000 --- a/health_checker.py +++ /dev/null @@ -1,291 +0,0 @@ -""" -health_checker.py — Data staleness and pipeline health checks. - -Checks freshness of all critical data stores and flags stale data. -Designed to be called by the dashboard health page or as a standalone script. - -Usage: - python health_checker.py # check all, print report - python health_checker.py --json # machine-readable output - python health_checker.py --alert # send SNS alert on failures -""" - -from __future__ import annotations - -import argparse -import json -import logging -import os -import sys -from datetime import date, datetime, timedelta, timezone - -import boto3 - -logger = logging.getLogger(__name__) - -DEFAULT_BUCKET = "alpha-engine-research" - -# Staleness thresholds (calendar days) -THRESHOLDS = { - "signals": 8, # Research runs weekly Saturday - "predictions": 2, # Predictor runs daily Mon-Fri - "features": 2, # Feature store runs daily Mon-Fri - "fundamentals": 100, # FMP quarterly, updated weekly in DataPhase1 - "price_cache_slim": 8, # Slim cache rebuilt weekly Saturday - "daily_closes": 2, # Daily Mon-Fri - "population": 8, # Updated weekly by Research -} - - -def _last_modified_age(s3, bucket: str, key: str) -> tuple[str | None, int | None]: - """Get the last modified date and age in days for an S3 object.""" - try: - resp = s3.head_object(Bucket=bucket, Key=key) - modified = resp["LastModified"] - age = (datetime.now(timezone.utc) - modified).days - return modified.strftime("%Y-%m-%d %H:%M UTC"), age - except Exception: - return None, None - - -def _find_latest_prefix(s3, bucket: str, prefix: str) -> tuple[str | None, int | None]: - """Find the most recent date-keyed object under a prefix.""" - try: - paginator = s3.get_paginator("list_objects_v2") - latest_date = None - for page in paginator.paginate(Bucket=bucket, Prefix=prefix, MaxKeys=100): - for obj in page.get("Contents", []): - key = obj["Key"] - # Extract date from key like signals/2026-04-03/signals.json - parts = key.replace(prefix, "").split("/") - for part in parts: - if len(part) == 10 and part[4] == "-" and part[7] == "-": - try: - d = date.fromisoformat(part) - if latest_date is None or d > latest_date: - latest_date = d - except ValueError: - pass - if latest_date: - age = (date.today() - latest_date).days - return str(latest_date), age - return None, None - except Exception: - return None, None - - -def check_all(bucket: str = DEFAULT_BUCKET) -> list[dict]: - """Run all health checks. Returns list of check results.""" - s3 = boto3.client("s3") - results = [] - - # 1. Signals (latest.json pointer) - modified, age = _last_modified_age(s3, bucket, "signals/latest.json") - if modified is None: - # Fallback: scan signals/ prefix - modified, age = _find_latest_prefix(s3, bucket, "signals/") - threshold = THRESHOLDS["signals"] - results.append({ - "check": "signals", - "last_updated": modified, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 2. Predictions - modified, age = _last_modified_age(s3, bucket, "predictor/predictions/latest.json") - threshold = THRESHOLDS["predictions"] - results.append({ - "check": "predictions", - "last_updated": modified, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 3. Feature store - today_str = date.today().isoformat() - modified, age = _last_modified_age(s3, bucket, f"features/{today_str}/technical.parquet") - if modified is None: - # Check yesterday - yesterday = (date.today() - timedelta(days=1)).isoformat() - modified, age = _last_modified_age(s3, bucket, f"features/{yesterday}/technical.parquet") - threshold = THRESHOLDS["features"] - results.append({ - "check": "features", - "last_updated": modified, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 4. Fundamentals - latest_date, age = _find_latest_prefix(s3, bucket, "archive/fundamentals/") - threshold = THRESHOLDS["fundamentals"] - results.append({ - "check": "fundamentals", - "last_updated": latest_date, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 5. Population - modified, age = _last_modified_age(s3, bucket, "population/latest.json") - threshold = THRESHOLDS["population"] - results.append({ - "check": "population", - "last_updated": modified, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 6. Price cache slim - latest_date, age = _find_latest_prefix(s3, bucket, "predictor/price_cache_slim/") - threshold = THRESHOLDS["price_cache_slim"] - results.append({ - "check": "price_cache_slim", - "last_updated": latest_date, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 7. Daily closes - today_str_dc = date.today().isoformat() - modified, age = _last_modified_age(s3, bucket, f"predictor/daily_closes/{today_str_dc}.parquet") - if modified is None: - yesterday_dc = (date.today() - timedelta(days=1)).isoformat() - modified, age = _last_modified_age(s3, bucket, f"predictor/daily_closes/{yesterday_dc}.parquet") - threshold = THRESHOLDS["daily_closes"] - results.append({ - "check": "daily_closes", - "last_updated": modified, - "age_days": age, - "threshold_days": threshold, - "status": "ok" if age is not None and age <= threshold else "stale" if age is not None else "missing", - }) - - # 8. Module health markers - for module in ["data_phase1", "data_phase2", "executor", "predictor"]: - modified, age = _last_modified_age(s3, bucket, f"health/{module}.json") - results.append({ - "check": f"health/{module}", - "last_updated": modified, - "age_days": age, - "threshold_days": 2, - "status": "ok" if age is not None and age <= 2 else "stale" if age is not None else "missing", - }) - - return results - - -def format_report(results: list[dict]) -> str: - """Format results as a human-readable report.""" - lines = ["Data Health Report", "=" * 50] - n_ok = sum(1 for r in results if r["status"] == "ok") - n_stale = sum(1 for r in results if r["status"] == "stale") - n_missing = sum(1 for r in results if r["status"] == "missing") - lines.append(f"OK: {n_ok} Stale: {n_stale} Missing: {n_missing}") - lines.append("") - - for r in results: - icon = {"ok": "✓", "stale": "⚠", "missing": "✗"}[r["status"]] - age_str = f"{r['age_days']}d" if r["age_days"] is not None else "N/A" - lines.append( - f" {icon} {r['check']:25s} age={age_str:5s} " - f"threshold={r['threshold_days']}d " - f"last={r['last_updated'] or 'never'}" - ) - - failures = [r for r in results if r["status"] != "ok"] - if failures: - lines.append("") - lines.append("ACTIONS NEEDED:") - for r in failures: - lines.append(f" - {r['check']}: {r['status']} ({r.get('last_updated', 'never')})") - - return "\n".join(lines) - - -def _emit_cloudwatch_metrics(results: list[dict]) -> None: - """Publish health check metrics to CloudWatch for dashboarding and alarms.""" - try: - cw = boto3.client("cloudwatch", region_name="us-east-1") - metric_data = [] - - # Per-check staleness age - for r in results: - age = r.get("age_days") - if age is not None: - metric_data.append({ - "MetricName": "DataStalenessAge", - "Dimensions": [{"Name": "Check", "Value": r["check"]}], - "Value": float(age), - "Unit": "Count", - }) - - # Aggregate: total OK vs failures - n_ok = sum(1 for r in results if r["status"] == "ok") - n_fail = sum(1 for r in results if r["status"] != "ok") - metric_data.append({ - "MetricName": "HealthChecksOK", - "Value": float(n_ok), - "Unit": "Count", - }) - metric_data.append({ - "MetricName": "HealthChecksFailed", - "Value": float(n_fail), - "Unit": "Count", - }) - - if metric_data: - # CloudWatch PutMetricData max 20 per call - for i in range(0, len(metric_data), 20): - cw.put_metric_data( - Namespace="AlphaEngine", - MetricData=metric_data[i:i+20], - ) - logger.info("Emitted %d CloudWatch metrics", len(metric_data)) - except Exception as e: - logger.warning("CloudWatch metric emission failed (non-fatal): %s", e) - - -def main(): - parser = argparse.ArgumentParser(description="Check data pipeline health") - parser.add_argument("--json", action="store_true", help="JSON output") - parser.add_argument("--bucket", default=DEFAULT_BUCKET) - parser.add_argument("--alert", action="store_true", help="Send SNS alert on failures") - args = parser.parse_args() - - logging.basicConfig(level=logging.WARNING) - results = check_all(args.bucket) - - if args.json: - print(json.dumps(results, indent=2, default=str)) - else: - print(format_report(results)) - - # Emit CloudWatch metrics - _emit_cloudwatch_metrics(results) - - failures = [r for r in results if r["status"] != "ok"] - if failures and args.alert: - try: - topic_arn = os.environ.get("SNS_TOPIC_ARN", "arn:aws:sns:us-east-1:711398986525:alpha-engine-alerts") - sns_client = boto3.client("sns", region_name="us-east-1") - sns_client.publish( - TopicArn=topic_arn, - Subject="Alpha Engine — Data Staleness Alert", - Message=format_report(results), - ) - except Exception as e: - logger.warning("SNS alert failed: %s", e) - - sys.exit(1 if failures else 0) - - -if __name__ == "__main__": - main() diff --git a/tests/test_health_checker.py b/tests/test_health_checker.py deleted file mode 100644 index 46644ae..0000000 --- a/tests/test_health_checker.py +++ /dev/null @@ -1,196 +0,0 @@ -"""Unit tests for health_checker — data staleness and pipeline health checks.""" -import pytest -from unittest.mock import patch, MagicMock -from datetime import date, datetime, timezone, timedelta - -from health_checker import ( - check_all, - format_report, - _last_modified_age, - _find_latest_prefix, - THRESHOLDS, -) - - -# ── Helpers ────────────────────────────────────────────────────────────────── - - -def _mock_head_object(ages: dict[str, int]): - """Return a mock S3 head_object that returns LastModified based on key→age mapping.""" - def head_object(Bucket, Key): - for pattern, age in ages.items(): - if pattern in Key: - return { - "LastModified": datetime.now(timezone.utc) - timedelta(days=age) - } - raise Exception("NoSuchKey") - return head_object - - -def _mock_list_objects(prefix_dates: dict[str, str]): - """Return a mock paginator for list_objects_v2 with date-keyed prefixes.""" - class MockPaginator: - def __init__(self, prefix_dates): - self._prefix_dates = prefix_dates - - def paginate(self, Bucket, Prefix, MaxKeys=100): - date_str = self._prefix_dates.get(Prefix) - if date_str: - yield {"Contents": [{"Key": f"{Prefix}{date_str}/data.parquet"}]} - else: - yield {"Contents": []} - - paginator = MockPaginator(prefix_dates) - - def get_paginator(method): - return paginator - - return get_paginator - - -# ═══════════════════════════════════════════════════════════════════════════════ -# _last_modified_age -# ═══════════════════════════════════════════════════════════════════════════════ - - -class TestLastModifiedAge: - def test_returns_age_for_existing_object(self): - s3 = MagicMock() - s3.head_object.return_value = { - "LastModified": datetime.now(timezone.utc) - timedelta(days=3) - } - modified, age = _last_modified_age(s3, "bucket", "key") - assert age == 3 - assert modified is not None - - def test_returns_none_for_missing_object(self): - s3 = MagicMock() - s3.head_object.side_effect = Exception("NoSuchKey") - modified, age = _last_modified_age(s3, "bucket", "key") - assert modified is None - assert age is None - - -# ═══════════════════════════════════════════════════════════════════════════════ -# check_all -# ═══════════════════════════════════════════════════════════════════════════════ - - -class TestCheckAll: - @patch("health_checker.boto3") - def test_all_checks_present(self, mock_boto3): - """Every THRESHOLDS key should produce a check result.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - - # Make all head_object calls return a fresh object - s3.head_object.return_value = { - "LastModified": datetime.now(timezone.utc) - timedelta(hours=1) - } - # Make list_objects work for prefix-based checks - paginator = MagicMock() - today = date.today().isoformat() - paginator.paginate.return_value = [ - {"Contents": [{"Key": f"prefix/{today}/data.parquet"}]} - ] - s3.get_paginator.return_value = paginator - - results = check_all() - check_names = {r["check"] for r in results} - - # Core data checks - assert "signals" in check_names - assert "predictions" in check_names - assert "features" in check_names - assert "fundamentals" in check_names - assert "population" in check_names - assert "price_cache_slim" in check_names - assert "daily_closes" in check_names - - # Module health markers - assert "health/data_phase1" in check_names - assert "health/executor" in check_names - - @patch("health_checker.boto3") - def test_fresh_data_returns_ok(self, mock_boto3): - """Objects modified within threshold should be 'ok'.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - - s3.head_object.return_value = { - "LastModified": datetime.now(timezone.utc) - timedelta(hours=1) - } - paginator = MagicMock() - today = date.today().isoformat() - paginator.paginate.return_value = [ - {"Contents": [{"Key": f"prefix/{today}/data.parquet"}]} - ] - s3.get_paginator.return_value = paginator - - results = check_all() - statuses = {r["status"] for r in results} - assert "ok" in statuses - - @patch("health_checker.boto3") - def test_stale_data_returns_stale(self, mock_boto3): - """Objects older than threshold should be 'stale'.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - - # Make everything look 30 days old - s3.head_object.return_value = { - "LastModified": datetime.now(timezone.utc) - timedelta(days=30) - } - old_date = (date.today() - timedelta(days=30)).isoformat() - paginator = MagicMock() - paginator.paginate.return_value = [ - {"Contents": [{"Key": f"prefix/{old_date}/data.parquet"}]} - ] - s3.get_paginator.return_value = paginator - - results = check_all() - stale_checks = [r for r in results if r["status"] == "stale"] - # predictions (2d threshold) and features (2d) should definitely be stale at 30d - stale_names = {r["check"] for r in stale_checks} - assert "predictions" in stale_names - - @patch("health_checker.boto3") - def test_missing_data_returns_missing(self, mock_boto3): - """Objects that don't exist should be 'missing'.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - - s3.head_object.side_effect = Exception("NoSuchKey") - paginator = MagicMock() - paginator.paginate.return_value = [{"Contents": []}] - s3.get_paginator.return_value = paginator - - results = check_all() - assert all(r["status"] == "missing" for r in results) - - -# ═══════════════════════════════════════════════════════════════════════════════ -# format_report -# ═══════════════════════════════════════════════════════════════════════════════ - - -class TestFormatReport: - def test_ok_report(self): - results = [{"check": "signals", "status": "ok", "age_days": 1, - "threshold_days": 8, "last_updated": "2026-04-03"}] - report = format_report(results) - assert "OK: 1" in report - assert "Stale: 0" in report - - def test_stale_report_includes_actions(self): - results = [{"check": "predictions", "status": "stale", "age_days": 5, - "threshold_days": 2, "last_updated": "2026-03-29"}] - report = format_report(results) - assert "ACTIONS NEEDED" in report - assert "predictions" in report - - def test_missing_report(self): - results = [{"check": "features", "status": "missing", "age_days": None, - "threshold_days": 2, "last_updated": None}] - report = format_report(results) - assert "Missing: 1" in report diff --git a/tests/test_trading_calendar.py b/tests/test_trading_calendar.py deleted file mode 100644 index a66ae69..0000000 --- a/tests/test_trading_calendar.py +++ /dev/null @@ -1,66 +0,0 @@ -"""Unit tests for trading_calendar — NYSE holiday and trading day checks.""" -import pytest -from datetime import date - -from trading_calendar import is_trading_day, next_trading_day, NYSE_HOLIDAYS - - -class TestIsTradingDay: - def test_weekday_non_holiday(self): - # 2026-04-01 is a Wednesday, not a holiday - assert is_trading_day(date(2026, 4, 1)) is True - - def test_saturday(self): - assert is_trading_day(date(2026, 4, 4)) is False - - def test_sunday(self): - assert is_trading_day(date(2026, 4, 5)) is False - - def test_good_friday_2026(self): - # 2026-04-03 is Good Friday - assert is_trading_day(date(2026, 4, 3)) is False - - def test_christmas_2026(self): - assert is_trading_day(date(2026, 12, 25)) is False - - def test_mlk_day_2026(self): - assert is_trading_day(date(2026, 1, 19)) is False - - def test_normal_monday(self): - # 2026-04-06 is a normal Monday - assert is_trading_day(date(2026, 4, 6)) is True - - -class TestNextTradingDay: - def test_next_after_friday(self): - # 2026-04-03 is Good Friday, so next trading day is Monday 2026-04-06 - result = next_trading_day(date(2026, 4, 3)) - assert result == date(2026, 4, 6) - - def test_next_after_wednesday(self): - # 2026-04-01 (Wed) -> next is 2026-04-02 (Thu) - result = next_trading_day(date(2026, 4, 1)) - assert result == date(2026, 4, 2) - - def test_next_after_saturday(self): - # Saturday -> Monday (if not a holiday) - result = next_trading_day(date(2026, 4, 4)) - assert result == date(2026, 4, 6) - - def test_skips_holiday_weekend_cluster(self): - # Thanksgiving 2026: Thu Nov 26. Wed->Fri (skip Thu holiday) - result = next_trading_day(date(2026, 11, 25)) - assert result == date(2026, 11, 27) - - -class TestHolidayCompleteness: - def test_all_years_have_holidays(self): - """Each year 2025-2030 should have holidays defined.""" - for year in range(2025, 2031): - year_holidays = [h for h in NYSE_HOLIDAYS if h.year == year] - assert len(year_holidays) >= 9, f"Year {year} has only {len(year_holidays)} holidays" - - def test_no_weekday_holidays_on_weekends(self): - """Observed holidays should fall on weekdays (exchanges observe on Mon/Fri).""" - for h in NYSE_HOLIDAYS: - assert h.weekday() <= 4, f"Holiday {h} falls on a weekend (day {h.weekday()})" diff --git a/trading_calendar.py b/trading_calendar.py deleted file mode 100644 index 2e9f896..0000000 --- a/trading_calendar.py +++ /dev/null @@ -1,134 +0,0 @@ -""" -trading_calendar.py — NYSE trading day check with holiday awareness. - -Lightweight implementation that doesn't require exchange_calendars or -pandas_market_calendars. Maintains a static list of NYSE holidays through 2030. - -Usage: - python trading_calendar.py # check today - python trading_calendar.py 2026-04-03 # check specific date - -Exit codes: - Always exits 0 — Step Function checks stdout markers, not exit code. - -Stdout markers: - "TRADING DAY" = NYSE is open (proceed with pipeline) - "MARKET_CLOSED" = weekend or holiday (skip pipeline) -""" - -from __future__ import annotations - -import sys -from datetime import date, timedelta - -# NYSE observed holidays through 2030. -# Source: https://www.nyse.com/markets/hours-calendars -# Updated annually — add new years as they're published. -NYSE_HOLIDAYS: set[date] = { - # 2025 - date(2025, 1, 1), # New Year's Day - date(2025, 1, 20), # MLK Day - date(2025, 2, 17), # Presidents' Day - date(2025, 4, 18), # Good Friday - date(2025, 5, 26), # Memorial Day - date(2025, 6, 19), # Juneteenth - date(2025, 7, 4), # Independence Day - date(2025, 9, 1), # Labor Day - date(2025, 11, 27), # Thanksgiving - date(2025, 12, 25), # Christmas - # 2026 - date(2026, 1, 1), # New Year's Day - date(2026, 1, 19), # MLK Day - date(2026, 2, 16), # Presidents' Day - date(2026, 4, 3), # Good Friday - date(2026, 5, 25), # Memorial Day - date(2026, 6, 19), # Juneteenth - date(2026, 7, 3), # Independence Day (observed, July 4 is Saturday) - date(2026, 9, 7), # Labor Day - date(2026, 11, 26), # Thanksgiving - date(2026, 12, 25), # Christmas - # 2027 - date(2027, 1, 1), # New Year's Day - date(2027, 1, 18), # MLK Day - date(2027, 2, 15), # Presidents' Day - date(2027, 3, 26), # Good Friday - date(2027, 5, 31), # Memorial Day - date(2027, 6, 18), # Juneteenth (observed, June 19 is Saturday) - date(2027, 7, 5), # Independence Day (observed, July 4 is Sunday) - date(2027, 9, 6), # Labor Day - date(2027, 11, 25), # Thanksgiving - date(2027, 12, 24), # Christmas (observed, Dec 25 is Saturday) - # 2028 - date(2028, 1, 17), # MLK Day - date(2028, 2, 21), # Presidents' Day - date(2028, 4, 14), # Good Friday - date(2028, 5, 29), # Memorial Day - date(2028, 6, 19), # Juneteenth - date(2028, 7, 4), # Independence Day - date(2028, 9, 4), # Labor Day - date(2028, 11, 23), # Thanksgiving - date(2028, 12, 25), # Christmas - # 2029 - date(2029, 1, 1), # New Year's Day - date(2029, 1, 15), # MLK Day - date(2029, 2, 19), # Presidents' Day - date(2029, 3, 30), # Good Friday - date(2029, 5, 28), # Memorial Day - date(2029, 6, 19), # Juneteenth - date(2029, 7, 4), # Independence Day - date(2029, 9, 3), # Labor Day - date(2029, 11, 22), # Thanksgiving - date(2029, 12, 25), # Christmas - # 2030 - date(2030, 1, 1), # New Year's Day - date(2030, 1, 21), # MLK Day - date(2030, 2, 18), # Presidents' Day - date(2030, 4, 19), # Good Friday - date(2030, 5, 27), # Memorial Day - date(2030, 6, 19), # Juneteenth - date(2030, 7, 4), # Independence Day - date(2030, 9, 2), # Labor Day - date(2030, 11, 28), # Thanksgiving - date(2030, 12, 25), # Christmas -} - - -def is_trading_day(d: date | None = None) -> bool: - """Return True if the given date is an NYSE trading day.""" - if d is None: - d = date.today() - if d.weekday() > 4: # Saturday=5, Sunday=6 - return False - if d in NYSE_HOLIDAYS: - return False - return True - - -def next_trading_day(d: date | None = None) -> date: - """Return the next NYSE trading day after the given date.""" - if d is None: - d = date.today() - d = d + timedelta(days=1) - while not is_trading_day(d): - d = d + timedelta(days=1) - return d - - -if __name__ == "__main__": - check_date = date.today() - if len(sys.argv) > 1: - check_date = date.fromisoformat(sys.argv[1]) - - trading = is_trading_day(check_date) - day_name = check_date.strftime("%A") - - if trading: - print(f"{check_date} ({day_name}): TRADING DAY") - sys.exit(0) - else: - reason = "weekend" if check_date.weekday() > 4 else "NYSE holiday" - nxt = next_trading_day(check_date) - print(f"{check_date} ({day_name}): MARKET_CLOSED ({reason}) — next trading day: {nxt}") - # Exit 0 so SSM reports Success — Step Function checks stdout marker - # instead of exit code to distinguish holidays from script crashes. - sys.exit(0)