# 导入模块

In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyarrow.parquet as pq
from pyarrow import fs
from itertools import combinations
import multiprocessing
from multiprocessing import Manager, Pool
from tqdm import tqdm
import warnings
import seaborn as sns
import matplotlib.pyplot as plt
warnings.filterwarnings('ignore')
import re
from pathlib import Path
import datetime
warnings.filterwarnings("ignore", category=FutureWarning, message="The behavior of DataFrame concatenation")

In [2]:
import pandas as pd
import logging
from datetime import datetime
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, message="The behavior of DataFrame concatenation")

class Exchange:
    """Simulated exchange that fills pending orders against a five-level book.

    The exchange keeps the latest quotes per symbol, matches the pending
    orders stored in the ``context`` object against the book levels it is
    given, and books cash / position-value changes on that same context.
    Every executed trade is appended to ``trade_logs``.
    """

    # order_type -> (is_buy, name of the executing method, position side).
    # Buy orders are matched against the ask side, sell orders against the bid side.
    _ORDER_RULES = {
        'openlong': (True, 'open_position', 'long'),
        'closeshort': (True, 'close_position', 'short'),
        'openshort': (False, 'open_position', 'short'),
        'closelong': (False, 'close_position', 'long'),
    }

    def __init__(self, context):
        """
        Create an exchange bound to ``context``.

        Args:
            context: account object providing ``balance``, ``positions``,
                ``long_position_value``, ``short_position_value``,
                ``pnl_history`` and the order/position helper methods.
        """
        self.context = context  # account state: cash, orders and positions
        self.current_price_ask_1 = {}  # latest best ask per symbol
        self.current_price_bid_1 = {}  # latest best bid per symbol
        self.current_price = {}  # latest reference price per symbol
        self.trade_logs = []
        self.time = None  # set via set_time(); None until the first tick
        self.fee_rate = 0.000023
        # symbol // 10000 -> multiplier applied to price * quantity
        # (presumably the contract multiplier of each index future)
        self.fut_map = {
            1: 200,
            2: 300,
            3: 300,
            4: 200,
        }

    def fut_bonus(self, symbol):
        """Return the multiplier for ``symbol`` (0 when the product id is unknown)."""
        symbol_id = symbol // 10000
        return self.fut_map.get(symbol_id, 0)

    def set_price(self, symbol, ask1, bid1, price):
        """Store the latest best ask, best bid and reference price of ``symbol``."""
        self.current_price_ask_1[symbol] = ask1
        self.current_price_bid_1[symbol] = bid1
        self.current_price[symbol] = price

    def set_time(self, time):
        """Store the timestamp that is written into subsequent trade logs."""
        self.time = time

    @staticmethod
    def _first_fillable_level(limit_price, quantity, prices, volumes, is_buy):
        """Return the first level (1..5) that can fully fill the order, else None."""
        for level in range(1, 6):
            if is_buy:
                crosses = limit_price >= prices[level]
            else:
                crosses = limit_price <= prices[level]
            if crosses and quantity <= volumes[level]:
                return level
        return None

    def check_order_book(self, symbol, askp, bidp, askv, bidv):
        """
        Try to fill every pending order of ``symbol`` against the given book.

        Each order is matched against levels 1..5 in order; it is filled at the
        price of the first level whose price satisfies the limit and whose
        volume covers the whole quantity. Orders that cannot be filled (or that
        the account rejects) are marked 'canceled'. Orders with an unknown
        ``order_type`` are left untouched.

        Args:
            symbol: instrument whose pending orders are processed.
            askp, bidp: mappings level -> price for levels 1..5.
            askv, bidv: mappings level -> volume for levels 1..5; the volume
                of a level is reduced in place by every fill taken from it.
        """
        order_pending = self.context.get_orders(symbol, status='pending')
        if order_pending.empty:
            return

        for _, order in order_pending.iterrows():
            rule = self._ORDER_RULES.get(order['order_type'])
            if rule is None:
                continue
            is_buy, handler_name, side = rule
            prices, volumes = (askp, askv) if is_buy else (bidp, bidv)
            order_quantity = order['quantity']

            level = self._first_fillable_level(
                order['price'], order_quantity, prices, volumes, is_buy)
            executed = False
            if level is not None:
                handler = getattr(self, handler_name)
                executed = handler(symbol, order_quantity, prices[level], side)

            if executed:
                # Consume the traded volume (never the price) of the hit level.
                volumes[level] -= order_quantity
                self.context.update_order_status(order['id'], 'filled')
            else:
                self.context.update_order_status(order['id'], 'canceled')

    def open_position(self, symbol, quantity, price, side):
        """
        Open a long or short position and book cash, fee and position value.

        Args:
            symbol: instrument to trade.
            quantity: number of units to open.
            price: execution price.
            side: 'long' or 'short'.

        Returns:
            True when the position was opened, False when it was rejected
            (no price set for the symbol, insufficient balance, unknown side).
        """
        bonus = self.fut_bonus(symbol)
        if symbol not in self.current_price:
            logging.warning(f"Cannot open position for {symbol}: price not set.")
            return False

        cost = quantity * price * bonus
        fee = cost * self.fee_rate
        # The required cash must include the multiplier, otherwise the check
        # understates the real cost of the trade.
        required = cost + fee
        if required > self.context.balance:
            logging.warning(f"Insufficient balance to open {side} position for {symbol}. Required: {required}, Available: {self.context.balance}")
            return False

        if side == "long":
            self.context.add_position(symbol, quantity, price, "long")
            self.context.balance = self.context.balance - cost - fee
            self.context.long_position_value += cost
            order_type = "openlong"
        elif side == "short":
            self.context.add_position(symbol, quantity, price, "short")
            # Short proceeds are credited to cash; the liability is tracked as
            # a negative short position value.
            self.context.balance = self.context.balance + cost - fee
            self.context.short_position_value -= cost
            order_type = "openshort"
        else:
            return False

        self.record_trade_log(symbol,
                              price,
                              -fee,
                              self.context.long_position_value,
                              self.context.short_position_value,
                              self.context.balance,
                              order_type,
                              self.time)
        return True

    def close_position(self, symbol, quantity, price, side):
        """
        Close every ``side`` position held in ``symbol`` at ``price``.

        NOTE: the ``quantity`` argument is not used for sizing; each matching
        position row is closed with its own stored quantity and all matching
        rows are removed afterwards.

        Returns:
            True when at least one position was closed, False when the symbol
            has no price set or no matching position exists.
        """
        bonus = self.fut_bonus(symbol)
        if symbol not in self.current_price:
            logging.warning(f"Cannot close position for {symbol}: price not set.")
            return False
        exit_price = price

        held = self.context.positions
        position = held[(held['symbol'] == symbol) & (held['side'] == side)]
        if position.empty:
            logging.warning(f"No {side} position found for {symbol}.")
            return False

        for _, row in position.iterrows():
            entry_price = row['entry_price']
            held_quantity = row['quantity']
            gross = exit_price * held_quantity * bonus
            fee = exit_price * held_quantity * self.fee_rate * bonus
            if side == 'long':
                pnl = (exit_price - entry_price) * held_quantity * bonus
                self.context.pnl_history.append(pnl)
                self.context.balance = self.context.balance + gross - fee
                self.context.long_position_value -= gross
                order_type = "closelong"
            elif side == 'short':
                pnl = (entry_price - exit_price) * held_quantity * bonus
                self.context.pnl_history.append(pnl)
                self.context.balance = self.context.balance - gross - fee
                self.context.short_position_value = self.context.short_position_value + gross
                order_type = "closeshort"
            else:
                continue
            self.record_trade_log(symbol,
                                  exit_price,
                                  pnl - fee,
                                  self.context.long_position_value,
                                  self.context.short_position_value,
                                  self.context.balance,
                                  order_type,
                                  self.time)
        self.context.remove_position(symbol, side)
        return True

    def update_equity(self):
        """
        Re-mark all open positions to the latest reference prices.

        Long value is the sum of price * quantity * multiplier; short value is
        the negative of the same sum. Positions whose symbol has no price yet
        contribute nothing. With no positions both values are reset to 0.
        """
        position = self.context.positions
        if position.empty:
            self.context.long_position_value = 0
            self.context.short_position_value = 0
            return

        long_position_value = 0
        short_position_value = 0
        for _, pos in position.iterrows():
            symbol = pos['symbol']
            if symbol not in self.current_price:
                continue
            current_price = self.current_price[symbol]
            bonus = self.fut_bonus(symbol)
            quantity = pos['quantity']
            side = pos['side']
            if side == 'long':
                long_position_value += current_price * quantity * bonus
            elif side == 'short':
                short_position_value += - current_price * abs(quantity) * bonus

        self.context.long_position_value = long_position_value
        self.context.short_position_value = short_position_value

    def get_long_position_value(self):
        """Return the account's current long position value."""
        return self.context.long_position_value

    def get_short_position_value(self):
        """Return the account's current short position value (zero or negative)."""
        return self.context.short_position_value

    def get_balance(self):
        """Return the account's current cash balance."""
        return self.context.balance

    def get_symbol_positions(self, symbol, side):
        """Return the position rows held in ``symbol`` on ``side``."""
        position = self.context.positions
        return position[(position['symbol'] == symbol) & (position['side'] == side)]

    def get_position(self):
        """Return the full positions table of the account."""
        return self.context.positions

    def record_trade_log(self, symbol, price, pnl, long_position_value, short_position_value, balance, order_type, time):
        """Append one trade record (a dict) to ``trade_logs``."""
        log = {
            "symbol": symbol,
            "exc_price": price,
            "pnl": pnl,
            "long_position_value": long_position_value,
            "short_position_value": short_position_value,
            "balance": balance,
            "order_type": order_type,
            "time": time
        }
        self.trade_logs.append(log)

