<a href="https://colab.research.google.com/github/Terry-usdc/binance-futures-alert/blob/main/%E6%96%B0%E5%B9%A3%E7%AD%96%E7%95%A5_%E4%B8%8A%E5%89%8D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install vectorbt>log.txt

In [None]:
# @title 讀取幣安事件寬表
import os
import pandas as pd

BASE_EVENT_DIR = "/content/drive/MyDrive/cryptodata/新幣資料/分類(幣安)/分類事件_USDT"

# 儲存所有 event df：event_name → DataFrame
all_event_dfs = {}

print("===== 開始讀取全部事件檔案 =====\n")

for class_folder in sorted(os.listdir(BASE_EVENT_DIR)):
    class_dir = os.path.join(BASE_EVENT_DIR, class_folder)
    if not os.path.isdir(class_dir):
        continue

    print(f"--- 分類：{class_folder} ---")

    for fn in sorted(os.listdir(class_dir)):
        if not fn.endswith(".csv"):
            continue

        event_path = os.path.join(class_dir, fn)

        # 讀 event_df
        df = pd.read_csv(event_path, index_col="open_time_iso")
        df.index = pd.to_datetime(df.index, errors="coerce", utc=True)
        df = df.sort_index()

        event_name = fn.replace(".csv", "")   # 去掉 .csv
        all_event_dfs[event_name] = df

        # ===== 檢查資訊 =====
        print(f"\n  [事件檔] {fn}")
        print(f"    shape = {df.shape}")
        print(f"    時間範圍：{df.index.min()} → {df.index.max()}")

        # 每個 symbol 的 True 數（理論上應該 0 或 1）
        event_count = df.sum(axis=0)

        # 顯示前幾個，避免太長
        #print("    每個 symbol 的事件 True 數量（前 10 個）：")
        #print(event_count.head(10))
        #print()

print("\n===== 讀取完成！ =====")
print(f"總共讀到 {len(all_event_dfs)} 個 event_df")



===== 開始讀取全部事件檔案 =====

--- 分類：分類_先上合約_USDT ---

  [事件檔] event_fut__fut_first__1m.csv
    shape = (2389, 51)
    時間範圍：2023-04-28 12:00:00+00:00 → 2025-10-29 11:19:00+00:00

  [事件檔] event_spot__fut_first__1m.csv
    shape = (2150, 51)
    時間範圍：2023-09-22 08:00:00+00:00 → 2025-11-06 10:49:00+00:00
--- 分類：分類_先上現貨_USDT ---

  [事件檔] event_fut__spot_first__1m.csv
    shape = (9830, 205)
    時間範圍：2019-09-08 17:57:00+00:00 → 2025-07-23 12:34:00+00:00

  [事件檔] event_spot__spot_first__1m.csv
    shape = (9550, 205)
    時間範圍：2017-08-17 04:00:00+00:00 → 2025-02-28 10:49:00+00:00
--- 分類：分類_只有合約_USDT ---

  [事件檔] event_fut__only_fut__1m.csv
    shape = (7450, 168)
    時間範圍：2020-02-03 08:03:00+00:00 → 2025-11-12 13:04:00+00:00
--- 分類：分類_只有現貨_USDT ---

  [事件檔] event_spot__only_spot__1m.csv
    shape = (3250, 67)
    時間範圍：2018-05-31 09:32:00+00:00 → 2025-09-09 12:49:00+00:00
--- 分類：分類_同時上架_USDT ---

  [事件檔] event_simul__1m.csv
    shape = (5650, 117)
    時間範圍：2020-07-20 12:00:00+00:00 → 2025-11-11 14:4

In [None]:
!pip install pyfolio-reloaded>log.txt

In [None]:
# @title 把gate開高收低價格資料拼成寬表
import os
import glob
from pathlib import Path
import pandas as pd

# ===== Gate 資料夾路徑 =====
GATE_DIR_BINANCE_FUT = "/content/drive/MyDrive/cryptodata/新幣資料/gateio合約_幣安合約/1m"
GATE_DIR_BINANCE_SPOT = "/content/drive/MyDrive/cryptodata/新幣資料/gateio合約_幣安現貨/1m"

GATE_FIELDS = ["open", "high", "low", "close"]

def parse_contract_from_filename(path: str) -> str | None:
    """
    檔名格式：{contract}_around_{YYYY-mm-ddTHH-MM-SSZ}_pm{n}_1m.csv
    e.g. BTC_USDT_around_2025-11-12T02-30-00Z_pm180_1m.csv
    回傳 contract：例如 'BTC_USDT'
    """
    stem = Path(path).stem
    if "_around_" not in stem:
        return None
    contract, _ = stem.split("_around_", 1)
    return contract

