In [15]:
#思路,每次只找服务器要一个合约的k线数据,并同时测试不同策略，使用不同账户进行控制。
%config DisplayFormatter.truncate_bytes = 10000


In [None]:

import json
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, TqSim, TqMultiAccount
from datetime import date
from tqsdk.tafunc import ma,ema
from tqsdk.exceptions import BacktestFinished
import math


# strategy_name_list[0]= {'X_MA': 'MA', 'short': 5, 'long': 15}
def calculate_vwma(data, window_size):
    """
    计算成交量加权移动平均线（VWMA）
    :param data: 包含日期、收盘价和成交量的数据框
    :param window_size: 计算VWMA的窗口大小
    :return: 包含原始数据和VWMA的数据框
    """
    # 确保数据框中的价格和成交量列是数值类型
    data = data.copy()
    data['close'] = pd.to_numeric(data['close'])
    data['volume'] = pd.to_numeric(data['volume'])
    # 计算价格与成交量的乘积
    data['价格_成交量乘积'] = data['close'] * data['volume']
    # 计算VWMA
    data['VWMA'] = data['价格_成交量乘积'].rolling(window=window_size).sum() / data['volume'].rolling(window=window_size).sum()
    return data["VWMA"]

def craet_run_strategy(klines,strategy_params,target_pos_i):
    
    strategy_type = strategy_params["X_MA"]
    strategy_short = strategy_params["short"]
    strategy_long = strategy_params["long"]
    strategy_target_pos = target_pos_i
    if strategy_type == "MA":
        short_avg = ma(klines["close"], strategy_short)  # 短周期
        long_avg = ma(klines["close"],  strategy_long)  # 长周期
    elif strategy_type == "EMA":
        short_avg = ema(klines["close"], strategy_short)  # 短周期
        long_avg = ema(klines["close"],  strategy_long)  # 长周期
    elif strategy_type == "VWMA":
        short_avg = calculate_vwma(klines, strategy_short)  # 短周期
        long_avg = calculate_vwma(klines,  strategy_long)  # 长周期
    elif strategy_type == "MA+kσ":
        short_avg = ma(klines["close"], strategy_short)  # 短周期
        long_avg_top= ma(klines["close"],  strategy_long) + klines["close"].rolling(window=strategy_long).std()*0.2  # 长周期上轨道
        long_avg_bottom = ma(klines["close"],  strategy_long,) -  klines["close"].rolling(window=strategy_long).std()*0.2 # 长周期下轨道
        # 均线下穿，做空
        if long_avg_bottom.iloc[-3] < short_avg.iloc[-3] and long_avg_bottom.iloc[-2] > short_avg.iloc[-2]:
            strategy_target_pos.set_target_volume(-10)
        # 均线上穿，做多
        if short_avg.iloc[-3] < long_avg_top.iloc[-3] and short_avg.iloc[-2] > long_avg_top.iloc[-2]:
            strategy_target_pos.set_target_volume(10) 
        return
    else:
        print("X_MA参数输入错误")
        return 

    # 上穿，做多
    if long_avg.iloc[-3] > short_avg.iloc[-3] and long_avg.iloc[-2] < short_avg.iloc[-2]:
        strategy_target_pos.set_target_volume(10)
    # 下穿，做空
    if short_avg.iloc[-3] > long_avg.iloc[-3] and short_avg.iloc[-2] < long_avg.iloc[-2]:
       strategy_target_pos.set_target_volume(-10)
    return 
def create_backtest_record(R):
  data = {
      "策略-参数1":R["param_1"],
      "策略-参数2":R["param_2"],
      "策略-参数3":R["param_3"],
      "标的":R["标的"],
      "开始时间":R["start_date"],
      "结束时间":R["end_date"],

      "保证金" : R["保证金"],
      "使用金额" : max(R["保证金"]*10*2,R["report"]["max_drawdown"]*R["report"]["start_balance"]*2),
      "起始金额" : R["report"]["start_balance"],
      "结束金额" : R["report"]["end_balance"],
      "最大回撤" : R["report"]["max_drawdown"]*R["report"]["start_balance"]/(R["保证金"]*10*2)*100,
      "盈亏比" : R["report"]["profit_loss_ratio"],
      "胜率" : R["report"]["winning_rate"],
      "收益率" : (R["report"]["end_balance"]-R["report"]["start_balance"])/max(R["保证金"]*10*2,R["report"]["max_drawdown"]*R["report"]["start_balance"]*2)*100

  }
  return pd.DataFrame(data,index=[0])