class Context:
    """Account state of a back-test: cash, order table and position table."""

    def __init__(self, initial_balance):
        """Start with ``initial_balance`` cash, no orders and no positions."""
        order_columns = ['id', 'symbol', 'price', 'quantity', 'status', 'timestamp', 'order_type']
        position_columns = ['symbol', 'quantity', 'entry_price', 'side']

        self.balance = initial_balance  # available cash
        self.equity = 0  # initial account equity
        self.long_position_value = 0  # market value of long positions
        self.short_position_value = 0  # market value of short positions
        self.order_book = pd.DataFrame(columns=order_columns)
        self.positions = pd.DataFrame(columns=position_columns)
        self.order_id_counter = 0  # id handed to the next order
        self.pnl_history = []

    def add_order(self, symbol, price, quantity, time_sec, order_type, status='pending'):
        """Append one order row, assigning it the next sequential id."""
        order_id, self.order_id_counter = self.order_id_counter, self.order_id_counter + 1
        record = [order_id, symbol, price, quantity, status, time_sec, order_type]
        appended = pd.DataFrame([record], columns=self.order_book.columns)
        self.order_book = pd.concat([self.order_book, appended], ignore_index=True)

    def update_order_status(self, order_id, new_status):
        """Set the status of the order(s) whose id equals ``order_id``."""
        is_target = self.order_book['id'] == order_id
        self.order_book.loc[is_target, 'status'] = new_status

    def add_position(self, symbol, quantity, price, side):
        """Append one position row (symbol, quantity, entry price, side)."""
        record = [symbol, quantity, price, side]
        appended = pd.DataFrame([record], columns=self.positions.columns)
        self.positions = pd.concat([self.positions, appended], ignore_index=True)

    def remove_position(self, symbol, side):
        """Drop every position row matching both ``symbol`` and ``side``."""
        same_symbol = self.positions['symbol'] == symbol
        same_side = self.positions['side'] == side
        self.positions = self.positions[~(same_symbol & same_side)]

    def get_open_positions(self):
        """Return the positions table."""
        return self.positions

    def get_orders(self, symbol, status):
        """Return the orders of ``symbol`` that are in the given ``status``."""
        book = self.order_book
        wanted = (book['symbol'] == symbol) & (book['status'] == status)
        return book[wanted]

