In [2]:
import ccxt
import pandas as pd
import numpy as np

# 初始化交易所
exchange = ccxt.binance({
    'rateLimit': 1200,
    'enableRateLimit': True,
    'proxies': {
        'http': 'http://127.0.0.1:7890',
        'https': 'http://127.0.0.1:7890',
    },
})

# 获取OHLCV数据
def fetch_ohlcv(symbol, timeframe):
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe)
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df

# 计算EMA
def calculate_ema(prices, period):
    return prices.ewm(span=period, adjust=False).mean()

# 检查EMA多头排列
def check_ema_alignment(df):
    ema20 = calculate_ema(df['close'], 20)
    ema60 = calculate_ema(df['close'], 60)
    ema120 = calculate_ema(df['close'], 120)
    return (ema20[-1] > ema60[-1]) and (ema60[-1] > ema120[-1])

# 定义楔形模式检测函数
def detect_wedge(df, lookback=20):
    highs = df['high'][-lookback:]
    lows = df['low'][-lookback:]
    
    # 使用线性回归找到高点和低点的趋势线
    high_trend = np.polyfit(range(lookback), highs, 1)
    low_trend = np.polyfit(range(lookback), lows, 1)
    
    # 检查趋势线是否正在汇聚（斜率的乘积为负）
    converging = high_trend[0] * low_trend[0] < 0
    
    # 检查价格是否在趋势线之间
    in_wedge = lows.iloc[-1] > (low_trend[0] * lookback + low_trend[1]) and highs.iloc[-1] < (high_trend[0] * lookback + high_trend[1])
    
    return converging and in_wedge

# 定义楔形突破检测函数
def detect_wedge_breakout(df, lookback=20):
    if not detect_wedge(df, lookback):
        return False
    
    # 获取楔形的高点和低点趋势线
    highs = df['high'][-lookback:]
    lows = df['low'][-lookback:]
    high_trend = np.polyfit(range(lookback), highs, 1)
    low_trend = np.polyfit(range(lookback), lows, 1)
    
    # 检查最新的收盘价是否突破了楔形的高点趋势线
    breakout = df['close'].iloc[-1] > (high_trend[0] * lookback + high_trend[1])
    
    return breakout

# 回测函数
def backtest(symbol, timeframe, initial_balance, risk_per_trade):
    df = fetch_ohlcv(symbol, timeframe)
    balance = initial_balance
    entry_price = 0
    stop_loss = 0
    take_profit = 0
    position_size = 0
    trade_history = []

    for i in range(20, len(df)):
        current_df = df.iloc[i-20:i+1]
        if check_ema_alignment(current_df) and detect_wedge(current_df) and detect_wedge_breakout(current_df):
            if position_size == 0:  # 如果当前没有仓位
                entry_price = current_df['open'].iloc[-1]
                stop_loss = current_df['low'].min() - current_df['low'].min() * 0.0001
                stop_loss_distance = entry_price - stop_loss
                position_size = (balance * risk_per_trade) / stop_loss_distance
                balance -= position_size * entry_price  # 假设买入时立即扣除资金
                take_profit = current_df['high'].max()
                trade_history.append(('buy', entry_price, position_size))
        if position_size > 0:  # 如果有仓位，检查是否触发止损或止盈
            if df['low'].iloc[i] <= stop_loss or df['high'].iloc[i] >= take_profit:
                exit_price = stop_loss if df['low'].iloc[i] <= stop_loss else take_profit
                balance += position_size * exit_price
                trade_history.append(('sell', exit_price, position_size))
                position_size = 0  # 清空仓位

    return balance, trade_history

# 运行回测
final_balance, trade_history = backtest('BTC/USDT', '5m', 10000, 0.02)
print(f"Final balance: {final_balance}")
for trade in trade_history:
    print(trade)



Final balance: 10000


  return (ema20[-1] > ema60[-1]) and (ema60[-1] > ema120[-1])