def create_params(symbol,strategy_name_list):
    #测试标的
    symbol,start_date,end_date = symbol["symbol"],symbol["start_date"],symbol["end_date"]
    y,m,d = start_date.split("-")
    y,m,d = int(y),int(m),int(d)
    e_y,e_m,e_d = end_date.split("-")
    e_y,e_m,e_d = int(e_y),int(e_m),int(e_d)
    #开始和结束时间
    start_date = date(y,m,d)
    end_date = date(e_y,e_m,e_d)
   
    #按照策略总数量，生成模拟账号
    acc_list = [TqSim(10000000) for _ in range(len(strategy_name_list))]

    #最大的长周期，用于确认订阅的K线长度

    long = max([int(strategy_name_list[i]["long"]) for i in range(len(strategy_name_list))])
    

    return symbol,start_date,end_date,acc_list,long



#第一部分获取标的
symbols_pool_path = r"C:/Users/zhaoc/OneDrive/LH2024/HL2024-flask/策略研究/双均线策略/Symbols_and_Strategy/20250319标的池.json"
strategy_pool_path = r"C:/Users/zhaoc/OneDrive/LH2024/HL2024-flask/策略研究/双均线策略/Symbols_and_Strategy/策略池.json"
user_path = r"C:/Users/zhaoc/OneDrive/LH2024/HL2024-flask/策略研究/双均线策略/Symbols_and_Strategy/账户.json"
backtest_record_path = r"C:/Users/zhaoc/OneDrive/LH2024/HL2024-flask/策略研究/双均线策略/策略回测结果{}.csv".format(date.today().strftime("%Y%m%d"))

# 第二部分获取账户信息
with open(user_path,'r',encoding='utf8')as fp:
    json_data = json.load(fp)
    user = json_data['TQ_acc']

with open(symbols_pool_path,'r',encoding='utf8')as fp:
    json_data = json.load(fp)
    symbols = list(json_data['回测池——20250320'].values())

with open(strategy_pool_path,'r',encoding='utf8')as fp:
    json_data = json.load(fp)
    strategy_list = json_data['双均线策略-均线类型']
    param_list = json_data['双均线策略-均线周期']
    strategy_name_list = []
    for strategy in list(strategy_list.values()):
        for param in list(param_list.values()):
            strategy_name = {"X_MA": strategy,"short": param[0],"long": param[1]}
            strategy_name_list.append(strategy_name)





#需要的标的

result_dict_list = []
for symbol in symbols[0:1]:
    symbol,start_date,end_date,acc_list,long = create_params(symbol,strategy_name_list)
    api = TqApi(account=TqMultiAccount(acc_list),backtest=TqBacktest(start_dt=start_date, end_dt=end_date),auth=TqAuth(user["username"], user["password"]))
    quate = api.get_quote(symbol)
    margin = quate.margin
    klines = api.get_kline_serial(symbol, duration_seconds=60*60*24, data_length=long+3)
    target_pos_list = [TargetPosTask(api, symbol,account=acc_list[i]) for i in range(len(acc_list))]
    print("开始运行{}的回测".format(symbol))
    try:
        while True:
            api.wait_update()
            if api.is_changing(klines.iloc[-1], "datetime"):
                for i in range(len(strategy_name_list)):
                    craet_run_strategy(klines,strategy_name_list[i],target_pos_list[i])

    #strategy_name_list[0]= {'X_MA': 'MA', 'short': 5, 'long': 15}
    except BacktestFinished:
        print("回测结束")
        api.close()
        
        for r in range(len(acc_list)):
            result_dict = {"param_1":strategy_name_list[r]["X_MA"],
                                "param_2":str(strategy_name_list[r]["short"]),
                                "param_3":str(strategy_name_list[r]["long"]),
                                "标的":symbol,
                                "保证金" : margin,
                                "start_date":start_date,"end_date":end_date,"report":acc_list[r].tqsdk_stat}
            result_dict_list.append(create_backtest_record(result_dict))

backtest_record_df = pd.concat(result_dict_list)
backtest_record_df.to_csv(backtest_record_path,index=False)

