In [None]:
# trade_pacifica_vwap_resid.py
# -*- coding: utf-8 -*-
# Python 3.9+
# pip install requests==2.32.3 python-dotenv==1.0.1 pybit==5.7.0 solders base58 pandas numpy

import os, time, math
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from pybit.unified_trading import HTTP as BybitHTTP

# === Pacifica thin client ===
from pacifica_client import (
    load_owner, load_or_create_agent, bind_agent_wallet,
    create_market_order, cancel_all_orders, _coid
)

# ================== Parameters (laid out to match the ASTER structure) ==================
PACIFICA_SYMBOL_EXEC = "SOL"         # Symbol traded on Pacifica (e.g. "SOL")
BYBIT_ALT_SYMBOL     = "SOLUSDT"     # ALT symbol on the Bybit side (signal data)
BYBIT_BTC_SYMBOL     = "BTCUSDT"     # BTC symbol on the Bybit side (signal data)

# Residual (regression coefficient) estimation & filter
COEFF_WINDOW         = 15            # Rolling window for the ALT-on-BTC regression and the residual scale
USE_MAD_SCALE        = False         # True: scale residuals by rolling MAD; False: by rolling std
RESID_K              = 2.0           # |resid_z| must exceed this for the residual gate to open
RESID_PERSIST        = 1             # Number of consecutive bars the |resid_z| condition must hold
USE_DIRECTION_FILTER = True          # Require the sign of resid_z to agree with the VWAP regime
VWAP_RESID_WINDOW    = 5             # Window of the rolling VWAP used as input to the residual model

# VWAP cross (ALT side)
VWAP_SHORT           = 10
VWAP_MEDIUM          = 12

# Trade size, fee, stop loss
NOTIONAL_USD         = 500.0        # Notional in USD (set manually)
TAKER_FEE            = 0.0004       # One-way fee (for reference logging only)
STOP_LOSS_PCT        = 0.05         # 5% SL (sent together with the market order as stop_loss)

# Bar retrieval settings (5-minute bars)
KLINE_INTERVAL_BYBIT = "5"          # Bybit: "5" = 5 minutes
BAR_MINUTES          = 5
KLINE_LIMIT          = 120
MINUTE_DELAY_SEC     = 5            # Delay (seconds) after the bar boundary, waiting for the bar to be finalized

RETRY_WAIT_SEC       = 5
MAX_RETRIES          = 12

# Behaviour after a stop loss
AFTER_SL_POLICY = "WAIT_CROSS"      # "IMMEDIATE" or "WAIT_CROSS"
COOLDOWN_SEC_AFTER_SL = 30

# ================== ユーティリティ ==================
def _now_ms() -> int:
    return int(time.time() * 1000)

def last_closed_open_ms(now_ms: int) -> int:
    """Return the open time (ms) of the most recently completed bar.

    The bar length is ``BAR_MINUTES``. The bar that contains ``now_ms`` is
    still running, so the result is the open time of the bar before it.
    """
    bar_ms = BAR_MINUTES * 60_000
    # Index of the bar currently in progress, counted from the epoch.
    running_bar = now_ms // bar_ms
    return (running_bar - 1) * bar_ms

def sleep_until_next_minute_plus(delta_sec: int = MINUTE_DELAY_SEC):
    """Block until the next bar boundary plus ``delta_sec`` seconds.

    Despite the name, the boundary is a multiple of ``BAR_MINUTES`` minutes
    (epoch-aligned), not necessarily of one minute.
    """
    bar_sec = BAR_MINUTES * 60.0
    started = time.time()
    bars_elapsed = math.floor(started / bar_sec)
    wake_at = (bars_elapsed + 1) * bar_sec + delta_sec
    # Never pass a negative duration to time.sleep().
    time.sleep(max(0.0, wake_at - started))