def build_gate_ohlc_from_dir(data_dir: str) -> dict[str, pd.DataFrame]:
    """
    掃描 data_dir 下所有 Gate 視窗 CSV，拼出 OHLC 寬表：
      {
        "open":  open_df,
        "high":  high_df,
        "low":   low_df,
        "close": close_df,
      }
    index = UTC DatetimeIndex
    columns = Gate 合約名稱 (如 'PEPE_USDT')
    """
    field_to_symbol_series: dict[str, dict[str, pd.Series]] = {f: {} for f in GATE_FIELDS}

    csv_files = sorted(glob.glob(os.path.join(data_dir, "*.csv")))
    print(f"[INFO] 在 {data_dir} 發現 {len(csv_files)} 個 Gate CSV 檔案")

    for i, fp in enumerate(csv_files, start=1):
        fname = Path(fp).name
        contract = parse_contract_from_filename(fp)
        if contract is None:
            print(f"[SKIP] 檔名無法解析 contract：{fname}")
            continue

        # 讀取 CSV
        try:
            df = pd.read_csv(fp)
        except Exception as e:
            print(f"[SKIP] 讀取失敗 {fname}：{e}")
            continue

        # 找時間欄位：優先 'time'，沒有則用第一欄
        if "time" in df.columns:
            time_col = "time"
        else:
            time_col = df.columns[0]

        idx = pd.to_datetime(df[time_col], errors="coerce", utc=True)
        mask = ~idx.isna()
        idx = idx[mask]

        # 逐欄處理 OHLC
        for fld in GATE_FIELDS:
            if fld not in df.columns:
                continue

            values = pd.to_numeric(df[fld], errors="coerce")
            values = values[mask]

            s = pd.Series(values.values, index=idx, name=contract)
            s = s[~s.index.duplicated(keep="last")].sort_index()
            if s.empty:
                continue

            if contract in field_to_symbol_series[fld]:
                old_s = field_to_symbol_series[fld][contract]
                combined = pd.concat([old_s, s]).sort_index()
                combined = combined[~combined.index.duplicated(keep="last")]
                field_to_symbol_series[fld][contract] = combined
            else:
                field_to_symbol_series[fld][contract] = s

        if i % 50 == 0 or i == len(csv_files):
            print(f"  處理進度：{i}/{len(csv_files)}")

    result: dict[str, pd.DataFrame] = {}
    for fld, sym_map in field_to_symbol_series.items():
        if not sym_map:
            print(f"[WARN] 欄位 {fld} 沒有任何資料")
            continue

        merged_df = pd.DataFrame.from_dict(sym_map, orient="columns")
        merged_df.index = pd.to_datetime(merged_df.index, errors="coerce", utc=True)
        merged_df = merged_df.sort_index().astype(float)

        result[fld] = merged_df
        print(f"[OK] 完成欄位 {fld}：shape={merged_df.shape}")

    return result

# 實際組 Gate 價格表（幣安合約 / 幣安現貨兩側）
gate_ohlc_fut_side = build_gate_ohlc_from_dir(GATE_DIR_BINANCE_FUT)
gate_ohlc_spot_side = build_gate_ohlc_from_dir(GATE_DIR_BINANCE_SPOT)

gate_open_fut  = gate_ohlc_fut_side.get("open")
gate_high_fut  = gate_ohlc_fut_side.get("high")
gate_low_fut   = gate_ohlc_fut_side.get("low")
gate_close_fut = gate_ohlc_fut_side.get("close")

gate_open_spot  = gate_ohlc_spot_side.get("open")
gate_high_spot  = gate_ohlc_spot_side.get("high")
gate_low_spot   = gate_ohlc_spot_side.get("low")
gate_close_spot = gate_ohlc_spot_side.get("close")

print("\n[SUMMARY] Gate×幣安合約 close_df shape:", None if gate_close_fut is None else gate_close_fut.shape)
print("[SUMMARY] Gate×幣安現貨 close_df shape:", None if gate_close_spot is None else gate_close_spot.shape)

[INFO] 在 /content/drive/MyDrive/cryptodata/新幣資料/gateio合約_幣安合約/1m 發現 433 個 Gate CSV 檔案
  處理進度：50/433
  處理進度：100/433
  處理進度：150/433
  處理進度：200/433
  處理進度：250/433
  處理進度：300/433
  處理進度：350/433
  處理進度：400/433
  處理進度：433/433
