In [3]:
import datetime
import pandas as pd
import okx.MarketData as MarketData

def fetch_kline_bt(days, bar, instId, flag="0"):
    """
    获取最近 days 天的K线数据，并转换为 backtrader 兼容的 pandas DataFrame

    参数:
        days (int): 需要的数据天数（例如最近7天）
        bar (str): 时间粒度，如 "1m", "5m", "1H" 等（注意：1s仅支持最近3个月数据）
        instId (str): 产品ID，如 "BTC-USDT"
        flag (str): 实盘："0"，模拟盘："1"

    返回:
        df (DataFrame): 适用于 backtrader 的 DataFrame，包含字段 ['datetime', 'open', 'high', 'low', 'close', 'volume']
                        且 datetime 已设置为索引，数值列已转换为 float 类型
    """
    # 实例化市场数据API对象
    marketDataAPI = MarketData.MarketAPI(flag=flag)
    
    # 当前时间和起始时间（毫秒）
    now = datetime.datetime.now()
    now_ms = int(now.timestamp() * 1000)
    start_ms = now_ms - days * 24 * 3600 * 1000
    
    all_data = []  # 用于存放所有K线数据
    pagination_ts = None  # 分页参数，初始无值表示获取最新100条数据
    
    while True:
        # 构造请求参数
        params = {
            "instId": instId,
            "bar": bar,
            "limit": "100"  # 每次最多返回100条数据
        }
        if pagination_ts is not None:
            params["after"] = pagination_ts
        
        # 调用接口获取历史K线数据
        result = marketDataAPI.get_history_candlesticks(**params)
        
        if result.get("code") != "0":
            print("Error:", result.get("msg"))
            break
        
        data = result.get("data", [])
        if not data:
            break
        
        all_data.extend(data)
        
        # 获取最旧的时间戳
        oldest_ts = int(data[-1][0])
        if oldest_ts <= start_ms:
            break
        
        # 更新分页参数，继续获取更早的数据
        pagination_ts = data[-1][0]
    
    if not all_data:
        print("未获取到数据")
        return pd.DataFrame()
    
    # 定义列名，确保数据顺序正确
    columns = ["datetime", "open", "high", "low", "close", "volume", "volCcy", "volCcyQuote", "confirm"]
    df = pd.DataFrame(all_data, columns=columns)
    
    # 转换 datetime 列为 pandas datetime，并设置为索引
    df["datetime"] = pd.to_datetime(pd.to_numeric(df["datetime"]), unit="ms")
    df.set_index("datetime", inplace=True)

    # 只保留 backtrader 需要的列，并转换数值类型
    df = df[["open", "high", "low", "close", "volume"]]
    df = df.astype(float)  # 确保所有列都是 float 类型
    
    # 过滤出最近 days 天的数据，确保时间范围正确
    df = df[df.index >= pd.to_datetime(start_ms, unit="ms")]
    
    return df


In [4]:
df = fetch_kline_bt(days=7, bar="5m", instId="BTC-USDT")
print(df.head())

                        open     high      low    close     volume
datetime                                                          
2025-03-19 04:45:00  82971.1  83036.0  82952.5  82952.5  19.013236
2025-03-19 04:40:00  82956.3  83000.0  82944.5  82971.2  10.715554
2025-03-19 04:35:00  82830.8  82956.4  82830.8  82956.4  18.960714
2025-03-19 04:30:00  82821.9  82842.5  82786.9  82830.8   5.328271
2025-03-19 04:25:00  82906.0  82923.6  82818.1  82822.0   4.597650


In [21]:
class EnhancedStrategy(bt.Strategy):
    params = (
        ('ma_period', 5),
        ('trade_ratio', 0.9)
    )

    def __init__(self):
        self.sma = bt.indicators.SMA(self.data.close, period=self.p.ma_period)
        self.order = None
        self.trade_count = 0

    def next(self):
        if len(self) < self.p.ma_period:
            return

        if self.order:  # 等待订单完成
            return

        # 多头逻辑
        if self.data.close[0] > self.sma[0]:
            if self.position.size <= 0:
                if self.position.size < 0:
                    self.close()
                self.buy()
        
        # 空头逻辑
        elif self.data.close[0] < self.sma[0]:
            if self.position.size >= 0:
                if self.position.size > 0:
                    self.close()
                self.sell()

    def notify_order(self, order):
        if order.status in [order.Submitted, order.Accepted]:
            return

        if order.status == order.Completed:
            self.trade_count += 1
            if order.isbuy():
                dir = '买入'
            else:
                dir = '卖出'
            
            print(f'{dir}执行 价格：{order.executed.price:.2f} 数量：{order.executed.size:.6f} 手续费：{order.executed.comm:.4f}')
            self.order = None