In [3]:
def get_quantile(quantile1_dict, quantile99_dict, signal_id, date_list, data_type):
    """Store the 1% and 99% quantiles of ``pv1`` for one signal.

    Reads one parquet file per entry of ``date_list`` below
    ``/home/dja/data/signal/mxy_data/{data_type}/{signal_id}``, stacks them,
    adds tiny Gaussian noise (sigma 1e-10) to ``pv1`` and writes the two
    quantiles, rounded to 3 decimals, into the given dicts under ``signal_id``.
    """
    signal_dir = f'/home/dja/data/signal/mxy_data/{data_type}/{signal_id}'
    daily_frames = [pd.read_parquet(signal_dir + '/' + day) for day in date_list]
    data = pd.concat(daily_frames, axis=0)

    raw_signal = data['pv1']
    data['pv1'] = raw_signal + np.random.normal(0, 1e-10, size=len(data))

    jittered = data['pv1']
    quantile1_dict[signal_id] = jittered.quantile(0.01).round(3)
    quantile99_dict[signal_id] = jittered.quantile(0.99).round(3)

# 策略函数

In [None]:
# 创建 Context 和 Exchange 实例
def strategy(exchange, symbol, askp, bidp, askv, bidv, price, signal, time, quantile1, quantile99):
    """Process one tick: settle pending orders, then place new ones.

    A signal above ``quantile99`` targets a long position, a signal below
    ``quantile1`` targets a short position. If the target side is not held,
    an opposite position (if any) is closed first and the target side is
    opened, each with a one-unit limit order priced 1bp through the best quote.
    """
    long_trigger, short_trigger = quantile99, quantile1

    # Refresh quotes and clock, re-mark positions, then match pending orders.
    exchange.set_price(symbol, askp[1], bidp[1], price)
    exchange.set_time(time)
    slippage = 0.0001
    exchange.update_equity()
    exchange.check_order_book(symbol, askp, bidp, askv, bidv)

    def holds(side):
        return not exchange.get_symbol_positions(symbol, side).empty

    def submit(limit_price, order_type):
        # Every order trades a single unit.
        exchange.context.add_order(symbol, limit_price, 1, time, order_type)

    if signal > long_trigger:  # long signal
        if holds("long"):
            return
        if holds("short"):
            # A short position is held: close it before going long.
            submit(askp[1] * (1 + slippage), "closeshort")
        submit(askp[1] * (1 + slippage), "openlong")
    elif signal < short_trigger:  # short signal
        if holds("short"):
            return
        if holds("long"):
            # A long position is held: close it before going short.
            submit(bidp[1] * (1 - slippage), "closelong")
        submit(bidp[1] * (1 - slippage), "openshort")

# 线下数据