# ================== Bybit クライアント（データ専用） ==================
@dataclass
class BybitClient:
    """Market-data-only wrapper around a pybit unified-trading HTTP session."""
    api_key: str
    api_secret: str
    testnet: bool = False

    def __post_init__(self):
        # The session is created once the dataclass fields have been populated.
        self.session = BybitHTTP(testnet=self.testnet, api_key=self.api_key, api_secret=self.api_secret)

    def get_klines(self, symbol: str, interval: str = KLINE_INTERVAL_BYBIT, limit: int = KLINE_LIMIT):
        """Fetch linear-category klines and return them as dicts, oldest first.

        Each returned dict has the keys ``openTime`` (int) and ``open``,
        ``high``, ``low``, ``close``, ``volume`` (floats).
        """
        response = self.session.get_kline(category="linear", symbol=symbol, interval=interval, limit=limit)
        bars = []
        # Each raw entry is laid out as:
        # [start, open, high, low, close, volume, turnover, ...]
        for raw in response["result"]["list"]:
            bars.append({
                "openTime": int(raw[0]),
                "open": float(raw[1]),
                "high": float(raw[2]),
                "low": float(raw[3]),
                "close": float(raw[4]),
                "volume": float(raw[5]),   # volume is needed for the VWAP calculations
            })
        return sorted(bars, key=lambda bar: bar["openTime"])

# ================== シグナル（VWAPクロス × 残差） ==================
def _rolling_resid_z(ret_alt: pd.Series, ret_btc: pd.Series, W: int, use_mad: bool) -> pd.Series:
    mA = ret_alt.rolling(W, min_periods=W).mean()
    mB = ret_btc.rolling(W, min_periods=W).mean()
    vB = ret_btc.rolling(W, min_periods=W).var(ddof=0)
    cAB = ret_alt.rolling(W, min_periods=W).cov(ret_btc)
    beta = cAB / vB
    alpha = mA - beta * mB
    pred  = alpha + beta * ret_btc
    resid = ret_alt - pred
    if use_mad:
        def _mad(s):
            med = s.median()
            return 1.4826 * (np.abs(s - med)).median()
        scale = resid.rolling(W, min_periods=W).apply(_mad, raw=False)
    else:
        scale = resid.rolling(W, min_periods=W).std()
    return resid / scale

def _rolling_vwap_from_rows(rows, window: int) -> pd.Series:
    """
    rows: BybitClient.get_klines() の返す list[dict]
          keys: openTime, open, high, low, close, volume
    """
    df = pd.DataFrame(rows).sort_values("openTime").reset_index(drop=True)
    typical = (df["open"] + df["high"] + df["low"] + df["close"]) / 4.0
    tpv = typical * df["volume"]
    num = tpv.rolling(window, min_periods=window).sum()
    den = df["volume"].rolling(window, min_periods=window).sum()
    return num / den  # indexはdf.index（openTimeとは別）

def _rolling_resid_z_via_vwap_resid(vwap_alt: pd.Series,
                                    vwap_btc: pd.Series,
                                    W: int,
                                    use_mad: bool) -> pd.Series:
    # VWAP_resid の対数リターン
    ret_alt = np.log(vwap_alt).diff()
    ret_btc = np.log(vwap_btc).diff()

    # ローリングOLS（alpha, beta）
    mA = ret_alt.rolling(W, min_periods=W).mean()
    mB = ret_btc.rolling(W, min_periods=W).mean()
    vB = ret_btc.rolling(W, min_periods=W).var(ddof=0)
    cAB = ret_alt.rolling(W, min_periods=W).cov(ret_btc)
    beta = cAB / vB
    alpha = mA - beta * mB

    pred  = alpha + beta * ret_btc
    resid = ret_alt - pred

    if use_mad:
        def _mad(s):
            med = s.median()
            return 1.4826 * (np.abs(s - med)).median()
        scale = resid.rolling(W, min_periods=W).apply(_mad, raw=False)
    else:
        scale = resid.rolling(W, min_periods=W).std()

    # ゼロ割り・極小スケール保護
    scale = scale.replace(0.0, np.nan)
    z = resid / scale
    return z