def run_backtest():
    cerebro = bt.Cerebro()
    
    # 获取并验证数据
    df = fetch_kline_bt(days=7, bar="5m", instId="BTC-USDT")
    print(f"数据量：{len(df)}条")
    print("最新3条数据：\n", df.tail(3))
    
    data = bt.feeds.PandasData(dataname=df, open=0, high=1, low=2, close=3, volume=4)
    cerebro.adddata(data)

    cerebro.addstrategy(EnhancedStrategy)
    cerebro.broker.setcash(100000)
    cerebro.broker.setcommission(commission=0.0002, commtype=bt.CommInfoBase.COMM_PERC)

    print('初始净值：%.2f' % cerebro.broker.getvalue())
    results = cerebro.run()
    print('最终净值：%.2f' % cerebro.broker.getvalue())
    
    # 分析结果
    strat = results[0]
    print('总交易次数：', strat.trade_count)
    cerebro.plot(style='candlestick')

if __name__ == '__main__':
    run_backtest()

数据量：2015条
最新3条数据：
                         open     high      low    close     volume
datetime                                                          
2025-03-19 05:15:00  82921.9  83015.7  82921.9  83003.3  15.437916
2025-03-19 05:20:00  83003.4  83038.0  82987.1  83038.0  25.807545
2025-03-19 05:25:00  83037.9  83112.8  83000.3  83038.1  31.827067
初始净值：100000.00
买入执行 价格：81976.60 数量：1.000000 手续费：16.3953
卖出执行 价格：81783.00 数量：-1.000000 手续费：16.3566
卖出执行 价格：81783.00 数量：-1.000000 手续费：16.3566
买入执行 价格：81688.50 数量：1.000000 手续费：16.3377
买入执行 价格：81688.50 数量：1.000000 手续费：16.3377
卖出执行 价格：81648.60 数量：-1.000000 手续费：16.3297
卖出执行 价格：81648.60 数量：-1.000000 手续费：16.3297
买入执行 价格：81477.70 数量：1.000000 手续费：16.2955
买入执行 价格：81477.70 数量：1.000000 手续费：16.2955
卖出执行 价格：81466.60 数量：-1.000000 手续费：16.2933
卖出执行 价格：81466.60 数量：-1.000000 手续费：16.2933
买入执行 价格：81530.30 数量：1.000000 手续费：16.3061
买入执行 价格：81530.30 数量：1.000000 手续费：16.3061
卖出执行 价格：82428.00 数量：-1.000000 手续费：16.4856
卖出执行 价格：82428.00 数量：-1.000000 手续费：16.4856
买入执行 价格：

<IPython.core.display.Javascript object>

In [25]:
import numpy as np
class BollingerStrategy:
    def __init__(self, 
                 bb_window=20,
                 bb_std_mult=3,
                 position_ratio=0.1,
                 leverage=10,
                 take_profit_ratio=0.01,
                 stop_loss_ratio=0.01):
        
        # 策略参数
        self.bb_window = bb_window
        self.bb_std_mult = bb_std_mult
        self.position_ratio = position_ratio
        self.leverage = leverage
        self.tp_ratio = take_profit_ratio
        self.sl_ratio = stop_loss_ratio
        
        # 数据缓存
        self.price_buffer = []
        self.indicators = {
            'ma': [],
            'upper': [],
            'lower': []
        }

    def next(self, price, balance):
        """
        输入最新价格数据并生成信号
        :param price: dict 包含'open','high','low','close'
        :param balance: 当前账户余额
        :return: dict 交易信号
        """
        self._update_indicators(price)
        
        if len(self.price_buffer) < self.bb_window:
            return {'signal': 0}
            
        signal_info = self._generate_signal(price, balance)
        return signal_info

    def _update_indicators(self, price):
        # 更新价格数据
        self.price_buffer.append(price['close'])
        
        # 维护固定长度窗口
        if len(self.price_buffer) > self.bb_window:
            self.price_buffer.pop(0)
        
        # 计算布林带
        if len(self.price_buffer) == self.bb_window:
            ma = np.mean(self.price_buffer)
            std = np.std(self.price_buffer)
            upper = ma + std * self.bb_std_mult
            lower = ma - std * self.bb_std_mult
        else:
            ma = upper = lower = None
            
        self.indicators['ma'].append(ma)
        self.indicators['upper'].append(upper)
        self.indicators['lower'].append(lower)

    def _generate_signal(self, price, balance):
        # 获取当前和前一根的指标值
        curr_upper = self.indicators['upper'][-1]
        curr_lower = self.indicators['lower'][-1]
        prev_upper = self.indicators['upper'][-2] if len(self.indicators['upper']) >=2 else None
        prev_lower = self.indicators['lower'][-2] if len(self.indicators['lower']) >=2 else None

        signal = 0
        # 做空信号
        if price['high'] >= curr_upper and price['high'] > prev_upper:
            signal = -1
            
        # 做多信号
        elif price['low'] <= curr_lower and price['low'] < prev_lower:
            signal = 1
            
        if signal == 0:
            return {'signal': 0}
        
        # 计算仓位
        position_value = balance * self.position_ratio * self.leverage
        position_size = position_value / price['close']
        
        # 计算止盈止损
        entry_price = price['close']
        if signal == 1:
            tp = entry_price * (1 + self.tp_ratio)
            sl = entry_price * (1 - self.sl_ratio)
        else:
            tp = entry_price * (1 - self.tp_ratio)
            sl = entry_price * (1 + self.sl_ratio)
            
        return {
            'signal': signal,
            'size': position_size,
            'tp': tp,
            'sl': sl,
            'entry_price': entry_price
        }