[OK] 完成欄位 open：shape=(127395, 433)
[OK] 完成欄位 high：shape=(127395, 433)
[OK] 完成欄位 low：shape=(127395, 433)
[OK] 完成欄位 close：shape=(127395, 433)
[INFO] 在 /content/drive/MyDrive/cryptodata/新幣資料/gateio合約_幣安現貨/1m 發現 192 個 Gate CSV 檔案
  處理進度：50/192
  處理進度：100/192
  處理進度：150/192
  處理進度：192/192
[OK] 完成欄位 open：shape=(63413, 192)
[OK] 完成欄位 high：shape=(63413, 192)
[OK] 完成欄位 low：shape=(63413, 192)
[OK] 完成欄位 close：shape=(63413, 192)

[SUMMARY] Gate×幣安合約 close_df shape: (127395, 433)
[SUMMARY] Gate×幣安現貨 close_df shape: (63413, 192)


In [None]:
# @title 回測代碼

import empyrical as ep
import pyfolio as pf  # 之後你可以自己呼叫 create_full_tear_sheet
import pandas as pd
import numpy as np
from typing import Literal, Dict, Tuple, Optional


def run_backtest_strategy(
    open_: pd.DataFrame,
    high_: pd.DataFrame,
    low_: pd.DataFrame,
    close_: pd.DataFrame,
    # --- 外部提供的訊號 ---
    entries: Optional[pd.DataFrame] = None,
    exits: Optional[pd.DataFrame] = None,
    short_entries: Optional[pd.DataFrame] = None,
    short_exits: Optional[pd.DataFrame] = None,
    # --- 其餘參數 ---
    strategy_type: Optional[Literal["long", "short"]] = None,  # 可省略，將自動推斷
    fees: float = 0.0004,
    slippage: float = 0.0000,
    cash: float = 100_000,
    freq: str = "1m",
    tp_stop: float | None = 0.05,
    sl_stop: float | None = 0.02,
    max_alloc: float = 0.25,
    per_bar_budget: float | None = None,
):
    """
    回測主框架（精簡版）

    回傳：
      pf_vbt     : vectorbt.Portfolio 物件（可以用來看每筆交易）
      pf_returns : 給 pyfolio 用的「日報酬」Series
      summary    : 指標總表（Total/Annual Return, MDD, Sortino, Trades）
    """
    # ---------- 1) 判斷方向 & 整理訊號 ----------
    def _is_df_nonempty(df):
        return (df is not None) and isinstance(df, pd.DataFrame) and (df.size > 0)

    long_ok = _is_df_nonempty(entries) or _is_df_nonempty(exits)
    short_ok = _is_df_nonempty(short_entries) or _is_df_nonempty(short_exits)

    if strategy_type is None:
        if long_ok and not short_ok:
            strategy_type = "long"
        elif short_ok and not long_ok:
            strategy_type = "short"
        else:
            raise ValueError("無法自動判斷策略方向：請提供單一方向的訊號，或明確指定 strategy_type。")
    else:
        if strategy_type == "long" and not long_ok:
            raise ValueError("strategy_type='long' 但未提供 entries/exits。")
        if strategy_type == "short" and not short_ok:
            raise ValueError("strategy_type='short' 但未提供 short_entries/short_exits。")

    idx, cols = close_.index, close_.columns

    def _ensure_bool_df(df):
        if df is None:
            return pd.DataFrame(False, index=idx, columns=cols)
        return df.reindex(index=idx, columns=cols).fillna(False).astype(bool)
        #讓訊號的index跟column都對齊價格df
        #但是如果本來訊號的df其index就不連續，那讓他shift會有問題，還是要在函數外處理

    if strategy_type == "long":
        entries = _ensure_bool_df(entries)
        exits = _ensure_bool_df(exits)
        short_entries = None
        short_exits = None
        signal_matrix = entries
    else:
        short_entries = _ensure_bool_df(short_entries)
        short_exits = _ensure_bool_df(short_exits)
        entries = None
        exits = None
        signal_matrix = short_entries

    # ---------- 2) 每根 K 的下單金額 ----------
    if per_bar_budget is None:
        per_bar_budget = float(cash)

    n_signals = signal_matrix.sum(axis=1)
    per_order_value = per_bar_budget / n_signals.replace(0, np.nan)
    per_order_value = per_order_value.fillna(0.0)

    size_value = pd.DataFrame(
        np.where(signal_matrix.values, per_order_value.values[:, None], 0.0),
        index=signal_matrix.index,
        columns=signal_matrix.columns,
    )

    cap = cash * max_alloc
    size_value = size_value.clip(upper=cap)

    # ---------- 3) 用 vectorbt 跑回測 ----------
    pf_vbt = vbt.Portfolio.from_signals(
        open=open_,
        high=high_,
        low=low_,
        close=close_,
        entries=entries,
        exits=exits,
        short_entries=short_entries,
        short_exits=short_exits,
        size=size_value,
        size_type="value",
        fees=fees,
        slippage=slippage,
        init_cash=cash,
        cash_sharing=True,   # 一套錢
        freq=freq,
        tp_stop=tp_stop,
        sl_stop=sl_stop,
    )

    # ---------- 4) 從權益曲線推「日報酬」給 pyfolio/empyrical ----------
    equity = pd.Series(pf_vbt.value()).dropna()
    '''
    vectorbt 的 Portfolio 所有時間序列，都直接沿用 close_ 的 index，不管有沒有交易都會有值
    包括：

    pf_vbt.value()

    pf_vbt.cash()

    pf_vbt.position_size()

    pf_vbt.trades.records

    pf_vbt.max_drawdown()

    那因為我的close不是每天都有資料，所以equity.resample("1D").last()會有很多nan.dropna()後會變短，算年化報酬ep.annual_return(pf_returns)他是看我有有幾天的資料來算年化
    '''
    if equity.empty:
        pf_returns = pd.Series(dtype=float)
    else:
        # 先壓到「每日最後一筆」，再算日報酬
        daily_equity = equity.resample("1D").last().dropna()
        pf_returns = daily_equity.pct_change().dropna()

    # ---------- 5) 用 empyrical 算指標 ----------
    if pf_returns.empty:
        total_ret = np.nan
        ann_ret = np.nan
        mdd = np.nan
        sortino = np.nan
    else:
        # 累積報酬（從 0 → 0.5 表示 +50%）
        total_ret = float(ep.cum_returns_final(pf_returns))
        ann_ret = float(ep.annual_return(pf_returns))
        mdd = float(ep.max_drawdown(pf_returns))          # 負的，例如 -0.35 表示 -35%
        sortino = float(ep.sortino_ratio(pf_returns))     # 以 0 為門檻

    # 交易次數（全部 symbol 加總）
    trade_count = int(pf_vbt.trades.count().sum())

    trade_records = pf_vbt.trades.records_readable
    wins = (trade_records["Return"] > 0).sum()
    win_rate = float(wins / len(trade_records))

    summary = pd.Series(
        {
            "Strategy Type": strategy_type,
            "Total Return %": total_ret * 100 if not np.isnan(total_ret) else np.nan,
            "Annual Return %": ann_ret * 100 if not np.isnan(ann_ret) else np.nan,
            "Max Drawdown %": mdd * 100 if not np.isnan(mdd) else np.nan,
            "Win Rate %": win_rate * 100 if not np.isnan(win_rate) else np.nan,
            "Sortino Ratio": sortino,
            "Trades": trade_count,
            "Per-Bar Budget": per_bar_budget,
            "Max Alloc": max_alloc,
            "TP": tp_stop,
            "SL": sl_stop,
        }
    )

    return pf_vbt, pf_returns, summary




