diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..8f89252 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,46 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +jobs: + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, windows-latest] + python-version: ['3.10', '3.11', '3.12'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r toptek/requirements-lite.txt + - name: Lint + run: | + ruff check . + black --check . + bandit -r toptek -q + - name: Type check + run: | + mypy --config-file mypy.ini toptek tests + - name: Tests + env: + PERF_CHECK: 0 + run: | + pytest --maxfail=1 --disable-warnings -q + - name: Upload reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: reports-${{ matrix.os }}-${{ matrix.python-version }} + path: | + reports + models + data/bank + if-no-files-found: ignore diff --git a/README.md b/README.md index 331ba38..6c14f8b 100644 --- a/README.md +++ b/README.md @@ -82,3 +82,33 @@ GUI. Install [`PyYAML`](https://pyyaml.org/) if the command reports a missing dependency. The same report is serialised back into the `configs["trade"]` dictionary whenever the guard is refreshed, allowing downstream automation to respond when the status shifts between `OK` and `DEFENSIVE_MODE`. + +## Monitoring surface + +Operational dashboards can now import helpers from `toptek.monitor` to surface +data quality and feed health at a glance: + +- `toptek.monitor.compute_drift_report` evaluates PSI/KS drift across a + DataFrame slice, returning feature-level and aggregate severities that the UI + can render as badges or alerts. +- `toptek.monitor.build_latency_badge` converts a timestamp for the latest bar + into deterministic status copy (`Live`, `Lagging`, `Stalled`) based on latency + thresholds. + +Both utilities return frozen dataclasses to keep the API predictable for +widgets, scripts, or automated monitors. + +## Data bank, nightly prep, and live surfaces + +- `python -m toptek.databank.bank ingest --symbol ES --timeframe 5m --days 365` + writes deterministic synthetic OHLCV bars under `data/bank/ES/5m` and maintains + a catalog for downstream loaders. +- `python -m toptek.pipelines.prep_nightly --date YYYY-MM-DD` executes the full + nightly prep flow (ingest → features → train → calibrate → drift check) and + emits a daily brief JSON, a threshold curve plot, and a versioned model card. +- `toptek.confidence.score_probabilities` powers the new confidence ring widget + displayed in the Train, Backtest, and Trade tabs. +- `toptek.advisor.engine.AdvisorEngine` streams a synthetic research brief with + risk buckets, ATR%, and three actionable bullets for chat intents. +- `bench/run_bench.py` captures a deterministic baseline; exporting `PERF_CHECK=1` + when running `pytest` enables the gated perf regression test. diff --git a/bench/run_bench.py b/bench/run_bench.py new file mode 100644 index 0000000..a0cea00 --- /dev/null +++ b/bench/run_bench.py @@ -0,0 +1,38 @@ +"""Synthetic performance harness producing deterministic baselines.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import numpy as np + + +def run_scenario(config: dict[str, Any]) -> dict[str, float]: + seed = int(config.get("seed", 42)) + horizon = int(config.get("horizon", 256)) + rng = np.random.default_rng(seed) + pnl = rng.normal(config.get("drift", 0.05), config.get("vol", 0.2), size=horizon) + sharpe = float(np.mean(pnl) / (np.std(pnl) + 1e-9)) + hit_rate = float((pnl > 0).mean()) + return {"sharpe": sharpe, "hit_rate": hit_rate, "horizon": horizon} + + +def main() -> None: + scenario_path = Path(__file__).with_name("scenario_small.yaml") + if not scenario_path.exists(): + scenario_path = Path("bench/scenario_small.yaml") + if not scenario_path.exists(): + raise FileNotFoundError("scenario_small.yaml not found") + config = json.loads(scenario_path.read_text(encoding="utf-8")) + metrics = run_scenario(config) + out_dir = Path("reports/baselines") + out_dir.mkdir(parents=True, exist_ok=True) + (out_dir / "latest.json").write_text( + json.dumps(metrics, indent=2), encoding="utf-8" + ) + + +if __name__ == "__main__": + main() diff --git a/bench/scenario_small.yaml b/bench/scenario_small.yaml new file mode 100644 index 0000000..4d9e025 --- /dev/null +++ b/bench/scenario_small.yaml @@ -0,0 +1 @@ +{"seed": 7, "horizon": 512, "drift": 0.03, "vol": 0.15} diff --git a/tests/test_advisor.py b/tests/test_advisor.py new file mode 100644 index 0000000..073f2ae --- /dev/null +++ b/tests/test_advisor.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +import pytest + +pytest.importorskip("numpy") +pytest.importorskip("pandas") + +from toptek.advisor import AdvisorEngine + + +def test_advisor_engine() -> None: + engine = AdvisorEngine() + response = engine.advise("AAPL") + assert response.symbol == "AAPL" + assert len(response.bullets) == 3 + assert response.recommendation diff --git a/tests/test_confidence.py b/tests/test_confidence.py new file mode 100644 index 0000000..6488d1e --- /dev/null +++ b/tests/test_confidence.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import pytest + +np = pytest.importorskip("numpy") +pytest.importorskip("scipy") + +from toptek.confidence import score_probabilities + + +def test_score_probabilities_fast() -> None: + probs = np.linspace(0.55, 0.75, 20) + result = score_probabilities(probs, method="fast") + assert 0.6 < result.probability < 0.7 + assert result.confidence > 0 + assert result.ci_high <= 1.0 + assert result.expected_value > 0 + + +def test_score_probabilities_beta() -> None: + probs = [0.6] * 15 + result = score_probabilities(probs, method="beta") + assert result.ci_low < result.probability < result.ci_high diff --git a/tests/test_confidence_widget.py b/tests/test_confidence_widget.py new file mode 100644 index 0000000..e1d5ab3 --- /dev/null +++ b/tests/test_confidence_widget.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import os +import tkinter as tk + +import pytest + +from toptek.ui.widgets import ConfidenceRing + + +@pytest.mark.skipif(tk.TkVersion < 8.0, reason="Tk not available") +@pytest.mark.skipif(not os.environ.get("DISPLAY"), reason="no X display available") +def test_confidence_ring_updates() -> None: + root = tk.Tk() + root.withdraw() + widget = ConfidenceRing(root) + widget.update_from_payload({"p": 0.65, "coverage": 0.4, "ev": 0.12}) + assert widget.winfo_exists() + root.destroy() diff --git a/tests/test_databank.py b/tests/test_databank.py new file mode 100644 index 0000000..25f0bc5 --- /dev/null +++ b/tests/test_databank.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +pytest.importorskip("pandas") + +from toptek.databank import Bank, SyntheticBars + + +def test_ingest_and_read(tmp_path: Path) -> None: + bank = Bank(tmp_path / "bank", provider=SyntheticBars(seed=1)) + target_end = datetime(2024, 1, 31, tzinfo=timezone.utc) + bank.ingest("ES", "5m", days=2, end=target_end) + df = bank.read("ES", "5m") + assert not df.empty + assert {"open", "high", "low", "close", "volume"}.issubset(df.columns) + catalog = bank.catalog("ES", "5m") + assert catalog["symbol"] == "ES" + assert catalog["timeframe"] == "5m" + partitions = catalog["partitions"] + assert isinstance(partitions, list) + assert len(list((tmp_path / "bank" / "ES" / "5m").glob("*.parquet"))) == len( + partitions + ) diff --git a/tests/test_drift_flags.py b/tests/test_drift_flags.py new file mode 100644 index 0000000..11fb6db --- /dev/null +++ b/tests/test_drift_flags.py @@ -0,0 +1,63 @@ +"""Deterministic checks for the drift severity report.""" + +from __future__ import annotations + +import pytest + +from toptek.monitor import Severity, compute_drift_report + + +def _build_column(zero_count: int, one_count: int) -> dict[str, list[int]]: + return {"feature": [0] * zero_count + [1] * one_count} + + +def test_compute_drift_report_stable_flag(): + reference = _build_column(50, 50) + current = _build_column(50, 50) + + report = compute_drift_report(reference, current, bins=2) + + assert report.overall == Severity.STABLE + feature_report = report.features["feature"] + assert feature_report.metric.psi == pytest.approx(0.0) + assert feature_report.metric.ks == pytest.approx(0.0) + assert feature_report.severity == Severity.STABLE + assert feature_report.message == "No material drift detected." + + +def test_compute_drift_report_watch_flag(): + reference = _build_column(50, 50) + current = _build_column(68, 32) + + report = compute_drift_report(reference, current, bins=2) + + feature_report = report.features["feature"] + assert feature_report.psi_severity == Severity.WATCH + assert feature_report.ks_severity == Severity.WATCH + assert feature_report.severity == Severity.WATCH + assert report.overall == Severity.WATCH + + +def test_compute_drift_report_alert_flag(): + reference = _build_column(50, 50) + current = _build_column(80, 20) + + report = compute_drift_report(reference, current, bins=2) + + feature_report = report.features["feature"] + assert feature_report.severity == Severity.ALERT + assert feature_report.metric.psi > 0.25 + assert feature_report.metric.ks > 0.2 + assert "alert" in report.summary + + +def test_compute_drift_report_unknown_when_empty(): + reference = _build_column(50, 50) + current = {"feature": []} + + report = compute_drift_report(reference, current) + + feature_report = report.features["feature"] + assert feature_report.severity == Severity.UNKNOWN + assert report.overall == Severity.UNKNOWN + assert report.summary == "One or more features lacked data for drift assessment." diff --git a/tests/test_latency_badge.py b/tests/test_latency_badge.py new file mode 100644 index 0000000..0058972 --- /dev/null +++ b/tests/test_latency_badge.py @@ -0,0 +1,73 @@ +"""Latency badge tests covering severity transitions.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest + +from toptek.monitor import LatencyBadge, Severity, build_latency_badge + + +def _now() -> datetime: + return datetime(2024, 1, 1, tzinfo=timezone.utc) + + +def test_latency_badge_live_severity(): + badge = build_latency_badge( + _now() - timedelta(seconds=10), + now=_now(), + warning_threshold=30, + alert_threshold=90, + ) + + assert isinstance(badge, LatencyBadge) + assert badge.severity == Severity.STABLE + assert badge.label == "Live" + assert "healthy" in badge.message + + +def test_latency_badge_watch_severity(): + badge = build_latency_badge( + _now() - timedelta(seconds=45), + now=_now(), + warning_threshold=30, + alert_threshold=90, + ) + + assert badge.severity == Severity.WATCH + assert badge.label == "Lagging" + assert badge.latency_seconds == pytest.approx(45.0) + + +def test_latency_badge_alert_severity(): + badge = build_latency_badge( + _now() - timedelta(seconds=120), + now=_now(), + warning_threshold=30, + alert_threshold=90, + ) + + assert badge.severity == Severity.ALERT + assert badge.label == "Stalled" + assert "stalled" in badge.message.lower() + + +def test_latency_badge_unknown_without_timestamp(): + badge = build_latency_badge(None, now=_now()) + + assert badge.severity == Severity.UNKNOWN + assert badge.label == "No signal" + assert badge.message == "No bars received yet." + + +def test_latency_badge_validates_threshold_order(): + with pytest.raises(ValueError): + build_latency_badge( + _now(), now=_now(), warning_threshold=90, alert_threshold=30 + ) + + with pytest.raises(ValueError): + build_latency_badge( + _now(), now=_now(), warning_threshold=-1, alert_threshold=30 + ) diff --git a/tests/test_perf.py b/tests/test_perf.py new file mode 100644 index 0000000..a1c2cf1 --- /dev/null +++ b/tests/test_perf.py @@ -0,0 +1,23 @@ +"""Performance harness smoke test (gated by PERF_CHECK).""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +pytest.importorskip("numpy") + +from bench.run_bench import main as run_bench + + +@pytest.mark.skipif(os.getenv("PERF_CHECK") != "1", reason="PERF_CHECK env not set") +def test_run_bench(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.chdir(tmp_path) + run_bench() + output = Path("reports/baselines/latest.json") + assert output.exists() + data = json.loads(output.read_text(encoding="utf-8")) + assert data["horizon"] == 32 diff --git a/tests/test_prep_nightly.py b/tests/test_prep_nightly.py new file mode 100644 index 0000000..91b3db9 --- /dev/null +++ b/tests/test_prep_nightly.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from datetime import date +from pathlib import Path + +import pytest + +pytest.importorskip("numpy") +pytest.importorskip("pandas") +pytest.importorskip("joblib") +pytest.importorskip("sklearn") +pytest.importorskip("matplotlib") + +from toptek.pipelines.prep_nightly import PipelineResult, run_pipeline + + +def test_run_pipeline(tmp_path: Path, monkeypatch) -> None: + reports = tmp_path / "reports" + models = tmp_path / "models" + bank_root = tmp_path / "bank" + result: PipelineResult = run_pipeline( + target_date=date(2024, 2, 1), + bank_root=bank_root, + reports_root=reports, + models_root=models, + days=30, + ) + brief = result.daily_brief + assert brief.exists() + data = brief.read_text(encoding="utf-8") + assert "tau" in data and "drift" in data + assert result.threshold_curve.exists() + version_dir = result.model_dir + assert (version_dir / "model.pkl").exists() + assert (version_dir / "calibrator.pkl").exists() + assert (version_dir / "model_card.json").exists() diff --git a/tests/test_ranker.py b/tests/test_ranker.py new file mode 100644 index 0000000..a34a740 --- /dev/null +++ b/tests/test_ranker.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import pytest + +np = pytest.importorskip("numpy") +pd = pytest.importorskip("pandas") + +from toptek.rank import RankRequest, rank_strategies + + +def test_rank_strategies_filters_constraints() -> None: + index = pd.RangeIndex(90) + signals = pd.DataFrame( + { + "strategy_a": np.linspace(-0.1, 0.2, len(index)), + "strategy_b": np.linspace(0.05, 0.15, len(index)), + }, + index=index, + ) + result = rank_strategies( + RankRequest( + signals=signals, min_coverage=0.2, min_ev=0.01, max_drawdown=0.5, folds=3 + ) + ) + assert result.scores + names = [score.name for score in result.scores] + assert "strategy_b" in names diff --git a/toptek/README.md b/toptek/README.md index 832b369..fcfef9f 100644 --- a/toptek/README.md +++ b/toptek/README.md @@ -54,6 +54,21 @@ toptek/ risk.py live.py utils.py + confidence/ + score.py + databank/ + bank.py + providers.py + monitor/ + drift.py + latency.py + pipelines/ + prep_nightly.py + advisor/ + engine.py + providers.py + rank/ + ranker.py gui/ app.py widgets.py @@ -70,6 +85,18 @@ Configuration defaults live under the `config/` folder and are merged with value - On start-up `python -m toptek.main` validates that NumPy/SciPy/scikit-learn match the vetted wheels and raises a friendly guidance error if the environment drifts. Reinstall with `pip install -r requirements-lite.txt` to resolve mismatches. +## Monitoring helpers + +Use the `toptek.monitor` package to keep an eye on data quality and feed +freshness: + +- `compute_drift_report` compares PSI/KS statistics between reference and + current windows, returning severity tiers per feature and overall so the GUI + can escalate drift badges deterministically. +- `build_latency_badge` maps the latest bar timestamp to friendly status copy + (`Live`, `Lagging`, `Stalled`) based on configurable thresholds, making it + trivial to render latency pills in the dashboard header. + ## Development notes - Source code is fully typed and documented with docstrings. diff --git a/toptek/advisor/__init__.py b/toptek/advisor/__init__.py new file mode 100644 index 0000000..20fae90 --- /dev/null +++ b/toptek/advisor/__init__.py @@ -0,0 +1,16 @@ +"""Live advisor surface.""" + +from .engine import AdvisorEngine, AdvisorResponse +from .providers import ( + AdvisorProvider, + SyntheticAdvisorProvider, + YFinanceAdvisorProvider, +) + +__all__ = [ + "AdvisorEngine", + "AdvisorResponse", + "AdvisorProvider", + "SyntheticAdvisorProvider", + "YFinanceAdvisorProvider", +] diff --git a/toptek/advisor/engine.py b/toptek/advisor/engine.py new file mode 100644 index 0000000..485f68a --- /dev/null +++ b/toptek/advisor/engine.py @@ -0,0 +1,63 @@ +"""Advisor engine generating contextual guidance.""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + +from .providers import AdvisorProvider, SyntheticAdvisorProvider + + +@dataclass(frozen=True) +class AdvisorResponse: + symbol: str + risk_bucket: str + atr_percent: float + bullets: tuple[str, str, str] + recommendation: str + + +class AdvisorEngine: + def __init__(self, provider: AdvisorProvider | None = None) -> None: + self._provider = provider or SyntheticAdvisorProvider() + + def advise(self, symbol: str) -> AdvisorResponse: + quotes = self._provider.quotes(symbol) + if quotes.empty: + raise RuntimeError("No advisor quotes available") + closes = quotes["close"].astype(float) + atr = float(np.abs(np.diff(closes)).mean()) if len(closes) > 1 else 0.0 + atr_percent = float(atr / closes.iloc[-1]) if closes.iloc[-1] else 0.0 + risk_bucket = self._risk_bucket(atr_percent) + bullets = tuple(self._provider.headlines(symbol)) + if len(bullets) < 3: + bullets = bullets + ("Liquidity normalising",) * (3 - len(bullets)) + bullet_triplet = (bullets[0], bullets[1], bullets[2]) + recommendation = self._recommendation(risk_bucket, atr_percent) + return AdvisorResponse( + symbol=symbol.upper(), + risk_bucket=risk_bucket, + atr_percent=atr_percent, + bullets=bullet_triplet, + recommendation=recommendation, + ) + + @staticmethod + def _risk_bucket(atr_percent: float) -> str: + if atr_percent < 0.01: + return "Low" + if atr_percent < 0.02: + return "Moderate" + return "High" + + @staticmethod + def _recommendation(bucket: str, atr_percent: float) -> str: + if bucket == "Low": + return "Consider scaling into the trend with tight risk." + if bucket == "Moderate": + return "Deploy balanced positioning; watch catalysts." + return "Trade defensive; volatility regime elevated." + + +__all__ = ["AdvisorEngine", "AdvisorResponse"] diff --git a/toptek/advisor/providers.py b/toptek/advisor/providers.py new file mode 100644 index 0000000..303d15e --- /dev/null +++ b/toptek/advisor/providers.py @@ -0,0 +1,73 @@ +"""Advisor data providers for research mode.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Iterable, Protocol + +import numpy as np +import pandas as pd + +try: # pragma: no cover - optional dependency + import yfinance as yf # type: ignore +except ModuleNotFoundError: # pragma: no cover - optional dependency + yf = None # type: ignore + + +class AdvisorProvider(Protocol): + """Interface for fetching advisor context.""" + + def quotes(self, symbol: str) -> pd.DataFrame: + raise NotImplementedError + + def headlines(self, symbol: str) -> Iterable[str]: + raise NotImplementedError + + +@dataclass(slots=True) +class SyntheticAdvisorProvider: + seed: int | None = None + + def quotes(self, symbol: str) -> pd.DataFrame: + rng = np.random.default_rng(self.seed or abs(hash(symbol)) % (2**32)) + base_price = 100 + (abs(hash(symbol)) % 50) + idx = pd.date_range(end=datetime.now(timezone.utc), periods=20, freq="1H") + prices = base_price * np.cumprod(1 + rng.normal(0, 0.002, size=len(idx))) + return pd.DataFrame({"close": prices}, index=idx) + + def headlines(self, symbol: str) -> Iterable[str]: + topics = [ + f"{symbol} consolidates ahead of earnings", + f"Analysts eye upside skew for {symbol}", + f"{symbol} implied vol signals event risk", + f"Macro flows shift toward {symbol} sector leaders", + ] + return topics[:3] + + +@dataclass(slots=True) +class YFinanceAdvisorProvider: + def __post_init__(self) -> None: + if yf is None: # pragma: no cover - runtime guard + raise ImportError("yfinance is required for YFinanceAdvisorProvider") + + def quotes(self, symbol: str) -> pd.DataFrame: + data = yf.download(symbol, period="5d", interval="1h", progress=False) # type: ignore[attr-defined] + if data.empty: + raise RuntimeError("No advisor quotes returned") + return data[["Close"]].rename(columns={"Close": "close"}) + + def headlines(self, symbol: str) -> Iterable[str]: + return [ + f"{symbol} sentiment steady per options skew", + f"{symbol} desk highlights tactical range trades", + f"{symbol} liquidity remains supportive", + ] + + +__all__ = [ + "AdvisorProvider", + "SyntheticAdvisorProvider", + "YFinanceAdvisorProvider", +] diff --git a/toptek/confidence/__init__.py b/toptek/confidence/__init__.py new file mode 100644 index 0000000..108e95b --- /dev/null +++ b/toptek/confidence/__init__.py @@ -0,0 +1,5 @@ +"""Confidence scoring utilities for live surfaces.""" + +from .score import ConfidenceResult, score_probabilities + +__all__ = ["ConfidenceResult", "score_probabilities"] diff --git a/toptek/confidence/score.py b/toptek/confidence/score.py new file mode 100644 index 0000000..8d44e54 --- /dev/null +++ b/toptek/confidence/score.py @@ -0,0 +1,69 @@ +"""Confidence scoring helpers for probability forecasts.""" + +from __future__ import annotations + +import math +from dataclasses import dataclass +from typing import Iterable, Literal + +import numpy as np +from scipy.stats import beta + + +@dataclass(frozen=True) +class ConfidenceResult: + probability: float + confidence: float + ci_low: float + ci_high: float + coverage: float + expected_value: float + + +def _wilson_interval( + successes: int, trials: int, confidence: float = 0.95 +) -> tuple[float, float]: + if trials == 0: + return (0.0, 1.0) + z = 1.959963984540054 + phat = successes / trials + denom = 1 + z * z / trials + centre = phat + z * z / (2 * trials) + margin = z * math.sqrt((phat * (1 - phat) + z * z / (4 * trials)) / trials) + lower = (centre - margin) / denom + upper = (centre + margin) / denom + return max(0.0, lower), min(1.0, upper) + + +def score_probabilities( + probabilities: Iterable[float], + *, + method: Literal["fast", "beta"] = "fast", +) -> ConfidenceResult: + probs = np.clip(np.fromiter(probabilities, dtype=float), 0.0, 1.0) + if probs.size == 0: + raise ValueError("probabilities must not be empty") + p = float(np.mean(probs)) + coverage = float(np.mean(probs >= 0.5)) + ev = float(np.mean(2 * probs - 1)) + if method == "fast": + conf = abs(p - 0.5) * 2 + low, high = _wilson_interval(int(np.round(p * probs.size)), probs.size) + elif method == "beta": + alpha = probs.sum() + 1 + beta_param = probs.size - probs.sum() + 1 + conf = float(abs(p - 0.5) * 2) + low, high = beta.ppf([0.025, 0.975], alpha, beta_param) + else: # pragma: no cover - defensive branch + raise ValueError(f"Unsupported method: {method}") + return ConfidenceResult( + probability=p, + confidence=conf, + ci_low=float(low), + ci_high=float(high), + coverage=coverage, + expected_value=ev, + ) + + +__all__ = ["ConfidenceResult", "score_probabilities"] diff --git a/toptek/databank/__init__.py b/toptek/databank/__init__.py new file mode 100644 index 0000000..97b359c --- /dev/null +++ b/toptek/databank/__init__.py @@ -0,0 +1,6 @@ +"""Offline-first market data bank with deterministic synthetic providers.""" + +from .bank import Bank +from .providers import SyntheticBars, YFinanceBars + +__all__ = ["Bank", "SyntheticBars", "YFinanceBars"] diff --git a/toptek/databank/bank.py b/toptek/databank/bank.py new file mode 100644 index 0000000..e67c44b --- /dev/null +++ b/toptek/databank/bank.py @@ -0,0 +1,130 @@ +"""Parquet-backed market data bank with deterministic ingest.""" + +from __future__ import annotations + +import argparse +import json +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from typing import Sequence + +import pandas as pd + +from .providers import BarsProvider, SyntheticBars + + +@dataclass(slots=True) +class Bank: + """Manage historical bars stored as Parquet partitions.""" + + root: Path + provider: BarsProvider = field(default_factory=SyntheticBars) + + def __post_init__(self) -> None: + if isinstance(self.root, str): # pragma: no cover - convenience + self.root = Path(self.root) + self.root = self.root.expanduser().resolve() + self.root.mkdir(parents=True, exist_ok=True) + + def ingest( + self, + symbol: str, + timeframe: str, + *, + days: int, + end: datetime | None = None, + ) -> Path: + """Download deterministic data and persist partitions. + + Returns the path to the symbol/timeframe directory. + """ + + if days <= 0: + raise ValueError("days must be positive") + end = end or datetime.now(timezone.utc) + start = end - timedelta(days=days) + data = self.provider.fetch(symbol, timeframe, start=start, end=end) + if data.empty: + raise RuntimeError("Provider returned no data") + if data.index.tzinfo is None: + data.index = data.index.tz_localize(timezone.utc) + path = self.root / symbol.upper() / timeframe + path.mkdir(parents=True, exist_ok=True) + daily_groups = data.groupby(data.index.date) + for day, frame in daily_groups: + day_str = datetime.combine( + day, datetime.min.time(), tzinfo=timezone.utc + ).strftime("%Y%m%d") + target = path / f"{day_str}.parquet" + if target.exists(): + continue + frame.to_parquet(target) + catalog_path = path / "catalog.json" + partitions = sorted(p.name for p in path.glob("*.parquet")) + catalog = { + "symbol": symbol.upper(), + "timeframe": timeframe, + "partitions": partitions, + } + catalog_path.write_text(json.dumps(catalog, indent=2), encoding="utf-8") + return path + + def read( + self, + symbol: str, + timeframe: str, + *, + start: datetime | None = None, + end: datetime | None = None, + ) -> pd.DataFrame: + """Load OHLCV bars for the given window.""" + + path = self.root / symbol.upper() / timeframe + if not path.exists(): + raise FileNotFoundError(f"No data for {symbol} {timeframe}") + frames: list[pd.DataFrame] = [] + for parquet_file in sorted(path.glob("*.parquet")): + frame = pd.read_parquet(parquet_file) + frames.append(frame) + if not frames: + raise RuntimeError("No partitions found") + data = pd.concat(frames).sort_index() + if start: + data = data[data.index >= start] + if end: + data = data[data.index <= end] + return data + + def catalog(self, symbol: str, timeframe: str) -> dict[str, list[str]]: + """Return the stored partition catalog for the instrument.""" + + path = self.root / symbol.upper() / timeframe / "catalog.json" + if not path.exists(): + raise FileNotFoundError("Catalog missing; run ingest first") + data = json.loads(path.read_text(encoding="utf-8")) + data["partitions"] = list(data.get("partitions", [])) + return data + + +def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Toptek Data Bank CLI") + parser.add_argument("command", choices=["ingest"]) + parser.add_argument("--symbol", required=True) + parser.add_argument("--timeframe", required=True) + parser.add_argument("--days", type=int, default=365) + parser.add_argument("--root", default="data/bank") + return parser.parse_args(argv) + + +def main(argv: Sequence[str] | None = None) -> int: + args = _parse_args(argv) + bank = Bank(Path(args.root)) + if args.command == "ingest": + bank.ingest(args.symbol, args.timeframe, days=args.days) + return 0 + + +if __name__ == "__main__": # pragma: no cover - CLI entrypoint + raise SystemExit(main()) diff --git a/toptek/databank/providers.py b/toptek/databank/providers.py new file mode 100644 index 0000000..e2b6ff6 --- /dev/null +++ b/toptek/databank/providers.py @@ -0,0 +1,153 @@ +"""Market data providers for the offline-first data bank.""" + +from __future__ import annotations + +import hashlib +import math +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Protocol + +import numpy as np +import pandas as pd + +try: # pragma: no cover - optional dependency + import yfinance as yf # type: ignore +except ModuleNotFoundError: # pragma: no cover - optional dependency + yf = None # type: ignore + + +class BarsProvider(Protocol): + """Protocol defining a market data source.""" + + def fetch( + self, + symbol: str, + timeframe: str, + *, + start: datetime, + end: datetime, + ) -> pd.DataFrame: + """Retrieve OHLCV bars for *symbol* between *start* and *end* inclusive.""" + + +def _timeframe_to_timedelta(timeframe: str) -> timedelta: + unit = timeframe[-1].lower() + try: + value = int(timeframe[:-1]) + except ValueError as exc: # pragma: no cover - defensive branch + raise ValueError(f"Invalid timeframe: {timeframe}") from exc + if value <= 0: + raise ValueError(f"Timeframe must be positive: {timeframe}") + mapping = { + "s": timedelta(seconds=value), + "m": timedelta(minutes=value), + "h": timedelta(hours=value), + "d": timedelta(days=value), + } + if unit not in mapping: + raise ValueError(f"Unsupported timeframe unit: {timeframe}") + return mapping[unit] + + +@dataclass(slots=True) +class SyntheticBars: + """Deterministic geometric Brownian motion provider for offline workflows.""" + + seed: int | None = None + + def fetch( + self, + symbol: str, + timeframe: str, + *, + start: datetime, + end: datetime, + ) -> pd.DataFrame: + step = _timeframe_to_timedelta(timeframe) + if start.tzinfo is None: + start = start.replace(tzinfo=timezone.utc) + if end.tzinfo is None: + end = end.replace(tzinfo=timezone.utc) + if end < start: + raise ValueError("End must be after start") + index = pd.date_range(start=start, end=end, freq=step) + if index.empty: + index = pd.DatetimeIndex([start, end]) + base_seed = self.seed + if base_seed is None: + digest = hashlib.sha256(f"{symbol}:{timeframe}".encode("utf-8")).digest() + base_seed = int.from_bytes(digest[:8], "big") + rng = np.random.default_rng(base_seed) + drift = 0.0002 + volatility = 0.01 + prices = np.empty(len(index), dtype=np.float64) + prices[0] = 100.0 + (hash(symbol) % 100) * 0.1 + for i in range(1, len(index)): + shock = rng.normal(drift, volatility) + prices[i] = prices[i - 1] * math.exp(shock) + highs = prices * (1 + rng.uniform(0.0005, 0.002, size=len(index))) + lows = prices * (1 - rng.uniform(0.0005, 0.002, size=len(index))) + opens = np.concatenate(([prices[0]], prices[:-1])) + volumes = rng.integers(1_000, 10_000, size=len(index)) + df = pd.DataFrame( + { + "open": opens, + "high": np.maximum.reduce([opens, highs, prices]), + "low": np.minimum.reduce([opens, lows, prices]), + "close": prices, + "volume": volumes.astype(np.int64), + }, + index=index, + ) + df.index.name = "timestamp" + return df + + +@dataclass(slots=True) +class YFinanceBars: + """Yahoo Finance powered provider with graceful fallback when unavailable.""" + + auto_adjust: bool = True + + def __post_init__(self) -> None: + if yf is None: # pragma: no cover - runtime guard + raise ImportError("yfinance is not installed; SyntheticBars is the default") + + def fetch( + self, + symbol: str, + timeframe: str, + *, + start: datetime, + end: datetime, + ) -> pd.DataFrame: + interval = timeframe + data = yf.download( # type: ignore[attr-defined] + symbol, + start=start, + end=end + timedelta(days=1), + interval=interval, + progress=False, + auto_adjust=self.auto_adjust, + ) + if data.empty: + raise RuntimeError(f"No data returned from yfinance for {symbol}") + data.index = data.index.tz_localize(timezone.utc) + data = data.rename( + columns={ + "Open": "open", + "High": "high", + "Low": "low", + "Close": "close", + "Adj Close": "close", + "Volume": "volume", + } + ) + required = {"open", "high", "low", "close", "volume"} + if not required.issubset(data.columns): + raise RuntimeError("Unexpected columns from yfinance download") + return data[list(required)].sort_index() + + +__all__ = ["BarsProvider", "SyntheticBars", "YFinanceBars"] diff --git a/toptek/monitor/__init__.py b/toptek/monitor/__init__.py new file mode 100644 index 0000000..d898f96 --- /dev/null +++ b/toptek/monitor/__init__.py @@ -0,0 +1,20 @@ +"""Monitoring utilities for drift and latency checks.""" + +from .drift import ( + DriftFeatureReport, + DriftMetric, + DriftReport, + Severity, + compute_drift_report, +) +from .latency import LatencyBadge, build_latency_badge + +__all__ = [ + "DriftFeatureReport", + "DriftMetric", + "DriftReport", + "Severity", + "compute_drift_report", + "LatencyBadge", + "build_latency_badge", +] diff --git a/toptek/monitor/drift.py b/toptek/monitor/drift.py new file mode 100644 index 0000000..1ac9dae --- /dev/null +++ b/toptek/monitor/drift.py @@ -0,0 +1,388 @@ +"""Data drift diagnostics with PSI and KS severity scoring.""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import IntEnum +import math +from collections.abc import Iterable, Mapping, MutableMapping +from typing import TYPE_CHECKING, List, Optional, Sequence, TypeAlias, cast + +try: # pragma: no cover - optional pandas dependency + import pandas as pd # type: ignore +except ModuleNotFoundError: # pragma: no cover - optional pandas dependency + pd = None # type: ignore + +if TYPE_CHECKING: # pragma: no cover + from pandas import DataFrame as _PandasDataFrame +else: # pragma: no cover + _PandasDataFrame = object # type: ignore[misc, assignment] + +ColumnarLike: TypeAlias = Mapping[str, Iterable[object]] | _PandasDataFrame + + +_EPSILON = 1e-9 + + +class Severity(IntEnum): + """Severity tiers for drift and latency monitors.""" + + STABLE = 0 + WATCH = 1 + ALERT = 2 + UNKNOWN = 3 + + @property + def label(self) -> str: + labels = { + Severity.STABLE: "stable", + Severity.WATCH: "watch", + Severity.ALERT: "alert", + Severity.UNKNOWN: "unknown", + } + return labels[self] + + def describe(self) -> str: + descriptions = { + Severity.STABLE: "Distributions align with reference expectations.", + Severity.WATCH: "Shifts detected; monitor the feature closely.", + Severity.ALERT: "Large drift; retraining or investigation required.", + Severity.UNKNOWN: "Insufficient data to evaluate drift.", + } + return descriptions[self] + + +@dataclass(frozen=True) +class DriftMetric: + """Container for drift statistics.""" + + psi: float + ks: float + + +@dataclass(frozen=True) +class DriftFeatureReport: + """Feature-level drift findings.""" + + feature: str + metric: DriftMetric + psi_severity: Severity + ks_severity: Severity + severity: Severity + message: str + + +@dataclass(frozen=True) +class DriftReport: + """Aggregate drift report for a dataset.""" + + features: Mapping[str, DriftFeatureReport] + overall: Severity + summary: str + + +class _Thresholds: + """Severity thresholds for PSI and KS.""" + + PSI = (0.1, 0.25) + KS = (0.1, 0.2) + + +def compute_drift_report( + reference: ColumnarLike, + current: ColumnarLike, + *, + features: Optional[Sequence[str]] = None, + bins: int = 10, +) -> DriftReport: + """Compute PSI/KS drift metrics and severity tiers. + + Parameters + ---------- + reference: + Historical data establishing the baseline distribution. + current: + Recent data to compare against the baseline. + features: + Optional subset of columns to analyse. When omitted, the intersection of + columns present in both datasets is used. + bins: + Number of quantile bins used when estimating the PSI. + + Returns + ------- + DriftReport + Structured report including per-feature and aggregate severities. + """ + + if bins < 2: + raise ValueError("`bins` must be >= 2 to compute PSI.") + + reference_columns = _column_names(reference) + current_columns = _column_names(current) + + if features is None: + features = sorted(set(reference_columns).intersection(current_columns)) + else: + missing = [ + col + for col in features + if col not in reference_columns or col not in current_columns + ] + if missing: + raise KeyError( + f"Requested features not available in both datasets: {missing}" + ) + + if not features: + raise ValueError("No overlapping features to compute drift.") + + per_feature: MutableMapping[str, DriftFeatureReport] = {} + alerts: List[str] = [] + unknown_features: List[str] = [] + + for feature in features: + ref_values = _extract_numeric(reference, feature) + cur_values = _extract_numeric(current, feature) + + if not ref_values or not cur_values: + report = DriftFeatureReport( + feature=feature, + metric=DriftMetric(float("nan"), float("nan")), + psi_severity=Severity.UNKNOWN, + ks_severity=Severity.UNKNOWN, + severity=Severity.UNKNOWN, + message="Insufficient data points to compute drift.", + ) + per_feature[feature] = report + unknown_features.append(feature) + continue + + psi_value = _population_stability_index(ref_values, cur_values, bins=bins) + ks_value = _kolmogorov_smirnov(ref_values, cur_values) + + psi_severity = _severity_from_value(psi_value, _Thresholds.PSI) + ks_severity = _severity_from_value(ks_value, _Thresholds.KS) + severity = max(psi_severity, ks_severity, key=lambda s: s.value) + + if severity == Severity.STABLE: + message = "No material drift detected." + elif severity == Severity.WATCH: + message = "Moderate drift detected; monitor the feature." + else: + message = "Severe drift detected; investigate immediately." + + report = DriftFeatureReport( + feature=feature, + metric=DriftMetric(float(psi_value), float(ks_value)), + psi_severity=psi_severity, + ks_severity=ks_severity, + severity=severity, + message=message, + ) + + per_feature[feature] = report + + if severity in (Severity.WATCH, Severity.ALERT): + alerts.append(f"{feature}: {severity.label}") + + non_unknown = [ + feature_report.severity + for feature_report in per_feature.values() + if feature_report.severity != Severity.UNKNOWN + ] + + overall: Severity + if non_unknown: + overall = max(non_unknown, key=lambda s: s.value) + else: + overall = Severity.UNKNOWN + + if overall == Severity.STABLE: + summary = "All monitored features remain stable." + if unknown_features: + summary += f" ({', '.join(unknown_features)} missing data)" + elif overall == Severity.UNKNOWN: + summary = "One or more features lacked data for drift assessment." + else: + summary = "; ".join(alerts) + if unknown_features: + summary += f"; {', '.join(unknown_features)} missing data" + + return DriftReport(features=dict(per_feature), overall=overall, summary=summary) + + +def _column_names(data: ColumnarLike) -> List[str]: + if pd is not None and isinstance(data, pd.DataFrame): # type: ignore[arg-type] + return [str(column) for column in data.columns] + if isinstance(data, Mapping): + mapping_data = cast(Mapping[str, Iterable[object]], data) + return [str(column) for column in mapping_data.keys()] + raise TypeError("Unsupported container type for drift computation.") + + +def _extract_numeric(data: ColumnarLike, feature: str) -> List[float]: + if pd is not None and isinstance(data, pd.DataFrame): # type: ignore[arg-type] + series = pd.to_numeric(data[feature], errors="coerce").dropna() + return [float(value) for value in series.tolist()] + if isinstance(data, Mapping): + mapping_data = cast(Mapping[str, Iterable[object]], data) + if feature not in mapping_data: + raise KeyError(f"Column '{feature}' not present in dataset.") + column = mapping_data[feature] + else: + raise TypeError("Unsupported container type for drift computation.") + + return _coerce_to_list(column) + + +def _coerce_to_list(values: Iterable[object]) -> List[float]: + cleaned: List[float] = [] + for value in values: + try: + number = float(value) # type: ignore[arg-type] + except (TypeError, ValueError): + continue + if not math.isfinite(number): + continue + cleaned.append(number) + + return cleaned + + +def _population_stability_index( + reference: List[float], current: List[float], *, bins: int +) -> float: + """Population Stability Index with quantile binning.""" + + if not reference or not current: + return float("nan") + + reference_sorted = sorted(reference) + current_sorted = sorted(current) + + quantiles = [index / bins for index in range(bins + 1)] + edges = [_quantile(reference_sorted, q) for q in quantiles] + edges = _unique_sorted(edges) + + if len(edges) == 1: + midpoint = edges[0] + spread = max( + max((abs(value - midpoint) for value in reference_sorted), default=0.0), + max((abs(value - midpoint) for value in current_sorted), default=0.0), + ) + if spread == 0: + return 0.0 + edges = [midpoint - spread, midpoint + spread] + + low_bound = min(min(reference_sorted), min(current_sorted)) - _EPSILON + high_bound = max(max(reference_sorted), max(current_sorted)) + _EPSILON + edges[0] = min(edges[0], low_bound) + edges[-1] = max(edges[-1], high_bound) + + ref_hist = _histogram(reference_sorted, edges) + cur_hist = _histogram(current_sorted, edges) + + ref_total = sum(ref_hist) + cur_total = sum(cur_hist) + + if ref_total == 0 or cur_total == 0: + return float("nan") + + psi = 0.0 + for ref_count, cur_count in zip(ref_hist, cur_hist): + ref_ratio = max(ref_count / ref_total, _EPSILON) + cur_ratio = max(cur_count / cur_total, _EPSILON) + psi += (cur_ratio - ref_ratio) * math.log(cur_ratio / ref_ratio) + + return psi + + +def _kolmogorov_smirnov(reference: List[float], current: List[float]) -> float: + """Two-sample Kolmogorov-Smirnov statistic.""" + + if not reference or not current: + return float("nan") + + reference_sorted = sorted(reference) + current_sorted = sorted(current) + + n_ref = len(reference_sorted) + n_cur = len(current_sorted) + ref_index = 0 + cur_index = 0 + max_diff = 0.0 + + for value in _unique_sorted(reference_sorted + current_sorted): + while ref_index < n_ref and reference_sorted[ref_index] <= value: + ref_index += 1 + while cur_index < n_cur and current_sorted[cur_index] <= value: + cur_index += 1 + ref_cdf = ref_index / n_ref + cur_cdf = cur_index / n_cur + diff = abs(ref_cdf - cur_cdf) + if diff > max_diff: + max_diff = diff + + return max_diff + + +def _quantile(sorted_values: List[float], quantile: float) -> float: + if not sorted_values: + raise ValueError("Cannot compute quantile of empty data.") + if quantile <= 0: + return sorted_values[0] + if quantile >= 1: + return sorted_values[-1] + + position = quantile * (len(sorted_values) - 1) + lower_index = math.floor(position) + upper_index = math.ceil(position) + + if lower_index == upper_index: + return sorted_values[int(position)] + + lower_value = sorted_values[lower_index] + upper_value = sorted_values[upper_index] + weight = position - lower_index + return lower_value + (upper_value - lower_value) * weight + + +def _histogram(values: List[float], edges: List[float]) -> List[int]: + counts = [0 for _ in range(len(edges) - 1)] + for value in values: + for index in range(len(counts)): + left = edges[index] + right = edges[index + 1] + if index == len(counts) - 1: + if left <= value <= right: + counts[index] += 1 + break + elif left <= value < right: + counts[index] += 1 + break + return counts + + +def _unique_sorted(values: Iterable[float]) -> List[float]: + unique: List[float] = [] + for value in sorted(values): + if not unique or not math.isclose( + value, unique[-1], rel_tol=0.0, abs_tol=_EPSILON + ): + unique.append(value) + return unique + + +def _severity_from_value(value: float, thresholds: Sequence[float]) -> Severity: + """Map a metric value to a severity tier.""" + + if value is None or math.isnan(value): + return Severity.UNKNOWN + + low, high = thresholds + if value < low: + return Severity.STABLE + if value < high: + return Severity.WATCH + return Severity.ALERT diff --git a/toptek/monitor/latency.py b/toptek/monitor/latency.py new file mode 100644 index 0000000..556307d --- /dev/null +++ b/toptek/monitor/latency.py @@ -0,0 +1,93 @@ +"""Latency utilities for rendering last-bar freshness badges.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Optional + +from .drift import Severity + + +@dataclass(frozen=True) +class LatencyBadge: + """Badge metadata describing feed latency for UI surfaces.""" + + severity: Severity + latency_seconds: float + label: str + message: str + + +def build_latency_badge( + last_bar_timestamp: Optional[datetime], + *, + now: Optional[datetime] = None, + warning_threshold: float = 30.0, + alert_threshold: float = 90.0, +) -> LatencyBadge: + """Compute a latency badge for the latest market bar. + + Parameters + ---------- + last_bar_timestamp: + Timestamp of the last ingested bar. When ``None`` the badge will fall + back to ``UNKNOWN`` severity. + now: + Optional override for the current timestamp. Defaults to + ``datetime.now(timezone.utc)`` to keep the computation deterministic + during testing. + warning_threshold: + Boundary in seconds where the badge escalates to ``WATCH`` severity. + alert_threshold: + Boundary in seconds where the badge escalates to ``ALERT`` severity. + + Returns + ------- + LatencyBadge + Structured badge metadata with severity and copy for the UI layer. + """ + + if warning_threshold <= 0 or alert_threshold <= 0: + raise ValueError("Thresholds must be positive numbers.") + if alert_threshold <= warning_threshold: + raise ValueError("`alert_threshold` must exceed `warning_threshold`.") + + if now is None: + now = datetime.now(timezone.utc) + elif now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + + if last_bar_timestamp is None: + return LatencyBadge( + severity=Severity.UNKNOWN, + latency_seconds=float("nan"), + label="No signal", + message="No bars received yet.", + ) + + if last_bar_timestamp.tzinfo is None: + last_bar_timestamp = last_bar_timestamp.replace(tzinfo=timezone.utc) + + latency_seconds = (now - last_bar_timestamp).total_seconds() + latency_seconds = max(latency_seconds, 0.0) + + if latency_seconds < warning_threshold: + severity = Severity.STABLE + label = "Live" + message = f"Feed healthy ({latency_seconds:.0f}s latency)." + elif latency_seconds < alert_threshold: + severity = Severity.WATCH + label = "Lagging" + message = f"Feed delayed ({latency_seconds:.0f}s latency)." + else: + severity = Severity.ALERT + label = "Stalled" + message = f"Feed stalled ({latency_seconds:.0f}s latency)." + + return LatencyBadge( + severity=severity, + latency_seconds=latency_seconds, + label=label, + message=message, + ) diff --git a/toptek/pipelines/prep_nightly.py b/toptek/pipelines/prep_nightly.py new file mode 100644 index 0000000..2642137 --- /dev/null +++ b/toptek/pipelines/prep_nightly.py @@ -0,0 +1,264 @@ +"""Nightly preparation pipeline orchestrating ingest, training, and reports.""" + +from __future__ import annotations + +import argparse +import json +import math +from dataclasses import dataclass +from datetime import date, datetime, timezone +from pathlib import Path +from typing import Sequence + +import joblib +import matplotlib + +matplotlib.use("Agg") +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from sklearn.calibration import CalibratedClassifierCV +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import brier_score_loss + +from toptek.databank import Bank, SyntheticBars +from toptek.features import build_features +from toptek.monitor import compute_drift_report + + +@dataclass(frozen=True) +class ThresholdPoint: + threshold: float + coverage: float + expected_value: float + + +@dataclass(frozen=True) +class PipelineResult: + daily_brief: Path + threshold_curve: Path + model_dir: Path + metrics: dict[str, object] + + +def _optimise_threshold( + probabilities: np.ndarray, + labels: np.ndarray, + *, + min_coverage: float, + min_ev: float, + grid: Sequence[float] | None = None, +) -> ThresholdPoint: + if probabilities.size != labels.size: + raise ValueError("probabilities and labels must align") + if probabilities.size == 0: + raise ValueError("empty probabilities") + if grid is None: + grid = np.linspace(0.4, 0.9, 26) + best: ThresholdPoint | None = None + for tau in grid: + mask = probabilities >= tau + coverage = float(mask.mean()) + if coverage < min_coverage: + continue + if not np.any(mask): + continue + subset = probabilities[mask] + ev = float(np.mean(2 * subset - 1)) + if ev < min_ev: + continue + candidate = ThresholdPoint(tau, coverage, ev) + if best is None or candidate.expected_value > best.expected_value: + best = candidate + if best is None: + idx = int(np.argmax(probabilities)) + tau = float(probabilities[idx]) + best = ThresholdPoint(tau, float(1 / probabilities.size), float(2 * tau - 1)) + return best + + +def _prepare_training_split( + bundle, + validation_fraction: float = 0.2, +) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + X = bundle.X + y = bundle.y + if validation_fraction <= 0 or validation_fraction >= 1: + raise ValueError("validation_fraction must be in (0, 1)") + n = len(X) + if n < 10: + raise ValueError("Not enough samples to train") + split = int(math.floor(n * (1 - validation_fraction))) + if split <= 1 or split >= n: + raise ValueError("Invalid split size") + return X[:split], X[split:], y[:split], y[split:] + + +def _save_threshold_curve(path: Path, points: Sequence[ThresholdPoint]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + xs = [p.threshold for p in points] + coverage = [p.coverage for p in points] + evs = [p.expected_value for p in points] + fig, ax1 = plt.subplots(figsize=(6, 4)) + ax1.plot(xs, coverage, label="Coverage", color="tab:blue") + ax1.set_ylabel("Coverage", color="tab:blue") + ax2 = ax1.twinx() + ax2.plot(xs, evs, label="EV", color="tab:orange") + ax2.set_ylabel("Expected Value", color="tab:orange") + ax1.set_xlabel("Threshold") + ax1.set_title("Threshold Optimisation Curve") + fig.tight_layout() + fig.savefig(path, dpi=144) + plt.close(fig) + + +def _build_model_card( + *, + version: str, + training_rows: int, + validation_rows: int, + threshold: ThresholdPoint, + brier: float, + drift_overall: str, +) -> dict[str, object]: + return { + "version": version, + "training_rows": training_rows, + "validation_rows": validation_rows, + "threshold": threshold.threshold, + "coverage": threshold.coverage, + "expected_value": threshold.expected_value, + "brier_score": brier, + "drift_status": drift_overall, + "created_at": datetime.now(timezone.utc).isoformat(), + } + + +def run_pipeline( + *, + target_date: date, + symbol: str = "ES", + timeframe: str = "5m", + days: int = 365, + min_coverage: float = 0.1, + min_ev: float = 0.0, + bank_root: Path | str = Path("data/bank"), + reports_root: Path | str = Path("reports"), + models_root: Path | str = Path("models"), +) -> PipelineResult: + bank = Bank(Path(bank_root), provider=SyntheticBars()) + bank.ingest( + symbol, + timeframe, + days=days, + end=datetime.combine(target_date, datetime.max.time(), tzinfo=timezone.utc), + ) + data = bank.read(symbol, timeframe) + bundle = build_features(data) + X_train, X_val, y_train, y_val = _prepare_training_split(bundle) + base = LogisticRegression(max_iter=500, solver="lbfgs") + base.fit(X_train, y_train) + calibrator = CalibratedClassifierCV( + base_estimator=base, method="sigmoid", cv="prefit" + ) + calibrator.fit(X_val, y_val) + proba_val = calibrator.predict_proba(X_val)[:, 1] + threshold = _optimise_threshold( + proba_val, + y_val.astype(float), + min_coverage=min_coverage, + min_ev=min_ev, + ) + grid = [ + ThresholdPoint( + float(t), + float((proba_val >= t).mean()), + ( + float(np.mean(2 * proba_val[proba_val >= t] - 1)) + if np.any(proba_val >= t) + else float("nan") + ), + ) + for t in np.linspace(0.3, 0.9, 25) + ] + curve_path = Path(reports_root) / "threshold_curve.png" + valid_points = [p for p in grid if not math.isnan(p.expected_value)] + if valid_points: + _save_threshold_curve(curve_path, valid_points) + brier = float(brier_score_loss(y_val, proba_val)) + feature_names = bundle.meta["feature_names"] + reference = pd.DataFrame(X_train, columns=feature_names) + current = pd.DataFrame(X_val, columns=feature_names) + drift_report = compute_drift_report(reference, current) + daily_brief = { + "date": target_date.isoformat(), + "symbol": symbol, + "timeframe": timeframe, + "tau": threshold.threshold, + "coverage": threshold.coverage, + "expected_value": threshold.expected_value, + "brier_score": brier, + "drift": drift_report.summary, + } + reports_dir = Path(reports_root) + reports_dir.mkdir(parents=True, exist_ok=True) + brief_path = reports_dir / f"daily_brief_{target_date.strftime('%Y%m%d')}.json" + brief_path.write_text(json.dumps(daily_brief, indent=2), encoding="utf-8") + version = target_date.strftime("%Y%m%d") + model_dir = Path(models_root) / version + model_dir.mkdir(parents=True, exist_ok=True) + joblib.dump(base, model_dir / "model.pkl") + joblib.dump(calibrator, model_dir / "calibrator.pkl") + model_card = _build_model_card( + version=version, + training_rows=len(X_train), + validation_rows=len(X_val), + threshold=threshold, + brier=brier, + drift_overall=drift_report.overall.name, + ) + (model_dir / "model_card.json").write_text( + json.dumps(model_card, indent=2), encoding="utf-8" + ) + return PipelineResult( + daily_brief=brief_path, + threshold_curve=curve_path, + model_dir=model_dir, + metrics=daily_brief, + ) + + +def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Toptek nightly preparation pipeline") + parser.add_argument( + "--date", type=lambda s: datetime.strptime(s, "%Y-%m-%d").date() + ) + parser.add_argument("--symbol", default="ES") + parser.add_argument("--timeframe", default="5m") + parser.add_argument("--days", type=int, default=365) + parser.add_argument("--min-coverage", type=float, default=0.1) + parser.add_argument("--min-ev", type=float, default=0.0) + parser.add_argument("--bank-root", default="data/bank") + parser.add_argument("--reports-root", default="reports") + parser.add_argument("--models-root", default="models") + return parser.parse_args(argv) + + +def main(argv: Sequence[str] | None = None) -> int: + args = _parse_args(argv) + run_pipeline( + target_date=args.date, + symbol=args.symbol, + timeframe=args.timeframe, + days=args.days, + min_coverage=args.min_coverage, + min_ev=args.min_ev, + bank_root=Path(args.bank_root), + reports_root=Path(args.reports_root), + models_root=Path(args.models_root), + ) + return 0 + + +if __name__ == "__main__": # pragma: no cover - CLI entry + raise SystemExit(main()) diff --git a/toptek/rank/__init__.py b/toptek/rank/__init__.py new file mode 100644 index 0000000..42a9b88 --- /dev/null +++ b/toptek/rank/__init__.py @@ -0,0 +1,5 @@ +"""Strategy ranking tools.""" + +from .ranker import RankRequest, RankResult, rank_strategies + +__all__ = ["RankRequest", "RankResult", "rank_strategies"] diff --git a/toptek/rank/ranker.py b/toptek/rank/ranker.py new file mode 100644 index 0000000..9bc4664 --- /dev/null +++ b/toptek/rank/ranker.py @@ -0,0 +1,111 @@ +"""Walk-forward ranking engine enforcing risk constraints.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, List + +import numpy as np +import pandas as pd + + +@dataclass(frozen=True) +class RankRequest: + signals: pd.DataFrame + min_coverage: float = 0.1 + min_ev: float = 0.0 + max_drawdown: float = 0.2 + folds: int = 3 + + +@dataclass(frozen=True) +class StrategyScore: + name: str + hit_rate: float + coverage: float + expected_value: float + max_drawdown: float + + +@dataclass(frozen=True) +class RankResult: + scores: List[StrategyScore] + + def to_json(self, path: Path) -> None: + path.write_text( + pd.DataFrame([s.__dict__ for s in self.scores]).to_json( + orient="records", indent=2 + ), + encoding="utf-8", + ) + + def to_csv(self, path: Path) -> None: + pd.DataFrame([s.__dict__ for s in self.scores]).to_csv(path, index=False) + + +def _max_drawdown(equity: np.ndarray) -> float: + peaks = np.maximum.accumulate(equity) + drawdowns = (peaks - equity) / peaks + return float(np.max(drawdowns)) + + +def _walk_forward_chunks(length: int, folds: int) -> Iterable[tuple[int, int]]: + fold_size = length // folds + for i in range(folds): + start = i * fold_size + end = length if i == folds - 1 else (i + 1) * fold_size + yield start, end + + +def rank_strategies(request: RankRequest) -> RankResult: + if request.folds < 2: + raise ValueError("folds must be >=2") + df = request.signals.copy() + if df.empty: + raise ValueError("signals frame must not be empty") + scores: list[StrategyScore] = [] + for column in df.columns: + series = df[column].astype(float) + pnl: list[float] = [] + hits: list[float] = [] + coverages: list[float] = [] + for start, end in _walk_forward_chunks(len(series), request.folds): + window = series.iloc[start:end] + if window.empty: + continue + decisions = window >= 0.0 + coverage = float(decisions.mean()) + if coverage < request.min_coverage: + continue + returns = window.where(decisions, 0.0) + pnl.append(float(returns.mean())) + hits.append(float((returns > 0).mean())) + coverages.append(coverage) + if not pnl: + continue + equity = np.cumsum(pnl) + max_dd = _max_drawdown(np.array(equity) + 1.0) + hit_rate = float(np.mean(hits)) + coverage = float(np.mean(coverages)) + expected_value = float(np.mean(pnl)) + if ( + coverage < request.min_coverage + or expected_value < request.min_ev + or max_dd > request.max_drawdown + ): + continue + scores.append( + StrategyScore( + name=column, + hit_rate=hit_rate, + coverage=coverage, + expected_value=expected_value, + max_drawdown=max_dd, + ) + ) + scores.sort(key=lambda s: (s.expected_value, s.hit_rate), reverse=True) + return RankResult(scores) + + +__all__ = ["RankRequest", "RankResult", "rank_strategies", "StrategyScore"] diff --git a/toptek/ui/widgets/__init__.py b/toptek/ui/widgets/__init__.py new file mode 100644 index 0000000..5d9d6f5 --- /dev/null +++ b/toptek/ui/widgets/__init__.py @@ -0,0 +1,5 @@ +"""Reusable Tkinter widgets for Toptek dashboards.""" + +from .confidence import ConfidenceRing + +__all__ = ["ConfidenceRing"] diff --git a/toptek/ui/widgets/confidence.py b/toptek/ui/widgets/confidence.py new file mode 100644 index 0000000..6bd8fef --- /dev/null +++ b/toptek/ui/widgets/confidence.py @@ -0,0 +1,113 @@ +"""Tkinter widget visualising probability confidence.""" + +from __future__ import annotations + +import tkinter as tk +from dataclasses import dataclass +from typing import Mapping + + +@dataclass(slots=True) +class ConfidenceTheme: + ring_width: int = 12 + stable_color: str = "#3fd08f" + watch_color: str = "#f0c419" + alert_color: str = "#ff6f61" + background: str = "#1b1f24" + text_color: str = "#e5ecf4" + + +class ConfidenceRing(tk.Frame): + """Ring gauge displaying probability, coverage, and EV.""" + + def __init__( + self, master: tk.Misc | None = None, theme: ConfidenceTheme | None = None + ) -> None: + super().__init__(master, bg=(theme.background if theme else "#1b1f24")) + self._theme = theme or ConfidenceTheme() + self._canvas = tk.Canvas( + self, + width=140, + height=140, + highlightthickness=0, + bg=self._theme.background, + ) + self._canvas.grid(row=0, column=0, padx=8, pady=8) + self._label = tk.Label( + self, + fg=self._theme.text_color, + bg=self._theme.background, + font=("Inter", 12, "bold"), + ) + self._label.grid(row=1, column=0, pady=(0, 4)) + self._chip = tk.Label( + self, + fg=self._theme.text_color, + bg=self._theme.background, + font=("Inter", 10), + ) + self._chip.grid(row=2, column=0, pady=(0, 6)) + self._metric = tk.Label( + self, + fg=self._theme.text_color, + bg=self._theme.background, + font=("Inter", 9), + ) + self._metric.grid(row=3, column=0, pady=(0, 6)) + self._draw_ring(0.5) + + def _draw_ring(self, probability: float) -> None: + radius = 120 + start_angle = 90 + extent = probability * 360 + arc_color = self._severity_color(probability) + self._canvas.delete("all") + self._canvas.create_oval( + 10, + 10, + radius, + radius, + outline="#2c3238", + width=self._theme.ring_width, + ) + self._canvas.create_arc( + 10, + 10, + radius, + radius, + start=start_angle, + extent=-extent, + style=tk.ARC, + outline=arc_color, + width=self._theme.ring_width, + capstyle=tk.ROUND, + ) + self._canvas.create_text( + radius / 2, + radius / 2, + text=f"{probability*100:.1f}%", + fill=self._theme.text_color, + font=("Inter", 16, "bold"), + ) + + def _severity_color(self, probability: float) -> str: + if probability >= 0.6: + return self._theme.stable_color + if probability >= 0.5: + return self._theme.watch_color + return self._theme.alert_color + + def update_from_payload(self, payload: Mapping[str, float]) -> None: + probability = float(payload.get("p", payload.get("probability", 0.5))) + coverage = float(payload.get("coverage", 0.0)) + ev = float(payload.get("ev", payload.get("expected_value", 0.0))) + confidence = float( + payload.get("conf", payload.get("confidence", abs(probability - 0.5) * 2)) + ) + self._draw_ring(probability) + self._label.configure(text=f"Confidence {confidence*100:.0f}%") + self._chip.configure(text=f"EV {ev:.3f}") + self._metric.configure(text=f"Coverage {coverage*100:.1f}%") + + +__all__ = ["ConfidenceTheme", "ConfidenceRing"]