In [10]:
# sql.py and config.py
import os
from datetime import UTC, datetime
from functools import lru_cache
from typing import TYPE_CHECKING, Final
from collections.abc import Mapping
import pandas as pd
from pandas import DataFrame
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, Connection

if TYPE_CHECKING:  # import only for type checking
    from redis import Redis  # pragma: no cover

# SQLAlchemy connection URL; the DB_URL environment variable overrides the
# built-in default below.
# NOTE(review): the fallback embeds a username, password and host address in
# source — consider requiring DB_URL to be set rather than shipping credentials.
DB_URL: Final[str] = os.getenv(
    "DB_URL",
    "mysql+mysqlconnector://247team:password@192.168.50.238:3306/trades",
)

# Schema and column names used when querying per-account balance snapshot
# tables (see nearest_balance_on_or_before).
BALANCE_SCHEMA = "balance"
BALANCE_TIME_COLUMN = "datetime"
BALANCE_VALUE_COLUMN = "overall_balance"

@lru_cache(maxsize=1)
def get_engine() -> Engine:
    """Build the SQLAlchemy Engine for DB_URL once and reuse it afterwards.

    The single-slot lru_cache means every call after the first returns the
    same Engine object.
    """
    # Ping connections before use and recycle them after an hour.
    pool_options = {"pool_pre_ping": True, "pool_recycle": 3600}
    return create_engine(DB_URL, **pool_options)

def _sql_to_df(
    conn: Connection, stmt: str, params: Mapping[str, object] | None = None
) -> DataFrame:
    """Run *stmt* on *conn* and return the full result set as a DataFrame.

    *stmt* is wrapped in ``text()`` so ``:name`` placeholders are bound from
    *params* (an empty mapping is used when *params* is None or empty). The
    result cursor is always closed, even if fetching rows fails.
    """
    result = conn.execute(text(stmt), params or {})
    try:
        column_names = list(result.keys())
        records = [dict(row) for row in result.mappings().all()]
    finally:
        result.close()
    # Passing the column list keeps the header even when no rows came back.
    return pd.DataFrame(records, columns=column_names)

def _coerce_float(x: object) -> float:
    """Robust scalar→float conversion that keeps type checkers quiet."""
    if isinstance(x, int | float):
        return float(x)
    try:
        s = pd.Series([x])
        v = pd.to_numeric(s, errors="coerce").iloc[0]
        return float(v) if pd.notna(v) else 0.0
    except Exception:
        return 0.0


def _coerce_ts(x: object, *, default: pd.Timestamp) -> pd.Timestamp:
    """Coerce a single value to pd.Timestamp; fallback to provided default.

    Uses a Series-based path to satisfy pandas-stubs overloads for to_datetime.
    """
    try:
        s = pd.Series([x])
        ts_series = pd.to_datetime(s, errors="coerce")
        ts0 = ts_series.iloc[0]
        if isinstance(ts0, pd.Timestamp) and not pd.isna(ts0):
            return ts0
    except Exception:
        pass
    return default

def nearest_balance_on_or_before(
    account: str, start_ts: pd.Timestamp
) -> tuple[float, pd.Timestamp]:
    """Look up the balance snapshot closest to, but not after, *start_ts*.

    Queries the ``<account>_balance`` table in BALANCE_SCHEMA. If no snapshot
    exists at or before *start_ts*, the earliest snapshot in the table is used
    instead; if the table has no rows at all, ``(0.0, start_ts)`` is returned.

    Note: *account* is interpolated into the table identifier, so it must be a
    trusted name.

    Returns:
        ``(balance, snapshot_timestamp)``; the timestamp falls back to
        *start_ts* when the stored value cannot be parsed.
    """
    select_from = (
        f"SELECT `{BALANCE_TIME_COLUMN}` AS ts, `{BALANCE_VALUE_COLUMN}` AS bal "
        f"FROM `{BALANCE_SCHEMA}`.`{account}_balance` "
    )
    latest_before_sql = (
        select_from
        + f"WHERE `{BALANCE_TIME_COLUMN}` <= :start "
        + f"ORDER BY `{BALANCE_TIME_COLUMN}` DESC LIMIT 1"
    )
    earliest_sql = select_from + f"ORDER BY `{BALANCE_TIME_COLUMN}` ASC LIMIT 1"

    with get_engine().connect() as conn:
        snapshot = _sql_to_df(
            conn, latest_before_sql, {"start": f"{start_ts:%Y-%m-%d %H:%M:%S}"}
        )
        if snapshot.empty:
            # Nothing at or before the cutoff: fall back to the first snapshot.
            snapshot = _sql_to_df(conn, earliest_sql)
        if snapshot.empty:
            return 0.0, start_ts
        balance = _coerce_float(snapshot.at[0, "bal"])
        stamp = _coerce_ts(snapshot.at[0, "ts"], default=start_ts)
        return balance, stamp