In [None]:
# @title 看事件前後的趨勢
import pandas as pd
import numpy as np
from typing import Literal, Dict, Tuple

def event_window_stats(
    price_df: pd.DataFrame,
    event_df: pd.DataFrame,
    n: int = 5,
    include: Literal["rel_price", "daily_return", "both"] = "both",
    dropna_event: bool = True,   # 若事件日價為 NaN 是否丟棄該事件(對 rel_price)
    q_low: float = 0.20,         # rel_price 的下分位數 (可調)
    q_high: float = 0.80,        # rel_price 的上分位數 (可調)
) -> Tuple[Dict[str, pd.DataFrame], Dict[str, Dict[str, pd.Series]]]:
    """
    對 event_df==True 的每個事件抽取固定 offset 視窗 [-n..+n]。
    超界的 offset 位置以 NaN 表示（不往右/左補）。

    回傳兩種對齊矩陣與其統計（依 include 決定）：
      - rel_price:   (price / price_event_day) - 1，其中 base 為「事件日」價格（固定）
      - daily_return: 每日單期報酬（pct_change）

    新增統計：
      - 對 rel_price 另外計算上下分位數：q_low（預設0.2）與 q_high（預設0.8）

    參數：
      q_low, q_high 僅作用於 rel_price 的列向統計（跨事件/欄位的分位數）。
      需滿足 0 < q_low < q_high < 1。

    回傳:
      aligned: {
        "rel_price":    DataFrame(index=offsets, columns=MultiIndex(symbol,event_time)) (若包含)
        "daily_return": DataFrame(index=offsets, columns=MultiIndex(symbol,event_time)) (若包含)
      }
      stats: 同上鍵，值為 {"mean":Series, "median":Series, "count":Series, 以及 rel_price 專屬的 "q_low","q_high"}
    """
    if not (0.0 < q_low < q_high < 1.0):
        raise ValueError("q_low 與 q_high 需滿足 0 < q_low < q_high < 1")

    # 1) 對齊 event_df 形狀
    event_df = event_df.reindex_like(price_df)
    idx = price_df.index
    cols = price_df.columns
    offsets = np.arange(-n, n + 1)

    # 2) 預先計算每日報酬
    daily_ret_full = price_df.pct_change()

    # 3) 暫存容器
    frames_map: Dict[str, list] = {"rel_price": [], "daily_return": []}
    col_keys: list[tuple] = []

    # 4) 逐 symbol、逐事件
    for sym in cols:
        ev_mask = event_df[sym].fillna(False).to_numpy()
        if not ev_mask.any():
            continue
        ev_positions = np.flatnonzero(ev_mask)  # 事件在整體 idx 的位置(int)

        for pos in ev_positions:
            # 建立固定 offset 的空 series（先全 NaN）
            rel_series = pd.Series(np.nan, index=offsets) if include in ("rel_price", "both") else None
            dr_series  = pd.Series(np.nan, index=offsets) if include in ("daily_return", "both") else None

            # 視窗邊界：left_bound..right_bound
            left_bound  = max(-n, -pos)                    # 不能小於資料開頭
            right_bound = min(n, (len(idx) - 1) - pos)     # 不能超過資料結尾

            if left_bound <= right_bound:
                real_range = np.arange(pos + left_bound, pos + right_bound + 1)
                win_idx = idx[real_range]

                price_win = price_df[sym].reindex(win_idx)
                dr_win = daily_ret_full[sym].reindex(win_idx)

                # ---- rel_price ---- (固定以事件日為基準)
                if include in ("rel_price", "both"):
                    # 事件日本身是否在範圍內（理論上一定在，因為 0 ∈ [left_bound, right_bound]，除非資料缺失）
                    if 0 < left_bound or 0 > right_bound:    #這個條件我覺得一定不會發生，但是留著也無所謂
                        base_price = np.nan
                    else:
                        # 直接拿事件日價格（較直觀，也避免切片定位錯誤）
                        base_price = price_df.at[idx[pos], sym] #.at類似loc，但是只能取單值，不過速度更快

                    # 若事件日價為 NaN 且 dropna_event=True，則整個事件在 rel_price 留空（全 NaN）
                    if dropna_event and pd.isna(base_price):
                        # 跳過這個事件，不填任何資料
                        pass
                    else:
                        # 不管 base_price 是不是 NaN，這樣寫都安全
                        rel_values = (price_win / base_price) -1
                        rel_series.loc[left_bound:right_bound] = rel_values.to_numpy()

                # ---- daily_return ----
                if include in ("daily_return", "both"):
                    dr_series.loc[left_bound:right_bound] = dr_win.to_numpy()

            # 記錄欄位 key（即使全 NaN 也保留，方便兩種度量同形）
            col_keys.append((sym, idx[pos]))

            # 收集
            if include in ("rel_price", "both"):
                frames_map["rel_price"].append(rel_series)
            if include in ("daily_return", "both"):
                frames_map["daily_return"].append(dr_series)

    # 5) 組裝輸出
    aligned: Dict[str, pd.DataFrame] = {}
    stats: Dict[str, Dict[str, pd.Series]] = {}

    def _build_df_and_stats(key: str):
        if len(frames_map[key]) == 0: #這邊為0代表frames_map的rel_price或日報酬完全沒被append，就是完全沒事件發生
            raise ValueError(f"【錯誤】'{key}' 無資料，應無事件可供計算。")

        df = pd.concat(frames_map[key], axis=1)
        df.columns = pd.MultiIndex.from_tuples(col_keys, names=["symbol", "event_time"])

        s = {
            "mean":   df.mean(axis=1, skipna=True),
            "median": df.median(axis=1, skipna=True),
            "count":  df.count(axis=1),
        }
        if key == "rel_price":
            # 列向量的分位數（跨事件）
            s["q_low"]  = df.quantile(q_low,  axis=1, interpolation="linear")
            s["q_high"] = df.quantile(q_high, axis=1, interpolation="linear")

        return df, s

    if include in ("rel_price", "both"):
        aligned["rel_price"], stats["rel_price"] = _build_df_and_stats("rel_price")

    if include in ("daily_return", "both"):
        aligned["daily_return"], stats["daily_return"] = _build_df_and_stats("daily_return")

    return aligned, stats