def compute_trade_signal(byb_alt_rows: list, byb_btc_rows: list):
    """Evaluate the VWAP-cross x residual signal on the last row of the input.

    Args:
        byb_alt_rows: ALT kline dicts (openTime, open, high, low, close, volume).
        byb_btc_rows: BTC kline dicts with the same keys.

    Returns:
        ``(signal, debug)``. ``signal`` is 1 (short VWAP above medium VWAP),
        0 (not above) or None when the last row carries no signal. ``debug``
        holds ``resid_z``, ``vwap_s``, ``vwap_m`` and ``reg`` for the last
        row, or only ``{"reason": "insufficient bars"}`` when there is not
        enough history.
    """
    alt_df = pd.DataFrame(byb_alt_rows).sort_values("openTime").reset_index(drop=True)
    btc_df = pd.DataFrame(byb_btc_rows).sort_values("openTime").reset_index(drop=True)

    # Minimum history check for the windows used below.
    min_bars = max(COEFF_WINDOW, VWAP_RESID_WINDOW, VWAP_MEDIUM) + 2
    if min(len(alt_df), len(btc_df)) < min_bars:
        return None, {"reason": "insufficient bars"}

    # ---- Residual model: rolling VWAPs of ALT and BTC, joined on openTime ----
    # The VWAP helper sorts by openTime as well, so its values line up
    # positionally with the sorted frames built above.
    alt_side = pd.DataFrame({
        "openTime": alt_df["openTime"],
        "VWAP_resid_ALT": _rolling_vwap_from_rows(byb_alt_rows, VWAP_RESID_WINDOW).values,
    })
    btc_side = pd.DataFrame({
        "openTime": btc_df["openTime"],
        "VWAP_resid_BTC": _rolling_vwap_from_rows(byb_btc_rows, VWAP_RESID_WINDOW).values,
    })
    paired = alt_side.merge(btc_side, on="openTime", how="inner").reset_index(drop=True)
    paired["resid_z"] = _rolling_resid_z_via_vwap_resid(
        paired["VWAP_resid_ALT"], paired["VWAP_resid_BTC"], COEFF_WINDOW, USE_MAD_SCALE
    )

    # ---- Entry VWAPs (short / medium) on the ALT bars ----
    alt_volume = alt_df["volume"]
    typical_price = (alt_df["open"] + alt_df["high"] + alt_df["low"] + alt_df["close"]) / 4.0
    weighted = typical_price * alt_volume

    def _entry_vwap(window):
        # min_periods=1: the entry VWAPs are defined from the very first bar.
        weighted_sum = weighted.rolling(window, min_periods=1).sum()
        volume_sum = alt_volume.rolling(window, min_periods=1).sum()
        return weighted_sum / volume_sum

    alt_df["VWAP_short"] = _entry_vwap(VWAP_SHORT)
    alt_df["VWAP_medium"] = _entry_vwap(VWAP_MEDIUM)

    frame = alt_df.merge(paired[["openTime", "resid_z"]], on="openTime", how="left")
    resid_z = frame["resid_z"]

    # Regime is 1 while the short VWAP is above the medium one; "ignition" is
    # the bar on which the regime differs from the previous bar's regime.
    regime = (frame["VWAP_short"] > frame["VWAP_medium"]).astype(int)
    ignition = regime != regime.shift(1)

    # Residual filter: |z| above the threshold for RESID_PERSIST bars in a row.
    exceeds = resid_z.abs() > RESID_K
    gate = exceeds.rolling(RESID_PERSIST, min_periods=RESID_PERSIST).sum() == RESID_PERSIST
    if USE_DIRECTION_FILTER:
        same_direction = ((resid_z > 0) & (regime == 1)) | ((resid_z < 0) & (regime == 0))
        gate = gate & same_direction

    signal = np.where(ignition & gate, regime, np.nan)

    last_signal = signal[-1]
    sig = None if pd.isna(last_signal) else int(last_signal)

    last_z = resid_z.iloc[-1]
    debug = {
        "resid_z": (None if pd.isna(last_z) else float(last_z)),
        "vwap_s" : float(frame["VWAP_short"].iloc[-1]),
        "vwap_m" : float(frame["VWAP_medium"].iloc[-1]),
        "reg"    : int(regime.iloc[-1]),
    }
    return sig, debug

# ================== トレーダ（Pacificaで約定） ==================
@dataclass
class SingleLegState:
    """Locally tracked state of the single position leg managed by the trader.

    A default-constructed instance represents "no position".
    """
    side: Optional[str] = None     # "LONG"/"SHORT"; None while no position is tracked
    qty: float = 0.0               # position size as an absolute (unsigned) quantity
    entry: Optional[float] = None  # entry price recorded at entry / on resume
    sl: Optional[float] = None     # stop-loss price sent with the entry order