def read_trades(account: str, start_dt: str, end_dt: str) -> DataFrame:
    """Load trades for *account* whose ``time`` lies in [start_dt, end_dt].

    The result is indexed by ``time`` (sorted ascending, rows with an
    unparseable time dropped) and its ``realizedPnl`` column is replaced by
    realized PnL minus commission; non-numeric values count as 0. When the
    query returns nothing, an empty frame with columns ``symbol`` and
    ``realizedPnl`` and an empty ``time`` DatetimeIndex is returned.

    Note: *account* is interpolated into the table identifier, so it must be a
    trusted name.
    """
    query = (
        "SELECT symbol, id, orderId, side, price, qty, realizedPnl, commission, time, positionSide "
        f"FROM `{account}` "
        "WHERE time >= :start AND time <= :end"
    )
    bounds = {"start": start_dt, "end": end_dt}
    with get_engine().connect() as conn:
        fetched = _sql_to_df(conn, query, bounds)

    if fetched.empty:
        placeholder = DataFrame(columns=["symbol", "realizedPnl"])
        return placeholder.set_index(pd.DatetimeIndex([], name="time"))

    fetched["time"] = pd.to_datetime(fetched["time"], errors="coerce")
    trades = fetched.dropna(subset=["time"]).set_index("time").sort_index()
    gross = pd.to_numeric(trades["realizedPnl"], errors="coerce").fillna(0.0)
    commission = pd.to_numeric(trades["commission"], errors="coerce").fillna(0.0)
    trades["realizedPnl"] = gross - commission
    return trades


def read_transactions(account: str, start_dt: str, end_dt: str) -> DataFrame:
    """Load transaction-history rows for *account* within [start_dt, end_dt].

    Reads ``incomeType``, ``income`` and ``time`` from
    ``transaction_history.<account>_transaction``. The result is indexed by
    ``time`` (sorted, unparseable times dropped); ``income`` is numeric with
    non-numeric values set to 0 and ``incomeType`` is cast to ``str``. An empty
    query result yields an empty frame with those two columns and an empty
    ``time`` DatetimeIndex.

    Note: *account* is interpolated into the table identifier, so it must be a
    trusted name.
    """
    query = (
        "SELECT incomeType, income, time "
        f"FROM `transaction_history`.`{account}_transaction` "
        "WHERE time >= :start AND time <= :end"
    )
    bounds = {"start": start_dt, "end": end_dt}
    with get_engine().connect() as conn:
        fetched = _sql_to_df(conn, query, bounds)

    if fetched.empty:
        placeholder = DataFrame(columns=["incomeType", "income"])
        return placeholder.set_index(pd.DatetimeIndex([], name="time"))

    fetched["time"] = pd.to_datetime(fetched["time"], errors="coerce")
    txns = fetched.dropna(subset=["time"]).set_index("time").sort_index()
    txns["income"] = pd.to_numeric(txns["income"], errors="coerce").fillna(0.0)
    txns["incomeType"] = txns["incomeType"].astype(str)
    return txns