In [None]:
all_event_dfs.keys()

dict_keys(['event_fut__fut_first__1m', 'event_spot__fut_first__1m', 'event_fut__spot_first__1m', 'event_spot__spot_first__1m', 'event_fut__only_fut__1m', 'event_spot__only_spot__1m', 'event_simul__1m'])

In [None]:
event_df_corrected = all_event_dfs['event_fut__only_fut__1m'].copy()
event_df_corrected.columns = [col.replace('USDT', '_USDT') if col.endswith('USDT') else col for col in event_df_corrected.columns]
#事件跟價格的column格式差一個_

# Combine indices from event_df_corrected and gate_close_fut
combined_index = event_df_corrected.index.union(gate_close_fut.index) #這邊注意這次是跟誰在做

#這邊是因為，事件df的index如果跟close的不同，那shift的時候就會跳到非預期的地方，close的價格是事件前後，讓他reindex讓兩邊統一才不會出問題



event_df_corrected = event_df_corrected.reindex(combined_index, fill_value=False)
#這感覺要檢視一下看前後檔案趨勢的函數會不會受影響

In [None]:
import vectorbt as vbt

TP = 0.3
SL = 0.1


#cond2= close.pct_change()>0.2

# 2) 將 OHLC + 訊號 丟進重構後的 run_backtest_strategy
pf_vbt, pf_returns, summary = run_backtest_strategy(
    open_=gate_open_fut , high_=gate_high_fut, low_=gate_low_fut , close_=gate_close_fut,
    # 只要放你要的方向那組；這裡是放空
    short_entries=None,
    short_exits=None,
    entries=event_df_corrected.shift(-1),
    exits=event_df_corrected.shift(10),
    strategy_type="long",      # 也可省略，函式會依訊號自動判斷
    fees=0.0004,
    slippage=0.0000,
    cash=100_000,
    freq='1m',
    tp_stop=TP,
    sl_stop=SL,
    max_alloc=0.25,
    per_bar_budget=None,        # 預設=整包資金

)