class Backtester:
    def __init__(self, data, strategy):
        self.data = data
        self.strategy = strategy
        self.equity = [10000]  # 初始资金
        self.position = {
            'size': 0,
            'entry_price': 0,
            'direction': 0,  # 1: 多 -1: 空
            'tp': 0,
            'sl': 0
        }

    def run_backtest(self):
        for i, row in self.data.iterrows():
            # 获取当前资金
            current_balance = self.equity[-1]
            
            # 生成信号（每根K线开始时）
            signal_info = self.strategy.next(
                price={
                    'open': row['open'],
                    'high': row['high'],
                    'low': row['low'],
                    'close': row['close']
                },
                balance=current_balance
            )
            
            # 检查止损止盈（每根K线开始时）
            if self.position['size'] > 0:
                self._check_exit_conditions(row)
            
            # 执行新信号（无持仓时）
            if signal_info['signal'] != 0 and self.position['size'] == 0:
                self._execute_trade(signal_info, row)
            
            # 更新权益曲线（每根K线结束时）
            self._update_equity(row)

    def _execute_trade(self, signal, row):
        self.position = {
            'size': signal['size'],
            'entry_price': signal['entry_price'],
            'direction': signal['signal'],
            'tp': signal['tp'],
            'sl': signal['sl']
        }

    def _check_exit_conditions(self, row):
        current_low = row['low']
        current_high = row['high']
        
        if self.position['direction'] == 1:  # 多单
            # 止损检查（当日最低价触发）
            if current_low <= self.position['sl']:
                self._close_position(self.position['sl'])
            # 止盈检查（当日最高价触发）
            elif current_high >= self.position['tp']:
                self._close_position(self.position['tp'])
                
        elif self.position['direction'] == -1:  # 空单
            if current_high >= self.position['sl']:
                self._close_position(self.position['sl'])
            elif current_low <= self.position['tp']:
                self._close_position(self.position['tp'])

    def _close_position(self, exit_price):
        # 计算盈亏
        if self.position['direction'] == 1:
            pnl = (exit_price - self.position['entry_price']) * self.position['size']
        else:
            pnl = (self.position['entry_price'] - exit_price) * self.position['size']
        
        # 更新权益
        self.equity[-1] += pnl
        # 重置持仓
        self.position = {'size':0, 'entry_price':0, 'direction':0, 'tp':0, 'sl':0}

    def _update_equity(self, row):
        # 计算浮动盈亏
        if self.position['size'] > 0:
            mark_price = row['close']
            if self.position['direction'] == 1:
                unrealized = (mark_price - self.position['entry_price']) * self.position['size']
            else:
                unrealized = (self.position['entry_price'] - mark_price) * self.position['size']
            self.equity.append(self.equity[-1] + unrealized)
        else:
            self.equity.append(self.equity[-1])
# 准备数据（示例）
import pandas as pd
data = df = fetch_kline_bt(days=7, bar="5m", instId="BTC-USDT")

# 初始化策略
strategy = BollingerStrategy(
    bb_window=20,
    bb_std_mult=2.5,
    position_ratio=0.2,
    leverage=10,
    take_profit_ratio=0.03,
    stop_loss_ratio=0.02
)

# 运行回测
backtester = Backtester(data, strategy)
backtester.run_backtest()

# 获取结果
print(f"最终资金：{backtester.equity[-1]:.2f}")


最终资金：-6685.93
