# 策略回测代码
此 Notebook 仅保留策略回测部分的代码，已删除 heatmap 和策略优化的相关部分。

In [6]:
# 此处假设策略文件已存在，并且策略类名为 ConnorsReversal
from Test_1 import Test_1


ModuleNotFoundError: No module named 'Strategies'

In [2]:
import ccxt
import pandas as pd
from datetime import datetime
from backtesting import Backtest

def fetch_and_save_klines(symbol, timeframe, limit, filename):
    """
    使用 ccxt 的 binanceusdm 接口获取指定交易对的 k 线数据，
    并将数据保存到本地 csv 文件中。
    """
    # 创建 Binance USDM 实例
    exchange = ccxt.binanceusdm({
        'enableRateLimit': True,
        'proxies': {
            'http': 'http://127.0.0.1:1913',
            'https': 'http://127.0.0.1:1913',
        },
    })
    print("开始拉取数据……")
    # 获取最新的 k 线数据
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
    
    # 将数据转换为 DataFrame
    df = pd.DataFrame(ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"])
    # 将 timestamp 转换为 datetime
    df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms")
    df.set_index("datetime", inplace=True)
    # 删除原 timestamp 列
    df.drop("timestamp", axis=1, inplace=True)
    
    # 将列名转换为大写，符合 backtesting.py 的要求
    df.rename(columns={
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume"
    }, inplace=True)
    
    # 保存到 CSV 文件
    df.to_csv(filename)
    print(f"数据已保存到 {filename}")
    return df

def annotate_signals(data, trades, strategy_name):
    """
    根据 backtesting 回测产生的交易记录，在原始 DataFrame 中增加 signal 列，
    对每个交易的入场和出场时刻分别标注对应信号（多个信号时用 ';' 隔开）。
    
    参数:
        data (pd.DataFrame): 包含 k 线数据的 DataFrame，其索引为 datetime
        trades (list): 回测返回的交易记录列表，每个 trade 应有 entryTime, exitTime, direction 属性
        strategy_name (str): 策略名称，用于拼接信号文本
    返回:
        pd.DataFrame: 增加 signal 列后的 DataFrame
    """
    # 新增 signal 列，默认空字符串
    data["signal"] = ""
    
    # 遍历回测中的每笔交易并标记信号
    for trade in trades:
        # trade.direction 假定为 "long" 或 "short"
        if trade.direction == "long":
            signal_enter = f"{strategy_name} enter_long"
            signal_exit  = f"{strategy_name} exit_long"
        elif trade.direction == "short":
            signal_enter = f"{strategy_name} enter_short"
            signal_exit  = f"{strategy_name} exit_short"
        else:
            continue

        # 对入场时间标注信号
        entry_time = pd.to_datetime(trade.entryTime)
        try:
            # 找到最接近的 k 线时间点
            pos_entry = data.index.get_indexer([entry_time], method="nearest")
            nearest_entry = data.index[pos_entry[0]]
            # 如果已有信号则追加，否则赋值
            if data.loc[nearest_entry, "signal"]:
                data.loc[nearest_entry, "signal"] += ";" + signal_enter
            else:
                data.loc[nearest_entry, "signal"] = signal_enter
        except Exception as e:
            print(f"未能匹配入场时间 {entry_time}: {e}")

        # 对出场时间标注信号
        exit_time = pd.to_datetime(trade.exitTime)
        try:
            pos_exit = data.index.get_indexer([exit_time], method="nearest")
            nearest_exit = data.index[pos_exit[0]]
            if data.loc[nearest_exit, "signal"]:
                data.loc[nearest_exit, "signal"] += ";" + signal_exit
            else:
                data.loc[nearest_exit, "signal"] = signal_exit
        except Exception as e:
            print(f"未能匹配出场时间 {exit_time}: {e}")
    return data

In [3]:
import pandas as pd

def execute_signal(file_path: str):
    """
    执行信号功能：
    读取指定交易对的历史记录CSV文件，检查最新K线数据的 signal 列中是否存在信号。
    如果存在多个信号（以分号分隔），则逐个处理，确保多个策略产生的信号都能执行。
    
    参数:
         file_path (str): 历史记录CSV文件路径，例如 "KASUSDT_30m_with_signals.csv"
    """
    try:
        # 读取 CSV 文件时指定第一列为索引，且将索引解析为日期类型
        df = pd.read_csv(file_path, index_col=0, parse_dates=True)
    except Exception as e:
        print(f"读取文件失败: {e}")
        return
    
    if df.empty:
        print("文件无数据")
        return
    
    # 获取最新的K线数据（最后一行）
    latest_row = df.iloc[-1]
    signals = latest_row.get("signal", "")
    
    if isinstance(signals, str) and signals.strip():
        # 如果 signal 字段中有多个信号，则以分号分隔
        signals_list = [s.strip() for s in signals.split(';') if s.strip()]
        for sig in signals_list:
            # 这里可以根据实际的信号内容调用不同的执行逻辑
            print(f"执行信号: {sig}")
            # 根据不同信号执行对应的下单或平仓操作
            if "enter_long" in sig:
                print("执行多头开仓操作")
                # 调用多头开仓的执行函数，例如：execute_long_entry()
            elif "exit_long" in sig:
                print("执行多头平仓操作")
                # 调用多头平仓的执行函数，例如：execute_long_exit()
            elif "enter_short" in sig:
                print("执行空头开仓操作")
                # 调用空头开仓的执行函数，例如：execute_short_entry()
            elif "exit_short" in sig:
                print("执行空头平仓操作")
                # 调用空头平仓的执行函数，例如：execute_short_exit()
            else:
                print(f"未知信号: {sig}")
    else:
        print("最新K线未产生任何信号")


In [4]:
def main():
    # 参数设置
    symbol = "1000PEPEUSDT"           # 根据需要替换交易对
    timeframe = "1m"
    limit = 500                  # 获取最新 500 条数据
    strategy = Test_1            # 策略设置（将策略作为参数传入）
    strategy_name = "Test_1"     # 策略名称
    csv_filename = f"binance_{symbol}_{timeframe}.csv"  # 使用 f-string 格式化文件名

    # 1. 通过 ccxt 获取 k 线数据并存储到本地文件
    kline_data = fetch_and_save_klines(symbol, timeframe, limit, csv_filename)
    
    # 2. 将获取的数据输入给 backtesting 进行回测
    # 初始化回测实例
    bt = Backtest(
        kline_data,
        strategy,
        commission=0.0004,   # 手续费
        margin=1,            # 杠杆倍率，此处假设 1 代表 8 倍杠杆，具体依据 backtesting 库解释
        trade_on_close=True, # 与 TradingView 行为一致
        exclusive_orders=True,
        hedging=False        # 禁止对冲
    )
    
    # 根据 Test_1 策略说明，策略逻辑仅依赖于当前与上一根K线的价格变化，不需要额外的参数设置
    bt.run()
    
    # 3. 根据回测产生的交易记录记录每个 datetime 是否产生过信号
    # 此处使用 bt.trades，部分 backtesting 版本可能需使用 bt._strategy.trades
    try:
        trades = bt.trades
    except AttributeError:
        trades = bt._strategy.trades

    annotated_data = annotate_signals(kline_data.copy(), trades, strategy_name)
    
    # 4. 保存带有信号记录的数据到本地新的 CSV 表格，并根据前面设置的参数构建文件名
    annotated_csv_filename = f"binance_{symbol}_{timeframe}_with_signals.csv"
    annotated_data.to_csv(annotated_csv_filename)
    print(f"带有信号记录的 k 线数据已保存到 {annotated_csv_filename}")
    
    # 每个策略运行完之后，检查最新的信号并执行对应操作
    execute_signal(annotated_csv_filename)


In [None]:

if __name__ == "__main__":
    main()

开始拉取数据……
数据已保存到 binance_1000PEPEUSDT_1m.csv


TypeError: Can't instantiate abstract class Test_1 with abstract method init