print("摘要：")
print(summary)

  return df.reindex(index=idx, columns=cols).fillna(False).astype(bool)
  return df.reindex(index=idx, columns=cols).fillna(False).astype(bool)


摘要：
Strategy Type           long
Total Return %     72.015425
Annual Return %    50.384451
Max Drawdown %    -10.077319
Win Rate %         53.703704
Sortino Ratio       6.175557
Trades                   108
Per-Bar Budget      100000.0
Max Alloc               0.25
TP                       0.3
SL                       0.1
dtype: object


In [None]:
pf_vbt, pf_returns, summary = run_backtest_strategy(
    open_=gate_open_fut , high_=gate_high_fut, low_=gate_low_fut , close_=gate_close_fut,
    # 只要放你要的方向那組；這裡是放空
    short_entries=event_df_corrected.shift(10),
    short_exits=event_df_corrected.shift(120),
    entries=None,
    exits=None,
    strategy_type="short",      # 也可省略，函式會依訊號自動判斷
    fees=0.0004,
    slippage=0.0000,
    cash=100_000,
    freq='1m',
    tp_stop=TP,
    sl_stop=SL,
    max_alloc=0.25,
    per_bar_budget=None,        # 預設=整包資金

)

print("摘要：")
print(summary)

In [None]:
import pandas as pd
import itertools
import gc   # 垃圾回收機制
import os   # 檔案操作
from tqdm import tqdm

# 請務必加入這行，避免 Pandas 跳警告
pd.set_option('future.no_silent_downcasting', True)