In [13]:
def get_pnl_for_date(date, c, quantile1_dict, quantile99_dict,  data_type):
    """
    Back-test signal ``c`` on one trading day and return its PnL.

    The signal file of the day is joined with the five-level depth data read
    from HDFS, then every row is fed to ``strategy`` on a fresh
    ``Context``/``Exchange`` pair.

    Args:
        date: trading day as a 'YYYYMMDD' string (also the file name).
        c: signal id; selects the signal directory and the quantile entries.
        quantile1_dict, quantile99_dict: signal id -> 1% / 99% threshold.
        data_type: sub-directory of the signal root (e.g. 'offlinedata').

    Returns:
        A one-element list ``[(date, c, pnl, number_of_trades)]``.
    """
    import warnings
    warnings.filterwarnings("ignore", category=FutureWarning)
    import pandas as pd
    from pyarrow import fs
    import pyarrow.parquet as pq
    from datetime import datetime

    initial_balance = 10000000
    # asks1p, bids1p, asks1v, bids1v, asks2p, ... for the five book levels
    book_columns = [f'{side}{level}{kind}'
                    for level in range(1, 6)
                    for kind in ('p', 'v')
                    for side in ('asks', 'bids')]

    pnl_results = []

    # Load the signal file once; both contract filters below work on this frame.
    path = f'/home/dja/data/signal/mxy_data/{data_type}/{c}/{date}'
    signal_data = pd.read_parquet(path)
    signal_data = signal_data.sort_values(by=['time_sec', 'code']).reset_index(drop=True).round(3).rename(columns={'code': 'symbol'})
    signal_data['price'] = (signal_data['ask1'] + signal_data['bid1']) / 2

    # Prefer the contract delivering in the month of ``date``.
    contract_month = int(date[4:6])
    data_3020 = signal_data[signal_data['symbol'] % 100 == contract_month].reset_index(drop=True)

    # No rows for that contract: switch to the next delivery month, wrapping
    # December to January (the month is compared against ``symbol % 100``).
    if data_3020.empty:
        contract_month = contract_month % 12 + 1
        data_3020 = signal_data[signal_data['symbol'] % 100 == contract_month].reset_index(drop=True)
    data_3020 = data_3020[data_3020['time_sec'] < 14110].copy()

    # Load the depth (order book) data of the day from HDFS.
    hdfs_fs = fs.HadoopFileSystem('hdfs://ftxz-hadoop', user='zli')
    depth_path = f'/user/zli/lyt/data/StockIndexFutures/DepthData/{date}'
    depth_data = pq.read_table(depth_path, filesystem=hdfs_fs).to_pandas()
    depth_data = depth_data[['symbol_id', 'delivery_code', 'time'] + book_columns]

    def convert_ns_to_standard_time(ns):
        # Nanosecond epoch -> 'HH:MM:SS.f' (one decimal). fromtimestamp uses
        # the local time zone of the machine running the back-test.
        timestamp_s = ns / 1e9
        return datetime.fromtimestamp(timestamp_s).strftime('%H:%M:%S.%f')[:-5]

    depth_data['time'] = depth_data['time'].apply(convert_ns_to_standard_time)
    depth_data = depth_data[(depth_data['time'] < '15:00:00') & (depth_data['time'] > '09:30:00')]
    depth_data = depth_data[depth_data['delivery_code'] % 100 == contract_month].reset_index(drop=True)

    def convet_time_to_sec(time_str, base_str="09:30:00.0"):
        # Seconds since 09:30; offsets beyond 7200 s are shifted back by
        # 5400 s (presumably to skip the midday break) and negatives clamp to 0.
        time_obj = datetime.strptime(time_str, "%H:%M:%S.%f")
        base_obj = datetime.strptime(base_str, "%H:%M:%S.%f")
        seconds_diff = (time_obj - base_obj).total_seconds()
        return max(0, seconds_diff - 5400 if seconds_diff > 7200 else seconds_diff)

    depth_data['time'] = depth_data['time'].apply(convet_time_to_sec)

    # Translate the depth feed's ids into the signal file's symbol encoding:
    # product number * 10000 + delivery code.
    FtMap1 = {3692: 'IC', 3693: 'IF', 3694: 'IH', 5756: 'IM'}
    fut_map = {'1': 'IC', '2': 'IF', '3': 'IH', '4': 'IM', 'IC': 1, 'IF': 2, 'IH': 3, 'IM': 4}
    depth_data['symbol_id'] = depth_data['symbol_id'].map(FtMap1).map(fut_map) * 10000 + depth_data['delivery_code'].astype(int)
    depth_data = depth_data[['symbol_id', 'time'] + book_columns]
    depth_data = depth_data.sort_values(by=['time', 'symbol_id']).reset_index(drop=True)

    # Join signal rows with the book snapshot of the same symbol and 0.1 s slot.
    data_3020['time_sec'] = data_3020['time_sec'].round(1)
    depth_data['time'] = depth_data['time'].round(1)
    data_3020 = pd.merge(data_3020, depth_data, how='left', left_on=['symbol', 'time_sec'], right_on=['symbol_id', 'time'])
    data_3020 = data_3020.drop(['ask1', 'bid1', 'symbol_id', 'time'], axis=1)

    # Replay the day tick by tick.
    context = Context(initial_balance=initial_balance)
    exchange = Exchange(context)
    lower_threshold = quantile1_dict[c]
    upper_threshold = quantile99_dict[c]
    for _, row in data_3020.iterrows():
        askp = {i: row[f'asks{i}p'] for i in range(1, 6)}
        bidp = {i: row[f'bids{i}p'] for i in range(1, 6)}
        askv = {i: row[f'asks{i}v'] for i in range(1, 6)}
        bidv = {i: row[f'bids{i}v'] for i in range(1, 6)}
        strategy(exchange, row['symbol'], askp, bidp, askv, bidv,
                 row['price'], row['pv1'], row['time_sec'],
                 lower_threshold, upper_threshold)

    # PnL = long value + (negative) short value + cash - starting cash.
    pnl_tmp = (abs(exchange.get_long_position_value()) + exchange.get_short_position_value() + exchange.get_balance()) - initial_balance
    pnl_results.append((date, c, pnl_tmp, len(exchange.trade_logs)))

    return pnl_results

# 示例：外部调用
from multiprocessing import Pool

if __name__ == "__main__":
    # Offline back-test driver: pick the dates shared by all signals, compute
    # the per-signal thresholds, then run every (date, signal) pair in a pool.
    data_type = 'offlinedata'
    quantile1_dict, quantile99_dict = {}, {}
    del_date = ['20240927', '20241008', '20240930', '20241009']
    signal_list = [3232, 9232, 9278, 9279, 9280, 9281]

    # One list of available date files per signal.
    allsignaldate_ls = []
    for signal in signal_list:
        date_list_tmp = list(os.listdir(f'/home/dja/data/signal/mxy_data/{data_type}/{signal}'))
        allsignaldate_ls.append(date_list_tmp)

    # Keep the dates present for every signal, minus the excluded days,
    # and use the 16 most recent ones.
    first_dates, *other_dates = allsignaldate_ls
    date_all = sorted(set(first_dates).intersection(*other_dates))
    date_all = [date for date in date_all if date not in del_date]
    date_list = date_all[-16:]

    for c in signal_list:
        print(f'正在获取信号{c}分位点')
        get_quantile(quantile1_dict, quantile99_dict, c, date_list, data_type)

    # Fan the (date, signal) jobs out over 40 worker processes.
    with Pool(processes=40) as pool:
        args = [(date, c, quantile1_dict, quantile99_dict, data_type)
                for date in date_list
                for c in signal_list]
        results = pool.starmap(get_pnl_for_date, args)

正在获取信号3232分位点
正在获取信号9232分位点
正在获取信号9278分位点
正在获取信号9279分位点
正在获取信号9280分位点
正在获取信号9281分位点