class PacificaSignalTrader:
    """Trade one Pacifica symbol from signals computed on Bybit klines.

    Each bar the trader waits for the bar boundary, fetches ALT and BTC klines
    from Bybit, evaluates ``compute_trade_signal`` and, if a signal fired,
    enters or reverses a position on Pacifica with a market order that carries
    a stop loss.
    """

    def __init__(self, bybit: BybitClient, owner, agent):
        self.bybit = bybit
        self.owner = owner
        self.owner_pub = str(owner.pubkey())
        self.agent = agent
        self.state = SingleLegState()
        # Post-stop-loss bookkeeping read by run().
        # NOTE(review): nothing in this class ever sets these three fields, so
        # the cooldown / AFTER_SL_POLICY branches in run() are currently
        # inactive. A stop-loss fill on the exchange is also not detected, so
        # self.state can go stale after one.
        self._cooldown_until = None
        self._waiting_for_cross = False
        self._sl_side = None  # "LONG"/"SHORT" to re-enter immediately after SL

    # ---- Order size (manual: NOTIONAL_USD / price) ----
    def _compute_trade_qty(self, px: float) -> float:
        """Return the order quantity for ``NOTIONAL_USD`` at price ``px``."""
        raw = NOTIONAL_USD / max(px, 1e-9)
        # The lot step is assumed to be handled manually; it could be made
        # exact from market info if needed.
        return max(round(raw, 6), 1e-6)

    def _fetch_last_closed_bars(self):
        """Fetch ALT/BTC klines up to and including the last closed bar.

        Retries up to ``MAX_RETRIES`` times, ``RETRY_WAIT_SEC`` apart, until
        both symbols contain the last closed bar.

        Returns:
            ``(alt_rows, btc_rows, ts)`` where both lists end at the last
            closed bar and ``ts`` is that bar's open time formatted in UTC,
            or ``(None, None, None)`` if the bar never showed up.
        """
        for _ in range(MAX_RETRIES):
            target = last_closed_open_ms(_now_ms())
            try:
                alt_byb = self.bybit.get_klines(BYBIT_ALT_SYMBOL, KLINE_INTERVAL_BYBIT, KLINE_LIMIT)
                btc_byb = self.bybit.get_klines(BYBIT_BTC_SYMBOL, KLINE_INTERVAL_BYBIT, KLINE_LIMIT)
            except Exception as e:
                print("[kline fetch error]", e)
                alt_byb, btc_byb = [], []

            # Drop any bar newer than the last closed one. Otherwise the
            # signal and last_price would be read from the list's final row,
            # which can be the still-forming bar rather than the closed bar
            # this method verified. get_klines() returns rows sorted by
            # openTime, so the closed bar is the last row after trimming.
            alt_closed = [x for x in alt_byb if x["openTime"] <= target]
            btc_closed = [x for x in btc_byb if x["openTime"] <= target]
            has_alt = bool(alt_closed) and alt_closed[-1]["openTime"] == target
            has_btc = bool(btc_closed) and btc_closed[-1]["openTime"] == target

            if has_alt and has_btc:
                ts_iso = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(target/1000))
                return alt_closed, btc_closed, ts_iso

            missing = []
            if not has_alt:
                missing.append("Bybit ALT closed")
            if not has_btc:
                missing.append("Bybit BTC closed")
            print(f"[retry in {RETRY_WAIT_SEC}s] missing={','.join(missing)} target_open:{target}")
            time.sleep(RETRY_WAIT_SEC)
        return None, None, None

    # ---- Pacifica: adopt an existing position at startup ----
    def resume_position_if_any(self):
        """Load an open position for ``PACIFICA_SYMBOL_EXEC`` into ``self.state``.

        Best effort: HTTP or parsing problems are printed and otherwise ignored.
        """
        import requests
        from common.constants import REST_URL
        url = f"{REST_URL}/positions"
        params = {"account": self.owner_pub}
        try:
            r = requests.get(url, params=params, headers={"Content-Type": "application/json"}, timeout=15)
            if r.status_code // 100 != 2:
                print("[positions] non-2xx:", r.status_code, r.text)
                return
            data = r.json()
            items = data.get("data") or data
            if not isinstance(items, list):
                items = [items]
            pos = next((it for it in items if it.get("symbol") == PACIFICA_SYMBOL_EXEC), None)
            if not pos:
                print("[positions] no position for", PACIFICA_SYMBOL_EXEC)
                return
            # NOTE(review): assumes the payload exposes a signed
            # "position_size" and an "entry_price" - confirm against the
            # Pacifica positions API.
            qty  = float(pos.get("position_size", 0.0))
            side = "LONG" if qty > 0 else ("SHORT" if qty < 0 else None)
            ep   = float(pos.get("entry_price", 0.0))
            if side:
                self.state.side  = side
                self.state.qty   = abs(qty)
                self.state.entry = ep
                print(f"[RESUME] {side} qty={self.state.qty} entry={self.state.entry}")
        except Exception as e:
            print("[positions] error]", e)

    def _enter(self, side: str, last_price: float):
        """Send a market order for ``side`` with a stop loss attached.

        Returns:
            True if the order request returned a 2xx status (the local state
            is updated in that case), False otherwise.
        """
        qty = self._compute_trade_qty(last_price)
        if qty <= 0:
            print("[enter] qty<=0, skip")
            return False

        # The stop loss travels with the market order: below the price for a
        # long, above it for a short.
        if side == "LONG":
            sl_price = last_price * (1 - STOP_LOSS_PCT)
        else:
            sl_price = last_price * (1 + STOP_LOSS_PCT)
        stop_loss = {"stop_price": f"{sl_price:.6f}", "client_order_id": _coid("sl")}

        r = create_market_order(
            self.owner_pub, self.agent, symbol=PACIFICA_SYMBOL_EXEC,
            side=("bid" if side == "LONG" else "ask"),
            amount=f"{qty:.8f}",
            slippage_percent="0.5",
            client_order_id=_coid("mkt"),
            stop_loss=stop_loss
        )
        print("[ENTER resp]", r.status_code, r.text)
        if r.status_code // 100 == 2:
            self.state.side  = side
            self.state.qty   = qty
            self.state.entry = last_price
            self.state.sl    = sl_price
            print(f"[ENTER] {side} {qty} @ ~{last_price:.6f} (SL {sl_price:.6f})")
            return True
        return False

    def _reverse(self, side: str, last_price: float):
        """Cancel all open orders for the symbol, then enter ``side``.

        NOTE(review): the existing position is not closed explicitly, and the
        new order is sized for a single leg. If the venue nets positions, this
        would leave the account roughly flat while self.state records a full
        position - confirm the intended behaviour.
        """
        cancel_all_orders(self.owner_pub, self.agent, symbols=[PACIFICA_SYMBOL_EXEC])
        print("[CANCEL ALL] sent")
        self.state = SingleLegState()
        return self._enter(side, last_price)

    def _ensure_flat(self):
        """Cancel all open orders for the symbol and reset the local state.

        NOTE(review): no closing order is sent here; only orders are cancelled.
        """
        cancel_all_orders(self.owner_pub, self.agent, symbols=[PACIFICA_SYMBOL_EXEC])
        print("[CANCEL ALL] sent")
        self.state = SingleLegState()

    def run(self):
        """Bind the agent wallet, adopt any open position, then trade forever."""
        # 1) Bind the agent wallet using self.owner (the original author notes
        #    this call is idempotent).
        print("[bind]", bind_agent_wallet(self.owner, str(self.agent.pubkey())).status_code)
        # 2) Adopt an existing position at startup.
        self.resume_position_if_any()

        while True:
            try:
                sleep_until_next_minute_plus(MINUTE_DELAY_SEC)
                alt_byb, btc_byb, ts = self._fetch_last_closed_bars()
                if alt_byb is None:
                    continue

                sig, dbg = compute_trade_signal(alt_byb, btc_byb)
                if "reason" in dbg:
                    # Not enough history: the debug dict has no VWAP values,
                    # so the status line below would raise on ":.6f".
                    print(f"[{ts}] no signal: {dbg['reason']}")
                    continue

                last_price = alt_byb[-1]["close"]

                side_txt = self.state.side or "-"
                rz = dbg.get('resid_z')
                vs = dbg.get('vwap_s')
                vm = dbg.get('vwap_m')
                print(f"[{ts}] side={side_txt} price={last_price:.6f} "
                      f"(resid_z={None if rz is None else f'{rz:.3f}'}, "
                      f"vwap_s={vs:.6f}, vwap_m={vm:.6f})")

                # Cooldown and post-stop-loss policy.
                if self._cooldown_until and time.time() < self._cooldown_until:
                    print("[COOLDOWN] skipping signals")
                    continue

                if AFTER_SL_POLICY == "IMMEDIATE" and self._sl_side:
                    print("[SL POLICY] re-enter immediately")
                    self._enter(self._sl_side, last_price)
                    self._sl_side = None
                    continue

                if AFTER_SL_POLICY == "WAIT_CROSS" and self._waiting_for_cross:
                    if self.state.side is None:
                        self._waiting_for_cross = False
                    else:
                        desired_for_exit = (1 if self.state.side == "LONG" else 0)
                        if dbg.get("reg") != desired_for_exit:
                            print("[SL POLICY] cross detected, re-entry enabled")
                            self._waiting_for_cross = False
                        else:
                            continue

                if sig is None:
                    continue

                desired = "LONG" if sig == 1 else "SHORT"
                if self.state.side is None:
                    self._enter(desired, last_price)
                elif self.state.side != desired:
                    self._reverse(desired, last_price)

            except Exception as e:
                # Top-level loop boundary: log and keep the bot alive.
                print("[run-loop error]", e)
                time.sleep(RETRY_WAIT_SEC)