def scan_all_params(shift_range, tp_range, sl_range, event_df, ohlc_data, fees=0.0004, csv_filename="grid_search_results.csv"):
    """
    全能參數掃描 (省記憶體版)：
    每跑完一次回測，就將結果寫入 CSV 並釋放記憶體。
    """

    # 1. 產生參數組合
    param_combinations = list(itertools.product(shift_range, tp_range, sl_range))
    print(f"即將執行 {len(param_combinations)} 次回測組合...")
    print(f"結果將即時儲存至: {csv_filename}")

    # 如果檔案已存在，先刪除舊的，避免混到舊資料
    if os.path.exists(csv_filename):
        os.remove(csv_filename)

    # 2. 開始迴圈
    for shift_val, tp_val, sl_val in tqdm(param_combinations, desc="參數優化中"):
        try:
            # --- A. 執行回測 ---
            current_exits = event_df.shift(shift_val)

            pf_vbt, pf_returns, summary = run_backtest_strategy(
                open_=ohlc_data['open'],
                high_=ohlc_data['high'],
                low_=ohlc_data['low'],
                close_=ohlc_data['close'],
                short_entries=None,
                short_exits=None,
                entries=event_df.shift(-1),
                exits=current_exits,
                strategy_type="long",
                fees=fees,
                slippage=0.0000,
                cash=100_000,
                freq='1m',
                tp_stop=tp_val,
                sl_stop=sl_val,
                max_alloc=0.25,
                per_bar_budget=None
            )

            # --- B. 整理單筆結果 ---
            # 注意：這裡要確保從 summary 取出來的是純數字 (float/int)，不要存物件
            record = {
                'Shift': shift_val,
                'TP': tp_val,
                'SL': sl_val,
                'Total Return %': float(summary['Total Return %']),
                'Win Rate %': float(summary['Win Rate %']),
                'Sortino Ratio': float(summary['Sortino Ratio']),
                'Max Drawdown %': float(summary['Max Drawdown %']),
                'Trades': int(summary['Trades'])
            }

            # --- C. 即時寫入 CSV ---
            df_single_row = pd.DataFrame([record])

            # 如果檔案不存在(第一筆)，就寫入 Header；否則不寫 Header
            use_header = not os.path.exists(csv_filename)
            df_single_row.to_csv(csv_filename, mode='a', header=use_header, index=False)

            # --- D. 關鍵：釋放記憶體 ---
            # 1. 刪除大物件
            del pf_vbt
            del pf_returns
            del summary
            del current_exits
            del df_single_row

            # 2. 強制垃圾回收 (這行最重要)
            gc.collect()

        except Exception as e:
            # 捕捉錯誤但不中斷迴圈
            print(f"組合 [Shift={shift_val}, TP={tp_val}, SL={sl_val}] 發生錯誤: {e}")
            # 發生錯誤時也要嘗試清理，以免殘留
            gc.collect()

    # 3. 全部跑完後，讀取 CSV 並排序回傳
    if os.path.exists(csv_filename):
        df_final = pd.read_csv(csv_filename)
        df_sorted = df_final.sort_values(by='Sortino Ratio', ascending=False).reset_index(drop=True)
        return df_sorted
    else:
        return pd.DataFrame()

# --- 使用方式不變 ---
# df_result = scan_all_params(shift_range=..., tp_range=..., sl_range=..., event_df=..., ohlc_data=...)

# --- 使用範例 ---

# 1. 準備數據
my_ohlc = {
    'open': gate_open_fut, 'high': gate_high_fut,
    'low': gate_low_fut, 'close': gate_close_fut
}

# 2. 設定你想測試的範圍 (可以用 list 或 range)
# 這裡示範：測試 Shift 5~15，TP 測試 0.2 和 0.3，SL 測試 0.05 和 0.1
shifts_to_test = range(10,11)  # 11 個值
tps_to_test =  np.arange(0.1, 0.5, 0.1)
sls_to_test =  np.arange(0.1, 0.3, 0.1)
# 總共會跑 11 * 3 * 2 = 66 次回測

# 3. 執行
df_grid_search = scan_all_params(
    shift_range=shifts_to_test,
    tp_range=tps_to_test,
    sl_range=sls_to_test,
    event_df=event_df_corrected,
    ohlc_data=my_ohlc
)

# 4. 看結果
print("\n--- 最佳參數組合 (Top 5) ---")
print(df_grid_search.head(5))


即將執行 8 次回測組合...
結果將即時儲存至: grid_search_results.csv


參數優化中: 100%|██████████| 8/8 [03:46<00:00, 28.29s/it]


--- 最佳參數組合 (Top 5) ---
   Shift   TP   SL  Total Return %  Win Rate %  Sortino Ratio  Max Drawdown %  \
0     10  0.3  0.1       72.015425   53.703704       6.175557      -10.077319   
1     10  0.3  0.2       74.615793   54.629630       5.999476       -8.490344   
2     10  0.4  0.1       66.779106   53.703704       5.753986      -10.077319   
3     10  0.4  0.2       69.379473   54.629630       5.590967       -8.490344   
4     10  0.2  0.2       69.388161   54.629630       5.590768       -8.556963   

   Trades  
0     108  
1     108  
2     108  
3     108  
4     108  





In [None]:
range(9,10)

range(9, 10)

In [None]:
my_ohlc.keys()

dict_keys(['open', 'high', 'low', 'close'])