In [None]:
# Reset the threshold caches and build zero-filled result tables
# (rows: dates, columns: signal ids).
quantile1_dict, quantile99_dict = {}, {}
signal_list = [3232, 9232, 9278, 9279, 9280, 9281]
pnl_df = pd.DataFrame(index=date_list, columns=signal_list).fillna(0)
trade_count = pd.DataFrame(index=date_list, columns=signal_list).fillna(0)
def get_pnl(c):
    """
    Back-test signal ``c`` on every day of the global ``date_list``.

    Relies on the module-level names ``date_list``, ``data_type``,
    ``quantile1_dict`` and ``quantile99_dict``. The thresholds of the signal
    are (re)computed first, then each day is replayed on a fresh
    ``Context``/``Exchange`` pair.

    Returns:
        A list of ``(date, c, pnl, number_of_trades)`` tuples, one per day.
    """
    import warnings
    warnings.filterwarnings("ignore", category=FutureWarning)
    from datetime import datetime
    from pyarrow import fs
    import pyarrow.parquet as pq

    initial_balance = 10000000
    # asks1p, bids1p, asks1v, bids1v, asks2p, ... for the five book levels
    book_columns = [f'{side}{level}{kind}'
                    for level in range(1, 6)
                    for kind in ('p', 'v')
                    for side in ('asks', 'bids')]
    FtMap1 = {
        3692: 'IC',
        3693: 'IF',
        3694: 'IH',
        5756: 'IM'
    }
    fut_map = {
        '1': 'IC',
        '2': 'IF',
        '3': 'IH',
        '4': 'IM',
        'IC': 1,
        'IF': 2,
        'IH': 3,
        'IM': 4
    }

    def convert_ns_to_standard_time(ns):
        # Nanosecond epoch -> 'HH:MM:SS.f' (one decimal). fromtimestamp uses
        # the local time zone of the machine running the back-test.
        timestamp_s = ns / 1e9
        return datetime.fromtimestamp(timestamp_s).strftime('%H:%M:%S.%f')[:-5]

    def convet_time_to_sec(time_str, base_str="09:30:00.0"):
        # Seconds since 09:30; offsets beyond 7200 s are shifted back by
        # 5400 s (presumably to skip the midday break), negatives clamp to 0.
        # Exactly 7200 s is returned unchanged instead of falling through.
        time_obj = datetime.strptime(time_str, "%H:%M:%S.%f")
        base_obj = datetime.strptime(base_str, "%H:%M:%S.%f")
        seconds_diff = (time_obj - base_obj).total_seconds()
        if seconds_diff < 0:
            return 0
        if seconds_diff > 7200:
            return seconds_diff - 5400
        return seconds_diff

    # Compute the 1% / 99% thresholds of this signal first.
    get_quantile(quantile1_dict, quantile99_dict, c, date_list, data_type)

    hdfs_fs = fs.HadoopFileSystem('hdfs://ftxz-hadoop', user='zli')
    pnl_results = []
    print(f'信号{c}')
    for date in date_list:
        print(date)

        # Load the signal file once; both contract filters work on this frame.
        path = f'/home/dja/data/signal/mxy_data/{data_type}/{c}/{date}'
        signal_data = pd.read_parquet(path)
        signal_data = signal_data.sort_values(by=['time_sec', 'code'])\
                                 .reset_index(drop=True).round(3).rename(columns={'code': 'symbol'})
        signal_data['price'] = (signal_data['ask1'] + signal_data['bid1']) / 2

        contract_month = int(date[4:6])
        data_3020 = signal_data[signal_data['symbol'] % 100 == contract_month].reset_index(drop=True)
        if data_3020.empty:
            # Fall back to the next delivery month (December wraps to January),
            # filtering the full frame rather than the empty selection.
            contract_month = contract_month % 12 + 1
            data_3020 = signal_data[signal_data['symbol'] % 100 == contract_month].reset_index(drop=True)
        data_3020 = data_3020[data_3020['time_sec'] < 14110].copy()

        # Load and normalise the depth (order book) data of the day.
        depth_path = f'/user/zli/lyt/data/StockIndexFutures/DepthData/{date}'
        depth_data = pq.read_table(depth_path, filesystem=hdfs_fs).to_pandas()
        depth_data = depth_data[['symbol_id', 'delivery_code', 'time'] + book_columns]
        depth_data['time'] = depth_data['time'].apply(convert_ns_to_standard_time)
        depth_data = depth_data[(depth_data['time'] < '15:00:00') & (depth_data['time'] > '09:30:00')]
        depth_data = depth_data[depth_data['delivery_code'] % 100 == contract_month].reset_index(drop=True)
        depth_data['time'] = depth_data['time'].apply(convet_time_to_sec)
        # Symbol encoding of the signal file: product number * 10000 + delivery code.
        depth_data['symbol_id'] = depth_data['symbol_id'].map(FtMap1).map(fut_map) * 10000 + depth_data['delivery_code'].astype(int)
        depth_data = depth_data[['symbol_id', 'time'] + book_columns]
        depth_data = depth_data.sort_values(by=['time', 'symbol_id']).reset_index(drop=True)

        # Join signal rows with the book snapshot of the same symbol and 0.1 s slot.
        data_3020['time_sec'] = data_3020['time_sec'].round(1)
        depth_data['time'] = depth_data['time'].round(1)
        data_3020 = pd.merge(data_3020, depth_data, how='left', left_on=['symbol', 'time_sec'], right_on=['symbol_id', 'time'])
        data_3020 = data_3020.drop(['ask1', 'bid1', 'symbol_id', 'time'], axis=1)

        # Replay the day tick by tick.
        context = Context(initial_balance=initial_balance)
        exchange = Exchange(context)
        for _, row in data_3020.iterrows():
            askp = {i: row[f'asks{i}p'] for i in range(1, 6)}
            bidp = {i: row[f'bids{i}p'] for i in range(1, 6)}
            askv = {i: row[f'asks{i}v'] for i in range(1, 6)}
            bidv = {i: row[f'bids{i}v'] for i in range(1, 6)}
            # NOTE(review): the signal is read from 'pv2' while get_quantile
            # derives the thresholds from 'pv1' - confirm this is intended.
            strategy(exchange, row['symbol'], askp, bidp, askv, bidv,
                     row['price'], row['pv2'], row['time_sec'],
                     quantile1_dict[c], quantile99_dict[c])
        print("回测结束")

        # PnL = long value + (negative) short value + cash - starting cash.
        long_value = exchange.get_long_position_value()
        short_value = exchange.get_short_position_value()
        pnl_tmp = (abs(long_value) + short_value + exchange.get_balance()) - initial_balance
        pnl_results.append((date, c, pnl_tmp, len(exchange.trade_logs)))
    return pnl_results

In [14]:

# Result tables: rows are dates, columns are signal ids.
pnl_df = pd.DataFrame(index=date_list, columns=signal_list)
trade_count = pd.DataFrame(index=date_list, columns=signal_list)

# Each worker returned a list of (date, signal, pnl, trade count) records;
# copy every record into the two tables in the main process.
for result in results:
    for record in result:
        date, signal, pnl_tmp, td_count = record
        pnl_df.loc[date, signal] = pnl_tmp
        trade_count.loc[date, signal] = td_count

