In [None]:
import pandas as pd
import talib as ta
import numpy as np


def preprocess_data(
    z_: pd.DataFrame,
    ret_periods: int = 24,
    vol_window: int = 24,
    atr_period: int = 14,
) -> pd.DataFrame:
    """数据预处理：计算收益率、波动率、ATR（小时级可调）"""
    z = z_.copy()
    z['ret'] = z['close'].pct_change().fillna(0)

    # 小时级：默认用近1天(24小时)的动量与波动率衡量信号强度
    z['rolling_ret'] = z['close'].pct_change(periods=ret_periods).fillna(0)
    z['rolling_vol'] = z['ret'].rolling(window=vol_window).std().fillna(1e-6)
    z['signal_strength'] = z['rolling_ret'] / z['rolling_vol']

    z['atr'] = ta.ATR(z['high'], z['low'], z['close'], timeperiod=atr_period)
    z['position'] = 0.0
    z['flag'] = 0.0
    return z


def run_strategy(
    z: pd.DataFrame,
    signal_threshold: float = 0.3,
    atr_stop_mult: float = 1.5,
    tp_drawdown_pct: float = 0.05,
    ret_periods: int = 24,
    vol_window: int = 24,
    atr_period: int = 14,
) -> tuple:
    """以小时为单位的策略：较低阈值与更紧风控"""
    Buy, Sell = [], []
    max_price = 0.0
    atr_entry = 0.0
    price_in = 0.0

    i_start = max(ret_periods, vol_window, atr_period)
    for i in range(i_start, len(z)):
        signal = z['signal_strength'].iloc[i]

        # 开仓：信号强度 > 阈值
        if z['position'].iloc[i - 1] == 0 and signal > signal_threshold:
            z.at[z.index[i], 'flag'] = 1
            z.at[z.index[i], 'position'] = 1
            price_in = z['close'].iloc[i]
            date_in = z.index[i]
            atr_entry = z['atr'].iloc[i]
            max_price = z['close'].iloc[i]
            Buy.append([date_in, price_in, f'开仓: signal={signal:.2f}, ATR={atr_entry:.2f}'])
            print(z.index[i], f'【开仓】信号={signal:.2f}，ATR={atr_entry:.2f}')

        # 有仓位：检查止损/止盈
        elif z['position'].iloc[i - 1] == 1:
            current_price = z['close'].iloc[i]
            max_price = max(max_price, current_price)
            floating_profit = (current_price - price_in) / price_in
            floating_drawdown = (max_price - current_price) / max_price if max_price != 0 else 0.0
            drawdown_atr = price_in - current_price

            # 止损：跌幅超过 ATR 倍数（更紧）
            if drawdown_atr > atr_stop_mult * atr_entry:
                z.at[z.index[i], 'flag'] = -1
                z.at[z.index[i], 'position'] = 0
                price_out = z['close'].iloc[i]
                date_out = z.index[i]
                Sell.append([date_out, price_out, f'止损: 跌幅={drawdown_atr:.2f} > {atr_stop_mult}ATR={atr_stop_mult*atr_entry:.2f}'])
                print(z.index[i], f'【止损】当前价格较开仓价下跌{drawdown_atr:.2f} > {atr_stop_mult}ATR')

            # 止盈：从最高点回撤超过阈值（更紧，如5%）
            elif floating_profit > 0 and floating_drawdown > tp_drawdown_pct:
                z.at[z.index[i], 'flag'] = -1
                z.at[z.index[i], 'position'] = 0
                price_out = z['close'].iloc[i]
                date_out = z.index[i]
                Sell.append([date_out, price_out, f'止盈: 回撤={floating_drawdown:.2%}'])
                print(z.index[i], f'【止盈】浮盈回撤={floating_drawdown:.2%} > {tp_drawdown_pct:.0%}')

            else:
                z.at[z.index[i], 'position'] = z['position'].iloc[i - 1]
                print(z.index[i], f'持仓中，当前浮盈={floating_profit:.2%}')

    # 交易记录整理
    p1 = pd.DataFrame(Buy, columns=['买入日期', '买入价格', '备注'])
    p2 = pd.DataFrame(Sell, columns=['卖出日期', '卖出价格', '备注'])
    transaction = pd.concat([p1, p2], axis=1)

    # 净值计算
    z['ret'] = z['close'].pct_change().fillna(0)
    z['nav'] = 1 + (z['ret'] * z['position']).cumsum()
    z['benchmark'] = z['close'] / z['close'].iloc[0]

    return z, transaction


In [None]:
import crypto_process
start_month = '2022-01'
end_month = '2023-03'
# 按小时回测
freq = '1h'

z_original = crypto_process.load_data(start_month, end_month)
z_resampled = crypto_process.resample_data(z_original, freq)

# 小时级预处理参数可调（默认 24 小时窗口）
z = preprocess_data(z_resampled, ret_periods=24, vol_window=24, atr_period=14)

# 运行策略：小时级阈值更紧
params = dict(signal_threshold=0.3, atr_stop_mult=1.5, tp_drawdown_pct=0.05,
              ret_periods=24, vol_window=24, atr_period=14)
data_price, transaction = run_strategy(z, **params) 

In [None]:
data_price.nav.plot()


In [None]:
def calculate_performance_metrics(data_price: pd.DataFrame, transactions: pd.DataFrame) -> pd.DataFrame:
    """计算绩效指标（小时年化）"""
    N = 24 * 365  # 小时级年化因子
    rf = 0.02  # 年化无风险收益率

    # 年化收益率（按小时净值序列年化）
    rety = data_price.nav.iloc[-1] ** (N / data_price.shape[0]) - 1

    # 夏普比率（小时收益×仓位）
    strategy_returns = (data_price.ret * data_price.position).fillna(0)
    sharpe = (strategy_returns.mean() * N - rf) / (strategy_returns.std() * np.sqrt(N)) if strategy_returns.std() > 0 else np.nan

    # 胜率（仅在有成对买卖记录时有效）
    try:
        VictoryRatio = ((transactions['卖出价格'] - transactions['买入价格']) > 0).mean()
    except Exception:
        VictoryRatio = np.nan

    # 最大回撤
    DD = 1 - data_price.nav / data_price.nav.cummax()
    MDD = DD.max()

    # 月均交易次数（按小时：每月约 24*30 小时）
    trade_count = data_price.flag.abs().sum() / data_price.shape[0] * (24 * 30)

    result = {
        'Sharpe': sharpe,
        'Annual_Return': rety,
        'MDD': MDD,
        'Winning_Rate': VictoryRatio,
        'Trading_Num': round(trade_count, 1)
    }
    return pd.DataFrame(result, index=[0])


In [None]:
result = calculate_performance_metrics(data_price,transaction)

In [None]:
result

In [None]:
flag = z['flag']

In [None]:
price_ = z[['close','high','low','open']]

In [None]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(14,6))
ax.plot(z.index, z['close'], label='Close')

# 标记开仓和平仓点
ax.scatter(z[z['flag'] == 1].index, z[z['flag'] == 1]['close'], marker='^', color='red', label='Buy', s=100)
ax.scatter(z[z['flag'] == -1].index, z[z['flag'] == -1]['close'], marker='v', color='green', label='Sell', s=100)

ax.legend()
plt.title("trading signal")
plt.grid(True)
plt.show()


In [None]:
# 1.掌握事件触发回测框架
# 2.理解复利和单利的区别