开始运行SHFE.au2402的回测
    INFO - 模拟交易下单 TQSIM, PYSDK_target_8883cc8139c90ce63e3d3ca27716c32c: 时间: 2023-01-30 09:00:00.000000, 合约: SHFE.au2402, 开平: OPEN, 方向: BUY, 手数: 10, 价格: 420.09999999999997
    INFO - 模拟交易委托单 TQSIM, PYSDK_target_8883cc8139c90ce63e3d3ca27716c32c: 全部成交
    INFO - 模拟交易下单 TQSIM, PYSDK_target_120f4d5b5fd8d53477e979721255c439: 时间: 2023-01-30 09:00:00.000000, 合约: SHFE.au2402, 开平: OPEN, 方向: BUY, 手数: 10, 价格: 420.09999999999997
    INFO - 模拟交易委托单 TQSIM, PYSDK_target_120f4d5b5fd8d53477e979721255c439: 全部成交
    INFO - 模拟交易下单 TQSIM, PYSDK_target_84063ac747d935a362297ce43a48f947: 时间: 2023-01-30 09:00:00.000000, 合约: SHFE.au2402, 开平: OPEN, 方向: BUY, 手数: 10, 价格: 420.09999999999997
    INFO - 模拟交易委托单 TQSIM, PYSDK_target_84063ac747d935a362297ce43a48f947: 全部成交
    INFO - 模拟交易下单 TQSIM, PYSDK_target_e0094a5fca1937c113f5ed85e660aba3: 时间: 2023-01-30 09:00:00.000000, 合约: SHFE.au2402, 开平: OPEN, 方向: BUY, 手数: 10, 价格: 420.09999999999997
    INFO - 模拟交易委托单 TQSIM, PYSDK_target_e0094a5fca1937c113f5ed85

ValueError: If using all scalar values, you must pass an index

In [38]:
def create_backtest_record(R):
  data = {
      "策略-参数1":R["param_1"],
      "策略-参数2":R["param_2"],
      "策略-参数3":R["param_3"],
      "标的":R["标的"],
      "开始时间":R["start_date"],
      "结束时间":R["end_date"],

      "保证金" : R["保证金"],
      "使用金额" : max(R["保证金"]*10*2,R["report"]["max_drawdown"]*R["report"]["start_balance"]*2),
      "起始金额" : R["report"]["start_balance"],
      "结束金额" : R["report"]["end_balance"],
      "最大回撤" : R["report"]["max_drawdown"]*R["report"]["start_balance"]/(R["保证金"]*10*2)*100,
      "盈亏比" : R["report"]["profit_loss_ratio"],
      "胜率" : R["report"]["winning_rate"],
      "收益率" : (R["report"]["end_balance"]-R["report"]["start_balance"])/max(R["保证金"]*10*2,R["report"]["max_drawdown"]*R["report"]["start_balance"]*2)*100

  }
  return pd.DataFrame(data,index=[0])
for r in range(len(acc_list)):
    result_dict = {"param_1":strategy_name_list[r]["X_MA"],
                        "param_2":str(strategy_name_list[r]["short"]),
                        "param_3":str(strategy_name_list[r]["long"]),
                        "标的":symbol,
                        "保证金" : margin,
                        "start_date":start_date,"end_date":end_date,"report":acc_list[r].tqsdk_stat}
    result_dict_list.append(create_backtest_record(result_dict))

In [39]:
backtest_record_df = pd.concat(result_dict_list)
backtest_record_df.to_csv(backtest_record_path,index=False)

In [2]:
#!/usr/bin/python3

import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, delay):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.delay = delay
    def run(self):
        print ("开始线程：" + self.name)
        print_time(self.name, self.delay, 5)
        print ("退出线程：" + self.name)

def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            threadName.exit()
        time.sleep(delay)
        print ("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

# 创建新线程
thread_list = []
for i in range(1, 3):
    thread = myThread(i, "Thread-{}".format(i), i)  
    thread.start()
    thread_list.append(thread)

for t in thread_list:
    t.join()

print ("退出主线程")



开始线程：Thread-1
开始线程：Thread-2
Thread-1: Mon Mar 31 01:01:25 2025
Thread-2: Mon Mar 31 01:01:26 2025
Thread-1: Mon Mar 31 01:01:26 2025
Thread-1: Mon Mar 31 01:01:27 2025
Thread-2: Mon Mar 31 01:01:28 2025
Thread-1: Mon Mar 31 01:01:28 2025
Thread-1: Mon Mar 31 01:01:29 2025
退出线程：Thread-1
Thread-2: Mon Mar 31 01:01:30 2025
Thread-2: Mon Mar 31 01:01:32 2025
Thread-2: Mon Mar 31 01:01:34 2025
退出线程：Thread-2
退出主线程


In [3]:
start_backtest_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("开始回测时间:{}".format(start_backtest_time))

开始回测时间:2025-03-31 02:51:32