In [15]:
pnl_df.mean()/trade_count.mean()

3232    163.837389
9232    172.513178
9278     80.308742
9279    140.600108
9280    117.568406
9281    119.269462
dtype: object

In [16]:
pnl_df


Unnamed: 0,3232,9232,9278,9279,9280,9281
20241101,38389.03634,75095.1128,84576.68646,45566.40228,126131.40506,94725.35154
20241104,-13683.56398,61804.9678,43983.62924,45298.21472,520.39498,18715.04864
20241105,-50538.34748,-82598.16554,-3869.42852,-64033.23794,-45901.15212,-70530.34728
20241106,109385.06762,148942.95512,27063.31794,125549.88506,76369.77282,134931.22394
20241107,-54343.31004,-52811.8568,19787.5124,-27619.8078,-21977.12046,-44139.77634
20241108,128078.18744,89732.32814,25956.51276,52267.8495,68020.51512,63548.3675
20241111,68500.52036,37705.63634,61780.94868,50632.51422,37555.03686,43911.36256
20241112,61521.44284,89537.63152,-1174.0045,68704.69644,68482.49124,43411.54788
20241113,22326.76312,47396.0028,35422.01474,68940.45346,32778.19734,31982.6117
20241114,22867.1792,31365.40798,-29003.43756,-17452.96306,18630.78526,2574.80734


In [17]:
trade_count

Unnamed: 0,3232,9232,9278,9279,9280,9281
20241101,261,250,236,242,214,211
20241104,51,74,98,67,62,64
20241105,116,165,233,162,146,164
20241106,174,210,232,166,182,208
20241107,106,124,218,161,128,137
20241108,204,222,280,230,226,234
20241111,98,94,140,88,86,98
20241112,192,197,182,212,171,188
20241113,84,132,251,156,116,144
20241114,152,193,280,169,161,153


In [18]:
pnl_df.mean()

3232     22005.40936
9232    28669.533729
9278    17893.791481
9279    26485.545398
9280     19347.35085
9281    20357.806264
dtype: object

In [19]:
trade_count.mean()

3232    134.3125
9232    166.1875
9278    222.8125
9279     188.375
9280    164.5625
9281    170.6875
dtype: object

# 线上数据分析

In [129]:
# 创建 Context 和 Exchange 实例
def strategy(exchange, symbol, askp, bidp, askv, bidv, price, signal, time, quantile1, quantile99):
    """Process one tick: settle pending orders, then place open/close orders.

    Thresholds: open long / close short above ``quantile99``, open short /
    close long below ``quantile1`` (``quantile1 <= quantile99`` is assumed).

    * Signal above the long threshold and no long held: close a held short,
      then open a long.
    * Signal below the short threshold and no short held: close a held long,
      then open a short.
    * Otherwise a position on the wrong side of the signal is closed. Because
      the close thresholds equal the open thresholds, these checks must not
      sit behind the open conditions in one elif chain, or they never run.

    All orders trade one unit at a limit 1bp through the best quote.
    """
    openlong = quantile99
    openshort = quantile1
    closelong = quantile1
    closeshort = quantile99

    # Refresh quotes and clock, re-mark positions, then match pending orders.
    exchange.set_price(symbol, askp[1], bidp[1], price)
    exchange.set_time(time)
    order_price_bonus = 0.0001
    exchange.update_equity()
    exchange.check_order_book(symbol, askp, bidp, askv, bidv)

    bullish = signal > openlong or signal > closeshort
    bearish = signal < openshort or signal < closelong
    if not (bullish or bearish):
        # Neutral signal: nothing to do, skip the position look-ups.
        return

    quantity = 1  # every order trades a single unit
    has_long = not exchange.get_symbol_positions(symbol, "long").empty
    has_short = not exchange.get_symbol_positions(symbol, "short").empty
    buy_price = askp[1] * (1 + order_price_bonus)
    sell_price = bidp[1] * (1 - order_price_bonus)

    if signal > openlong and not has_long:  # long signal
        if has_short:
            exchange.context.add_order(symbol, buy_price, quantity, time, "closeshort")
        exchange.context.add_order(symbol, buy_price, quantity, time, "openlong")
    elif signal < openshort and not has_short:  # short signal
        if has_long:
            exchange.context.add_order(symbol, sell_price, quantity, time, "closelong")
        exchange.context.add_order(symbol, sell_price, quantity, time, "openshort")
    elif signal < closelong and has_long:  # close-long signal
        exchange.context.add_order(symbol, sell_price, quantity, time, "closelong")
    elif signal > closeshort and has_short:  # close-short signal
        exchange.context.add_order(symbol, buy_price, quantity, time, "closeshort")

    
    

In [130]:
# Dates to backtest: the last 10 entries of signal 9278's online-data directory.
# os.listdir() returns names in arbitrary order, so sort first; otherwise the
# slice is not guaranteed to pick the most recent dates (names look like YYYYMMDD,
# for which lexicographic order is chronological).
date_list = sorted(os.listdir('/home/dja/data/signal/mxy_data/onlinedata/9278'))[-10:]

# Per-signal threshold stores; get_pnl() hands them to get_quantile(), which is
# expected to populate them keyed by signal id.
quantile1_dict = {}
quantile99_dict = {}

signal_list = [3232, 9232, 9278, 9279, 9280, 9281]

# Result tables (rows: dates, columns: signal ids). Built with explicit numeric
# fills so the columns get numeric dtypes instead of starting as all-NaN object
# columns that are patched with fillna(0).
pnl_df = pd.DataFrame(0.0, index=date_list, columns=signal_list)
trade_count = pd.DataFrame(0, index=date_list, columns=signal_list)
def _ns_to_clock_str(ns):
    """Format a nanosecond epoch timestamp as an 'HH:MM:SS.f' string."""
    # fromtimestamp() converts using the machine's local timezone.
    # NOTE(review): this assumes the local timezone is the exchange's - confirm.
    # '%f' prints six digits; dropping the last five keeps tenths of a second.
    return datetime.fromtimestamp(ns / 1e9).strftime('%H:%M:%S.%f')[:-5]


