In [1]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Tuple

import numpy as np
import pandas as pd


@dataclass
class ASMMCalibrationResult:
    """
    Результат калибровки параметров для ASMM на одном окне.

    Единицы:
      * время — секунды,
      * цены — как mid,
      * объёмы — как trades.qty.
    """
    sigma: float           # волатильность mid в корень(сек)
    alpha: float           # степенной показатель на сегменте [q0, q1]
    k_impact: float        # чувствительность impact к ln(q)
    kappa: float           # alpha / k_impact – коэффициент в exp(-kappa * delta)
    q0: float              # нижний порог сегмента
    q1: float              # верхний порог сегмента
    window_seconds: float  # фактическая длина окна в секундах
    n_trades_segment: int  # число сделок в сегменте


class ASMMCalibrator:
    """
    Лёгкий калибратор "в духе" MarketState, но только для сегмента [q0, q1]
    и коротких окон (real-time).

    Ожидаемые входы:
      trades: DataFrame с колонками:
        - time (pd.Timestamp, tz-aware),
        - price (float),
        - qty (float).
      book: DataFrame с колонками:
        - book_ts (pd.Timestamp, tz-aware),
        - mid (float).

    Алгоритм:
      1) Берём окно по времени (последние window_seconds).
      2) Мёрджим trades + book по asof (как в MarketState).
      3) На сегменте q0 ≤ qty ≤ q1:
           * alpha = 1 + N / sum(log(v / q0))
           * k_impact из регрессии delta = c + k_impact * ln(q) + eps
      4) sigma – std(log(mid_t / mid_{t-1})) по mid, ресэмпленным в 1s.
    """

    def __init__(
        self,
        window_seconds: int = 900,
        min_trades_segment: int = 200,
        book_resample_rule: str = "200ms",
        verbose: bool = True,
    ):
        self.window_seconds = int(window_seconds)
        self.min_trades_segment = int(min_trades_segment)
        self.book_resample_rule = book_resample_rule
        self.verbose = verbose

    # ----- простенький логгер -----
    def _log(self, msg: str):
        if self.verbose:
            print(msg)

    # ----- merge в произвольном окне -----
    def _merge_window(
        self,
        trades: pd.DataFrame,
        book: pd.DataFrame,
    ) -> tuple[Optional[pd.DataFrame], float]:
        """
        Берём последнее время по trades, режем окно [t_end - window; t_end],
        мёрджим как в твоём MarketState и добавляем delta = |price - mid|.
        Возвращает (merged, T_window_sec).
        """
        if trades is None or trades.empty:
            self._log("❌ calibrator: trades пуст.")
            return None, 0.0
        if book is None or book.empty:
            self._log("❌ calibrator: book пуст.")
            return None, 0.0

        t_end = trades["time"].max()
        t_start = t_end - pd.Timedelta(seconds=self.window_seconds)

        trades_w = trades[(trades["time"] >= t_start) & (trades["time"] <= t_end)].copy()
        book_w = book[(book["book_ts"] >= t_start) & (book["book_ts"] <= t_end)].copy()

        if trades_w.empty or book_w.empty:
            self._log("❌ calibrator: в окне нет ни trades, ни book.")
            return None, 0.0

        # ресэмплим book, как в MarketState
        book_res = (
            book_w.set_index("book_ts")
            .resample(self.book_resample_rule)
            .median()
            .dropna()
            .reset_index()
        )
        if book_res.empty:
            self._log("❌ calibrator: после ресэмплинга book пуст.")
            return None, 0.0

        trades_w = trades_w.sort_values("time")
        book_res = book_res.sort_values("book_ts")

        merged = pd.merge_asof(
            trades_w,
            book_res,
            left_on="time",
            right_on="book_ts",
            direction="backward",
        ).dropna(subset=["mid", "price", "qty"])

        if merged.empty:
            self._log("❌ calibrator: merged пуст.")
            return None, 0.0

        merged["delta"] = (merged["price"] - merged["mid"]).abs()

        t_min = merged["time"].min()
        t_max = merged["time"].max()
        T = (t_max - t_min).total_seconds()
        if T <= 0:
            self._log("❌ calibrator: T_window ≤ 0.")
            return None, 0.0

        return merged, float(T)

    # ----- alpha на сегменте [q0, q1] -----
    def _estimate_alpha_segment(
        self,
        volumes: np.ndarray,
        q0: float,
        q1: float,
    ) -> tuple[float, int]:
        v = np.asarray(volumes, dtype=float)
        v = v[np.isfinite(v)]
        v = v[(v >= q0) & (v <= q1)]

        n = len(v)
        if n < self.min_trades_segment:
            return float("nan"), n

        # ММ-оценка степенного хвоста при пороге q0
        alpha = 1.0 + n / np.sum(np.log(v / q0))
        return float(alpha), n

    # ----- k_impact на том же сегменте -----
    def _estimate_k_impact_segment(
        self,
        deltas: np.ndarray,
        volumes: np.ndarray,
        q0: float,
        q1: float,
    ) -> float:
        d = np.asarray(deltas, dtype=float)
        v = np.asarray(volumes, dtype=float)

        mask = (
            np.isfinite(d)
            & np.isfinite(v)
            & (v > 0)
            & (d >= 0)
            & (v >= q0)
            & (v <= q1)
        )
        d = d[mask]
        v = v[mask]

        if len(d) < self.min_trades_segment:
            return float("nan")

        x = np.log(v)
        y = d

        # линейная регрессия y = a + b x
        b, a = np.polyfit(x, y, 1)
        return float(b)

    # ----- sigma по mid из book -----
    def _estimate_sigma(
        self,
        book: pd.DataFrame,
    ) -> float:
        """
        Берём mid по book, ресэмплим в 1s, считаем лог-доходности и их std.
        Это даёт sigma в корень(сек).
        """
        if book is None or book.empty:
            return float("nan")

        s = (
            book.set_index("book_ts")["mid"]
            .resample("1s")
            .last()
            .dropna()
        )
        if len(s) < 10:
            return float("nan")

        log_s = np.log(s.astype(float))
        rets = log_s.diff().dropna()
        if len(rets) < 5:
            return float("nan")

        sigma = float(rets.std())
        return sigma

    # ----- публичный метод -----
    def calibrate(
        self,
        trades: pd.DataFrame,
        book: pd.DataFrame,
        q0: float,
        q1: float,
    ) -> Optional[ASMMCalibrationResult]:
        """
        Главное API: на вход DataFrame'ы из Hummingbot за "недавнее прошлое",
        на выход – ASMMCalibrationResult или None, если данных мало.
        """
        if not (q0 > 0 and q1 > q0):
            self._log("❌ calibrator: некорректные q0/q1.")
            return None

        merged, T = self._merge_window(trades, book)
        if merged is None or T <= 0:
            return None

        # sigma по всему окну (без фильтра по q)
        sigma = self._estimate_sigma(book)
        if not np.isfinite(sigma):
            self._log("⚠️ calibrator: sigma не удалось оценить.")
            sigma = float("nan")

        volumes = merged["qty"].values
        deltas = merged["delta"].values

        # alpha и k_impact только на сегменте [q0, q1]
        alpha, n_seg = self._estimate_alpha_segment(volumes, q0=q0, q1=q1)
        if not np.isfinite(alpha):
            self._log(
                f"⚠️ calibrator: alpha не удалось оценить (n_seg={n_seg})."
            )
            return None

        k_impact = self._estimate_k_impact_segment(deltas, volumes, q0=q0, q1=q1)
        if not np.isfinite(k_impact) or k_impact <= 0:
            self._log("⚠️ calibrator: k_impact не удалось оценить или он ≤ 0.")
            return None

        kappa = alpha / k_impact

        self._log(
            f"✅ calibrator: окно ~{T:.1f} сек, n_seg={n_seg}, "
            f"sigma={sigma:.3g}, alpha={alpha:.3g}, "
            f"k_impact={k_impact:.3g}, kappa={kappa:.3g}"
        )

        return ASMMCalibrationResult(
            sigma=sigma,
            alpha=alpha,
            k_impact=k_impact,
            kappa=kappa,
            q0=float(q0),
            q1=float(q1),
            window_seconds=T,
            n_trades_segment=n_seg,
        )