def read_earnings(account: str, start_dt: str, end_dt: str) -> DataFrame:
    """Load earnings rows for *account* within [start_dt, end_dt].

    Reads ``rewards`` and ``time`` from ``earnings.<account>_earnings``. The
    result is indexed by ``time`` (sorted, unparseable times dropped) with
    ``rewards`` numeric and non-numeric values set to 0. An empty query result
    yields an empty ``rewards`` frame with an empty ``time`` DatetimeIndex.

    Note: *account* is interpolated into the table identifier, so it must be a
    trusted name.
    """
    query = (
        "SELECT rewards, time "
        f"FROM `earnings`.`{account}_earnings` "
        "WHERE time >= :start AND time <= :end"
    )
    bounds = {"start": start_dt, "end": end_dt}
    with get_engine().connect() as conn:
        fetched = _sql_to_df(conn, query, bounds)

    if fetched.empty:
        placeholder = DataFrame(columns=["rewards"])
        return placeholder.set_index(pd.DatetimeIndex([], name="time"))

    fetched["time"] = pd.to_datetime(fetched["time"], errors="coerce")
    earnings = fetched.dropna(subset=["time"]).set_index("time").sort_index()
    earnings["rewards"] = pd.to_numeric(earnings["rewards"], errors="coerce").fillna(0.0)
    return earnings

In [11]:
# equity.py
import pandas as pd
from pandas import Series

def _daily_pnl(trades: DataFrame, txn: DataFrame, earn: DataFrame) -> Series:
    """Sum daily PnL from trades (net), funding fee, earnings. Excludes transfers."""
    parts: list[Series] = []
    if not trades.empty:
        parts.append(trades["realizedPnl"])
    if not txn.empty:
        it = txn["incomeType"].astype(str).str.upper()
        if it.eq("FUNDING_FEE").any():
            parts.append(txn.loc[it.eq("FUNDING_FEE"), "income"])
        # Exclude TRANSFER by design
    if not earn.empty:
        parts.append(earn["rewards"])
    if not parts:
        return pd.Series(dtype="float64")
    s = pd.concat(parts).sort_index()
    s.index = pd.DatetimeIndex(s.index)
    return s.resample("D").sum()

def build_fixed_balances(
    accounts: list[str],
    start_day: pd.Timestamp,
    end_day: pd.Timestamp,
) -> tuple[pd.DataFrame, dict[str, float]]:
    """Build one cumulative realized-PnL column per account on a daily index.

    The index runs from ``start_day`` to ``end_day`` (both normalized to
    midnight, inclusive). For every account the trades, transactions and
    earnings between ``start_day 00:00:00`` and ``end_day 23:59:59`` are read,
    summed per day, and accumulated; days without data contribute 0. No
    initial balance is added here, so the values are deltas.

    Returns:
        ``(delta_frame, init_map)`` where ``init_map`` is always an empty dict
        (kept only so the signature matches other builders).
    """
    calendar = pd.date_range(start_day.normalize(), end_day.normalize(), freq="D")
    window_open = f"{start_day.date()} 00:00:00"
    window_close = f"{end_day.date()} 23:59:59"

    curves: list[pd.Series] = []
    for account in accounts:
        daily = _daily_pnl(
            read_trades(account, window_open, window_close),
            read_transactions(account, window_open, window_close),
            read_earnings(account, window_open, window_close),
        )
        if daily.empty:
            curve = pd.Series(0.0, index=calendar, name=account, dtype="float64")
        else:
            curve = daily.reindex(calendar).fillna(0.0).cumsum()
        curves.append(curve.rename(account))

    init_map: dict[str, float] = {}  # initial balances are supplied by the caller
    if not curves:
        return pd.DataFrame(index=calendar, columns=accounts, dtype="float64"), init_map
    return pd.concat(curves, axis=1), init_map