def _clock_str_to_session_sec(time_str, base_str="09:30:00.0"):
    """Convert an 'HH:MM:SS.f' string to seconds on a gap-free session clock.

    Times before ``base_str`` map to 0. Offsets below 7200 s are returned as is.
    Offsets above 7200 s have 5400 s subtracted, so 13:00:00 lands on 7200
    (presumably removing the 11:30-13:00 lunch break).
    """
    time_obj = datetime.strptime(time_str, "%H:%M:%S.%f")
    base_obj = datetime.strptime(base_str, "%H:%M:%S.%f")
    seconds_diff = (time_obj - base_obj).total_seconds()
    if seconds_diff < 0:
        return 0
    if seconds_diff < 7200:
        return seconds_diff
    if seconds_diff > 7200:
        return seconds_diff - 5400
    # NOTE(review): an offset of exactly 7200 s (11:30:00.0) has always mapped to
    # no value here (NaN in the resulting column), so that snapshot never joins.
    # Mapping it to 7200 would collide with 13:00:00.0, which also maps to 7200,
    # and duplicate rows in the left merge. Kept as is until the upstream
    # definition of time_sec at the session boundary is confirmed.
    return None


def _load_signal_frame(c, date):
    """Load one day of signal rows for signal ``c``, cleaned up for the merge."""
    frame = pd.read_parquet(f'/home/dja/data/signal/mxy_data/onlinedata/{c}/{date}')
    frame = (
        frame.sort_values(by=['time_sec', 'code'])
        .reset_index(drop=True)
        .round(3)
        .rename(columns={'code': 'symbol'})
    )
    frame['price'] = (frame['ask1'] + frame['bid1']) / 2  # mid price
    # Keep only symbols whose code ends in 11 (hard-coded; presumably the
    # November contract - confirm before running on other months).
    frame = frame[frame['symbol'] % 100 == 11].reset_index(drop=True)
    # Drop rows at or after time_sec 14110. copy() so the assignment below
    # writes to an independent frame rather than a slice.
    frame = frame[frame['time_sec'] < 14110].copy()
    frame['time_sec'] = frame['time_sec'].round(1)  # 0.1 s grid, same as the depth data
    return frame


def _load_depth_frame(date, hdfs):
    """Load one day of 5-level depth data from HDFS, keyed like the signal rows."""
    import pyarrow.parquet as pq

    # asks1p, bids1p, asks1v, bids1v, asks2p, ... for levels 1-5.
    book_cols = [
        f'{side}{lvl}{kind}'
        for lvl in range(1, 6)
        for kind in ('p', 'v')
        for side in ('asks', 'bids')
    ]
    depth = pq.read_table(
        f'/user/zli/lyt/data/StockIndexFutures/DepthData/{date}', filesystem=hdfs
    ).to_pandas()
    depth = depth[['symbol_id', 'delivery_code', 'time'] + book_cols]

    depth['time'] = depth['time'].apply(_ns_to_clock_str)
    # String comparison is safe here because every value has the same
    # zero-padded 'HH:MM:SS.f' layout.
    depth = depth[(depth['time'] < '15:00:00') & (depth['time'] > '09:30:00')]
    depth = depth[depth['delivery_code'] % 100 == 11].reset_index(drop=True)
    depth['time'] = depth['time'].apply(_clock_str_to_session_sec)

    # Raw feed id -> product number (IC=1, IF=2, IH=3, IM=4). Combined with the
    # delivery code this yields product * 10000 + delivery_code, the value the
    # signal rows' 'symbol' column is joined against.
    product_number = {3692: 1, 3693: 2, 3694: 3, 5756: 4}
    depth['symbol_id'] = (
        depth['symbol_id'].map(product_number) * 10000 + depth['delivery_code'].astype(int)
    )

    depth = depth[['symbol_id', 'time'] + book_cols]
    depth = depth.sort_values(by=['time', 'symbol_id']).reset_index(drop=True)
    depth['time'] = depth['time'].round(1)
    return depth


def get_pnl(c):
    """Backtest signal ``c`` on every date in ``date_list``.

    For each date the signal rows are left-joined with the 5-level order book on
    (symbol, time), replayed row by row through ``strategy`` on a fresh
    ``Context`` / ``Exchange`` pair, and the day's PnL is computed as
    ``|long value| + |short value| + sum of trade-log pnl + balance - initial balance``.

    Args:
        c: signal id; selects the signal data directory and the quantile entries.

    Returns:
        A list with one ``(date, c, pnl, number_of_trade_log_entries)`` tuple per date.
    """
    import warnings
    from pyarrow import fs as pa_fs

    warnings.filterwarnings("ignore", category=FutureWarning)

    initial_balance = 10000000  # starting cash and the baseline subtracted for PnL
    book_levels = range(1, 6)

    # Fetch this signal's lower / upper thresholds first.
    get_quantile(quantile1_dict, quantile99_dict, c, date_list)
    pnl_results = []
    print(f'信号{c}')

    # One HDFS connection for all dates instead of reconnecting per date.
    hdfs = pa_fs.HadoopFileSystem('hdfs://ftxz-hadoop', user='zli')

    for date in date_list:
        print(date)
        data_3020 = _load_signal_frame(c, date)
        depth_data = _load_depth_frame(date, hdfs)
        data_3020 = pd.merge(
            data_3020, depth_data, how='left',
            left_on=['symbol', 'time_sec'], right_on=['symbol_id', 'time'],
        )
        data_3020 = data_3020.drop(['ask1', 'bid1', 'symbol_id', 'time'], axis=1)

        context = Context(initial_balance=initial_balance)
        exchange = Exchange(context)
        for _, row in data_3020.iterrows():
            symbol = row['symbol']
            price = row['price']
            time = row['time_sec']
            signal = row['pv2']
            askp = {lvl: row[f'asks{lvl}p'] for lvl in book_levels}
            bidp = {lvl: row[f'bids{lvl}p'] for lvl in book_levels}
            askv = {lvl: row[f'asks{lvl}v'] for lvl in book_levels}
            bidv = {lvl: row[f'bids{lvl}v'] for lvl in book_levels}
            strategy(exchange, symbol, askp, bidp, askv, bidv, price, signal, time,
                     quantile1_dict[c], quantile99_dict[c])

        # End of day: close one unit at the last row's symbol and price.
        # NOTE(review): only the last symbol is closed, and the short side is
        # skipped whenever a long value is present - confirm this is intended
        # when several symbols trade on the same day.
        if exchange.get_long_position_value() != 0:
            exchange.close_position(symbol, 1, price, 'long')
        elif exchange.get_short_position_value() != 0:
            exchange.close_position(symbol, 1, price, 'short')
        print("回测结束")

        order_log = pd.DataFrame(exchange.trade_logs)
        # A day without trades gives an empty frame with no 'pnl' column. Treat
        # that as zero realized PnL instead of raising KeyError, and keep a numpy
        # scalar so callers can still call .round() on the result.
        realized_pnl = order_log['pnl'].sum() if 'pnl' in order_log.columns else np.float64(0.0)
        a = exchange.get_long_position_value()
        b = exchange.get_short_position_value()
        pnl_tmp = (abs(a) + abs(b) + realized_pnl + exchange.get_balance()) - initial_balance
        pnl_results.append((date, c, pnl_tmp, len(exchange.trade_logs)))
    return pnl_results