In [None]:
import plotly.graph_objects as go


aligned, stats=event_window_stats(price_df=gate_close_fut ,event_df=event_df_corrected,n= 350)
# Get the series from the stats dictionary
mean_series = stats['rel_price']['mean']
median_series = stats['rel_price']['median']
q_low_series = stats['rel_price']['q_low']
q_high_series = stats['rel_price']['q_high']
count_series = stats['rel_price']['count']

# Get the total count of events
total_events = count_series.max() # Assuming count is the same for all offsets

# Create a DataFrame for plotting
plot_df = pd.DataFrame({
    'Mean': mean_series,
    'Median': median_series,
    'Q_low (20%)': q_low_series,
    'Q_high (80%)': q_high_series
})

# Create the figure
fig = go.Figure()

# Add lines for each series
fig.add_trace(go.Scattergl(x=plot_df.index, y=plot_df['Mean'], mode='lines', name='Mean'))
fig.add_trace(go.Scattergl(x=plot_df.index, y=plot_df['Median'], mode='lines', name='Median'))
fig.add_trace(go.Scattergl(x=plot_df.index, y=plot_df['Q_low (20%)'], mode='lines', name='Q_low (20%)'))
fig.add_trace(go.Scattergl(x=plot_df.index, y=plot_df['Q_high (80%)'], mode='lines', name='Q_high (80%)'))


# Update layout for title and labels
fig.update_layout(
    title=f'事件前後相對價格走勢 (共 {total_events} 筆事件)',
    xaxis_title='步數 (Steps)',
    yaxis_title='相對價格變化 (Relative Price Change)',
    hovermode='x unified' # Optional: show hover info for all traces at once
)

# Show the plot
fig.show()

  daily_ret_full = price_df.pct_change()
  ev_mask = event_df[sym].fillna(False).to_numpy()


summary

In [None]:
trades = pf_vbt.trades.records_readable

def show_page(df, page=1, page_size=50):
    start = (page - 1) * page_size
    end = start + page_size
    return df.iloc[start:end]

# 第 1 頁（前 50 筆）
show_page(trades, page=1, page_size=50)

Unnamed: 0,Exit Trade Id,Column,Size,Entry Timestamp,Avg Entry Price,Entry Fees,Exit Timestamp,Avg Exit Price,Exit Fees,PnL,Return,Direction,Status,Position Id
0,0,4_USDT,126320.0,2025-10-08 11:29:00+00:00,0.19791,10.0,2025-10-08 11:40:00+00:00,0.24097,12.175736,5417.165378,0.216687,Long,Closed,0
1,1,AERGO_USDT,63171.19,2025-04-16 10:59:00+00:00,0.39575,10.0,2025-04-16 11:00:00+00:00,0.356175,9.0,-2519.0,-0.10076,Long,Closed,1
2,2,AERO_USDT,13368.98,2024-12-04 14:14:00+00:00,1.87,10.0,2024-12-04 14:25:00+00:00,1.9172,10.252406,610.763636,0.024431,Long,Closed,2
3,3,AGT_USDT,858103.9,2025-05-20 10:29:00+00:00,0.029134,10.0,2025-05-20 10:40:00+00:00,0.031952,10.967255,2397.16963,0.095887,Long,Closed,3
4,4,AIN_USDT,279080.2,2025-07-10 10:44:00+00:00,0.08958,10.0,2025-07-10 10:55:00+00:00,0.09149,10.213217,512.829873,0.020513,Long,Closed,4
5,5,AIOT_USDT,120889.7,2025-04-30 12:29:00+00:00,0.2068,10.0,2025-04-30 12:40:00+00:00,0.255,12.330754,5804.555126,0.232182,Long,Closed,5
6,6,AIO_USDT,259282.3,2025-08-13 11:29:00+00:00,0.09642,10.0,2025-08-13 11:40:00+00:00,0.10295,10.677245,1672.436217,0.066897,Long,Closed,6
7,7,AKE_USDT,15542430.0,2025-09-26 12:29:00+00:00,0.001608,10.0,2025-09-26 12:30:00+00:00,0.001448,9.0,-2519.0,-0.10076,Long,Closed,7
8,8,AKT_USDT,6356.471,2024-11-18 11:29:00+00:00,3.933,10.0,2024-11-18 11:40:00+00:00,3.793,9.644038,-909.549962,-0.036382,Long,Closed,8
9,9,ALCH_USDT,128959.0,2025-01-07 11:44:00+00:00,0.19386,10.0,2025-01-07 11:55:00+00:00,0.20381,10.513257,1262.629217,0.050505,Long,Closed,9