In [12]:
# returns.py
def mtd_return(balance: DataFrame) -> dict[str, float]:
    """Compute the simple return of each column over the latest month present.

    The latest (year, month) found in the index selects the rows; the return
    is ``(last - first) / first`` using the first and last of those rows in
    their existing order. A first value of exactly 0 yields a return of 0.0.
    An empty frame yields an empty dict.
    """
    if balance.empty:
        return {}

    stamps = pd.DatetimeIndex(balance.index)
    # Encode year and month as a single sortable integer, e.g. 202402.
    period_key = stamps.year * 100 + stamps.month
    window = balance.loc[period_key == int(period_key.max())]
    if window.empty:
        return {}

    opening, closing = window.iloc[0], window.iloc[-1]
    returns: dict[str, float] = {}
    for name in window.columns:
        base = float(opening[name])
        final = float(closing[name])
        if base != 0.0:
            returns[str(name)] = (final - base) / base
        else:
            returns[str(name)] = 0.0
    return returns

In [13]:
# performance_metrics.py
from typing import Sequence
import pandas as pd
from zoneinfo import ZoneInfo

def _mtd_window_today() -> tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp]:
    """Return (start_of_month_local, now_local_floored_to_hour, yesterday_local_normalized).

    Anchored to Europe/Zurich. Returned timestamps are naive (tz removed) but
    aligned to local time to match the CLI's intraday, hourly-anchored behavior.
    """
    tz = ZoneInfo("Europe/Zurich")
    now_local = pd.Timestamp.now(tz=tz).floor("h")
    start_local = now_local.normalize().replace(day=1)
    yesterday_local = now_local.normalize() - pd.Timedelta(days=1)
    return (
        start_local.tz_convert(None),
        now_local.tz_convert(None),
        yesterday_local.tz_convert(None),
    )

def _offset_fixed_with_initial(
    fixed_delta: pd.DataFrame,
    init_map: dict[str, float],
    accounts: list[str],
) -> pd.DataFrame:
    """Turn per-period deltas into realized equity by adding SQL initial balances."""
    if fixed_delta.empty:
        return fixed_delta
    fixed = fixed_delta.copy()
    for a in accounts:
        if a in fixed.columns:
            fixed[a] = fixed[a] + float(init_map.get(a, 0.0))
    return fixed

def build_metrics_payload(accounts: Sequence[str]) -> dict[str, object]:
    """Compute month-to-date realized returns for *accounts* and return them.

    Account names are stripped, lower-cased, and de-duplicated (first
    occurrence wins); blank entries are ignored. For each account the initial
    balance is the nearest SQL snapshot at or before the start of the current
    month; daily cumulative realized PnL is added on top, a ``total`` column
    (sum over accounts) is appended, and the MTD simple return of every column
    is computed.

    Args:
        accounts: Account names to include.

    Returns:
        ``{"mtd_ret_realized": {<account>: return, ..., "total": return}}``;
        the inner mapping is empty when there is no data. The same mapping is
        also printed.
    """
    # dict.fromkeys de-duplicates while keeping order; duplicate names would
    # otherwise create duplicate columns downstream.
    accs = list(dict.fromkeys(a.strip().lower() for a in accounts if a.strip()))
    start_day, today, _yesterday = _mtd_window_today()

    # Initial balances (SQL only)
    init_map: dict[str, float] = {}
    for a in accs:
        bal, _ts = nearest_balance_on_or_before(a, start_day)
        init_map[a] = float(bal)

    # Realized equity (SQL deltas + SQL initial), daily rows through today
    fixed_delta, _ = build_fixed_balances(accs, start_day, today)
    fixed = _offset_fixed_with_initial(fixed_delta, init_map, accs)

    # Returns block: series-level MTD% computed on balance levels
    mtd_ret_realized: dict[str, float] = {}
    if not fixed.empty:
        fixed_total = fixed.assign(total=fixed[accs].sum(axis=1))
        mtd_ret_realized = mtd_return(fixed_total)
    print(f"mtd_ret_realized: {mtd_ret_realized}")

    # The signature promises a dict payload; previously the function fell off
    # the end and returned None.
    return {"mtd_ret_realized": mtd_ret_realized}

# Notebook smoke run: compute and print MTD realized returns for two accounts.
build_metrics_payload(["fund2", "fund3"])

mtd_ret_realized: {'fund2': 0.014733934812096615, 'fund3': -0.031863293640260866, 'total': -0.008829522505566073}