In [131]:
from joblib import Parallel, delayed

# Run one get_pnl() job per signal id; n_jobs=-1 lets joblib use all CPUs.
results = Parallel(n_jobs=-1)(delayed(get_pnl)(sig_id) for sig_id in signal_list)

# Back in the parent process, copy every (date, signal, pnl, trade count)
# record into the two summary tables.
for per_signal in results:
    for record in per_signal:
        date, signal, pnl_tmp, td_count = record
        pnl_df.loc[date, signal] = pnl_tmp.round(3)
        trade_count.loc[date, signal] = td_count

信号9280
20241030
信号9281
20241030
信号9279
20241030
信号3232
20241030
信号9278
20241030
信号9232
20241030
回测结束
20241031
回测结束
20241031
回测结束
20241031




回测结束
20241031
回测结束
20241031
回测结束
20241031
回测结束
20241101
回测结束
20241101
回测结束
20241101
回测结束
20241101
回测结束
20241101




回测结束
20241101
回测结束
20241104
回测结束
20241104
回测结束
20241104




回测结束
20241104
回测结束
20241104




回测结束
20241104
回测结束
20241105
回测结束
20241105




回测结束
20241105
回测结束
20241105
回测结束
20241105
回测结束
20241105
回测结束
20241106
回测结束
20241106
回测结束
20241106
回测结束
20241106
回测结束
20241106
回测结束
20241106




回测结束
20241107
回测结束
20241107




回测结束
20241107




回测结束
20241107
回测结束
20241107




回测结束
20241107
回测结束
20241108




回测结束
20241108
回测结束
20241108
回测结束
20241108




回测结束
20241108
回测结束
20241108
回测结束
20241111
回测结束
20241111




回测结束
20241111




回测结束
20241111
回测结束
20241111
回测结束
20241111
回测结束
20241112
回测结束
20241112




回测结束
20241112




回测结束
20241112
回测结束
20241112




回测结束
20241112
回测结束
回测结束




回测结束
回测结束
回测结束
回测结束


In [132]:
# Display the PnL table (rows: dates, columns: signal ids).
pnl_df

Unnamed: 0,3232,9232,9278,9279,9280,9281
20241030,8246.999,21555.558,-43691.821,-20557.221,-26307.76,-34966.621
20241031,25481.635,39567.388,55707.05,24662.593,3938.257,51682.631
20241101,-10591.678,19544.607,22866.897,11459.751,60363.703,24450.459
20241104,26732.486,37287.066,38465.021,46128.972,17768.683,39648.934
20241105,66840.972,87486.565,62301.406,57459.057,43321.045,99721.096
20241106,126772.701,87434.69,25747.737,47103.228,65666.516,85889.661
20241107,32589.183,3324.869,77249.077,64762.264,79213.179,33598.584
20241108,31629.564,23597.777,-118526.995,-42270.154,-84808.077,-72930.35
20241111,156209.931,63816.965,122241.466,94975.489,120292.405,103537.424
20241112,-369.872,37713.342,-18189.718,-24066.55,2355.674,-29011.83


In [133]:
# Display the trade-count table (rows: dates, columns: signal ids).
trade_count

Unnamed: 0,3232,9232,9278,9279,9280,9281
20241030,81,221,304,247,200,209
20241031,106,195,222,214,207,241
20241101,193,284,292,254,239,241
20241104,30,71,187,108,119,127
20241105,101,163,285,273,253,253
20241106,132,168,237,213,195,215
20241107,83,150,305,241,230,256
20241108,182,214,322,297,257,305
20241111,79,95,244,161,139,143
20241112,164,239,305,291,229,247


In [134]:
# Mean daily PnL per signal (column-wise mean over the backtested dates).
pnl_df.mean()

3232    46354.1921
9232    42132.8827
9278    22417.0120
9279    25965.7429
9280    28180.3625
9281    30161.9988
dtype: float64

In [135]:
# Mean daily number of trade-log entries per signal.
trade_count.mean()

3232    115.1
9232    180.0
9278    270.3
9279    229.9
9280    206.8
9281    223.7
dtype: float64

In [136]:
# Mean daily PnL divided by mean daily trade count, per signal. Both means run
# over the same dates, so this equals total PnL / total trade-log entries.
pnl_df.mean()/trade_count.mean()

3232    402.729732
9232    234.071571
9278     82.933822
9279    112.943640
9280    136.268677
9281    134.832359
dtype: float64