# ================== 起動 ==================
def main():
    """Load credentials, print the startup banner and run the trader."""
    load_dotenv("keys.env")

    use_testnet = os.getenv("BYBIT_TESTNET", "false").lower() == "true"
    bybit = BybitClient(
        api_key=os.getenv("BYBIT_API_KEY") or "",
        api_secret=os.getenv("BYBIT_API_SECRET") or "",
        testnet=use_testnet,
    )
    owner = load_owner()
    agent = load_or_create_agent()

    rule = "=============================================="
    banner = [
        rule,
        "[START] Pacifica VWAP×Residual Trader",
        f" Exec={PACIFICA_SYMBOL_EXEC} (Pacifica), SignalAlt={BYBIT_ALT_SYMBOL} (Bybit), SignalBTC={BYBIT_BTC_SYMBOL} (Bybit)",
        f" COEFF_WINDOW={COEFF_WINDOW}, MAD={USE_MAD_SCALE}, K={RESID_K}, PERSIST={RESID_PERSIST}",
        f" VWAP(short/medium)=({VWAP_SHORT},{VWAP_MEDIUM}), DirFilter={USE_DIRECTION_FILTER}",
        f" NOTIONAL_USD={NOTIONAL_USD}, SL={STOP_LOSS_PCT:.2%}, Fee(1way est.)={TAKER_FEE:.5f}",
        f" AFTER_SL_POLICY={AFTER_SL_POLICY}, COOLDOWN={COOLDOWN_SEC_AFTER_SL}s",
        rule,
    ]
    for line in banner:
        print(line)

    PacificaSignalTrader(bybit, owner, agent).run()

if __name__ == "__main__":
    main()


[START] Pacifica VWAP×Residual Trader
 Exec=SOL (Pacifica), SignalAlt=SOLUSDT (Bybit), SignalBTC=BTCUSDT (Bybit)
 COEFF_WINDOW=15, MAD=False, K=2.0, PERSIST=1
 VWAP(short/medium)=(10,12), DirFilter=True
 NOTIONAL_USD=500.0, SL=5.00%, Fee(1way est.)=0.00040
 AFTER_SL_POLICY=WAIT_CROSS, COOLDOWN=30s
[bind] 200
[positions] no position for SOL
[2025-10-01 20:05:00] side=- price=219.820000 (resid_z=-0.228, vwap_s=219.916553, vwap_m=219.851439)


KeyboardInterrupt: 