In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import warnings
import datetime as dt
from tqdm import tqdm
import logging
import pyTSL as pt
warnings.filterwarnings('ignore')
from Config import factor_config as fc
from Config import backtest_config as bc

In [2]:
class Account:
    """Futures account used by the backtest: positions, cash, and trade logs.

    Positions are kept in four buckets (long/short lots carried over from the
    previous session, and long/short lots opened today).  Every fill is staged
    in ``actions`` / ``action_prices`` by :meth:`min_action` and turned into a
    cash flow by the next :meth:`min_settlement` call.
    """

    def __init__(self, init_cash):
        """Create an empty account.

        Args:
            init_cash: Starting value stored in both ``wallet['cash']`` and
                ``wallet['security']``.

        Side effects:
            Reads trading parameters from ``bc`` and configures the root
            logger to write to ``"backtest account.log"`` under
            ``bc.result_save_path``.
        """
        # Position buckets: lots carried from the previous session vs. opened today.
        self.position_types = ['yesterday_long', 'yesterday_short', 'today_long', 'today_short']

        # Lots currently held in each bucket.
        self.position = {t: 0 for t in self.position_types}

        # Cash and margin balances.
        self.wallet = {
            'cash': init_cash,
            'security': init_cash,
        }

        # Fills staged during the current minute: lot count and (last) fill price per bucket.
        self.actions = {t: 0 for t in self.position_types}
        self.action_prices = {t: 0 for t in self.position_types}

        # Trading parameters.
        self.slip_rate = bc.slip_rate    # slippage rate applied to every fill price
        self.comm_rate = bc.comm_rate    # commission rate charged on fill notional
        self.secu_rate = bc.secu_rate    # margin ratio
        self.multiplier = bc.multiplier  # contract multiplier

        # Result stores: one row per day in `result`, one row per round-trip trade in `day_log`.
        self.result = pd.DataFrame()
        self.day_log = pd.DataFrame(columns=['交易合约', '交易方向', '开仓价格', '平仓时间', '平仓价格', '收益率'])

        # Logging.
        self.logger = logging.getLogger('Account')
        logging.basicConfig(filename=os.path.join(bc.result_save_path, "backtest account.log"),
                            format="%(levelname)s:%(message)s",
                            level=logging.INFO)

    def _fill(self, bucket, contract, direction, slip_price, time, position_delta):
        """Stage one one-lot fill in ``bucket`` and update the position.

        ``record_trade`` must run before the position is changed because it
        inspects the pre-trade position to decide between "open" and "close".
        """
        self.actions[bucket] += 1
        self.action_prices[bucket] = slip_price

        self.logger.info(f'{time}: [{contract}]----[{direction}][1 hand]')
        self.record_trade(contract, direction, time, slip_price)

        self.position[bucket] += position_delta

    def min_action(self, method, pre_price, price, time, pre_head, head):
        """Execute a single one-lot order and stage it for settlement.

        Args:
            method: One of ``'close_long'``, ``'close_short'``, ``'go_long'``,
                ``'go_short'``.  Any other value is a no-op.
            pre_price: Price of the previously held contract; used for the
                two ``close_*`` methods.
            price: Price of today's contract; used for the two ``go_*`` methods.
            time: Fill timestamp, used for logging and as the trade-log key.
            pre_head: Identifier of the previously held contract.
            head: Identifier of today's contract.

        Slippage always works against the account: buys fill above the
        reference price, sells fill below it.
        """
        if method == 'close_long':     # sell one carried-over long lot
            self._fill('yesterday_long', pre_head, -1, pre_price * (1 - self.slip_rate), time, -1)
        elif method == 'close_short':  # buy back one carried-over short lot
            self._fill('yesterday_short', pre_head, 1, pre_price * (1 + self.slip_rate), time, -1)
        elif method == 'go_long':      # open one long lot today
            self._fill('today_long', head, 1, price * (1 + self.slip_rate), time, 1)
        elif method == 'go_short':     # open one short lot today
            self._fill('today_short', head, -1, price * (1 - self.slip_rate), time, 1)

    def min_settlement(self, pre_price_0s, pre_price_60s, price_0s, price_60s, time):
        """Settle one minute of P&L into ``wallet['cash']`` and clear staged fills.

        Args:
            pre_price_0s / pre_price_60s: Price of the previously held contract
                at the start / end of the minute (marks the ``yesterday_*`` buckets).
            price_0s / price_60s: Price of today's contract at the start / end
                of the minute (marks the ``today_*`` buckets).
            time: Timestamp used in the log line.
        """
        pos, act, fill = self.position, self.actions, self.action_prices
        mult = self.multiplier

        # Carried-over longs: lots still held earn the full minute move; lots
        # closed this minute earn start-of-minute -> fill price.
        flow1 = pos['yesterday_long'] * (pre_price_60s - pre_price_0s) * mult \
              + act['yesterday_long'] * (fill['yesterday_long'] - pre_price_0s) * mult

        # Carried-over shorts: mirror image of the longs.
        flow2 = -pos['yesterday_short'] * (pre_price_60s - pre_price_0s) * mult \
              - act['yesterday_short'] * (fill['yesterday_short'] - pre_price_0s) * mult

        # Today's longs: lots held before this minute earn the full move; lots
        # opened this minute earn fill price -> end-of-minute.
        flow3 = (pos['today_long'] - act['today_long']) * (price_60s - price_0s) * mult \
              + act['today_long'] * (price_60s - fill['today_long']) * mult

        # Today's shorts: mirror image of today's longs.
        flow4 = -(pos['today_short'] - act['today_short']) * (price_60s - price_0s) * mult \
              - act['today_short'] * (price_60s - fill['today_short']) * mult

        # Commission on the notional of every fill staged this minute.
        fees = sum(act[t] * fill[t] * mult * self.comm_rate for t in self.position_types)

        # Commission is a cost, so it reduces the minute's cash flow.
        total_flow = flow1 + flow2 + flow3 + flow4 - fees

        self.wallet['cash'] += total_flow

        # Clear the staged fills for the next minute.
        self.actions = {t: 0 for t in self.position_types}
        self.action_prices = {t: 0 for t in self.position_types}

        self.logger.info(f'{time}: Profit: {total_flow}')

    def day_settlement(self, close, date):
        """End-of-day settlement: margin, bucket roll-over, daily record.

        Args:
            close: Price used to value the open position for margin.
            date: Row label for the daily record in ``self.result``.

        Returns:
            The cash balance after settlement.
        """
        if (self.position['yesterday_long'] != 0) or (self.position['yesterday_short'] != 0):
            # Carried-over lots are overwritten by the roll-over below, so warn.
            print(f'{date}昨仓未平完！')

        # Margin is charged on the larger of the two sides opened today.
        position_expose = max(self.position['today_long'], self.position['today_short'])
        total_value = position_expose * close * self.multiplier
        self.wallet['security'] = total_value * self.secu_rate

        # Roll today's lots into the carried-over buckets for the next session.
        self.position['yesterday_long'] = self.position['today_long']
        self.position['yesterday_short'] = self.position['today_short']
        self.position['today_long'] = 0
        self.position['today_short'] = 0

        # Daily record.
        self.result.loc[date, 'cash'] = self.wallet['cash']
        self.result.loc[date, 'security'] = self.wallet['security']

        return self.wallet['cash']

    def save_result(self):
        """Write the daily record and the trade log to Excel files under ``bc.result_save_path``."""
        self.result.to_excel(os.path.join(bc.result_save_path, 'Backtest_result.xlsx'))

        # The context manager closes the workbook even if writing fails.
        with pd.ExcelWriter(os.path.join(bc.result_save_path, 'Account_log.xlsx')) as writer:
            self.day_log.to_excel(writer, sheet_name='action')

    def record_trade(self, head, dir, time, price):
        """Record one fill in the round-trip trade log ``self.day_log``.

        Must be called *before* the position is updated for this fill.

        Args:
            head: Contract identifier.
            dir: ``1`` for a buy, ``-1`` for a sell; other values are ignored.
            time: Fill timestamp; becomes the row label when a trade is opened.
            price: Fill price (slippage already applied).

        Raises:
            IndexError: If the account is not flat but the log has no open row.
        """
        if dir not in (1, -1):
            return

        pos_long = self.position['yesterday_long'] + self.position['today_long']
        pos_short = self.position['yesterday_short'] + self.position['today_short']

        if pos_long == pos_short:
            # Net flat before this fill: the fill opens a new round trip.
            self.day_log.loc[time] = [head, 1 if dir == 1 else -1, price, False, False, False]
            return

        # Otherwise the fill closes the oldest open round trip.  `== False` is an
        # element-wise comparison: open rows carry the placeholder False.
        open_time = self.day_log.loc[self.day_log['平仓时间'] == False].iloc[0].name
        buy_price = self.day_log.loc[open_time, '开仓价格']

        # Return net of commission on both legs, relative to the opening price.
        if dir == 1:   # buying closes a short
            res = (-(price - buy_price) - (buy_price * self.comm_rate + price * self.comm_rate)) / buy_price
        else:          # selling closes a long
            res = (price - buy_price - buy_price * self.comm_rate - price * self.comm_rate) / buy_price

        self.day_log.loc[open_time, ['平仓时间', '平仓价格', '收益率']] = [time, price, res]


class Backtest:
    """Day-by-day, minute-by-minute driver that replays prices through a Strategy and an Account."""

    # Timestamp format used for price look-ups, log lines and trade-log keys.
    _TIME_FMT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        """Load the head-contract table, select the trading days and build the Account and Strategy."""
        self.head_future = pd.read_excel(os.path.join(bc.head_path, 'IC_head_future.xlsx'))  # external head-contract table
        self.trade_dates = self.get_trade_dates()  # trading days inside the configured window

        self.account = Account(bc.init_cash)
        self.strategy = Strategy()

        self.start_time = bc.start_time
        self.end_time = bc.end_time
        self.pre_head = ''  # contract traded on the previous day ('' before the first day)

    def get_head(self, date):
        """Return the contract to trade on ``date``.

        The value is taken from the ``IC`` column of the row *preceding* the
        row whose ``trade_date`` equals ``date``.

        Returns:
            The contract identifier, or ``None`` when ``date`` is the first row
            of the table (there is no preceding row).
            NOTE(review): ``get_data`` only special-cases ``''``, so a ``None``
            result makes it fail on the file-name concatenation; callers need
            to handle that case.

        Raises:
            KeyError: If ``date`` is not present in the table.
        """
        # Work with positions rather than truth-testing an Index, which is
        # ambiguous for zero or several matches.
        matches = np.flatnonzero((self.head_future['trade_date'] == date).to_numpy())
        if matches.size == 0:
            raise KeyError(f'{date} not found in the head-contract table')

        row = int(matches[0])
        if row == 0:
            return None
        return self.head_future['IC'].iloc[row - 1]

    def get_data(self, time, future):
        """Load one day of data for ``future`` and return its price series.

        Args:
            time: The trading day (anything with ``strftime``).
            future: Contract identifier; ``''`` yields an empty frame with a
                ``price`` column.

        Returns:
            A Series named ``price`` indexed by timestamp, holding the
            volume-weighted average price of all records sharing a timestamp.
        """
        if future == '':
            return pd.DataFrame(columns=['price'])

        # File layout: <contract>_<YYYYMMDD>.tdf under fc.kline_data.
        # NOTE: read_pickle executes arbitrary code; only load trusted files.
        read_date = time.strftime('%Y%m%d')
        file_path = future + '_' + read_date + '.tdf'
        f_data = pd.read_pickle(os.path.join(fc.kline_data, file_path))[['date', 'price', 'vol']]
        f_data['date'] = f_data.apply(lambda x: pt.DoubleToDatetime(x['date']), axis=1)

        # Collapse records that share a timestamp into one VWAP.
        f_data['amount'] = f_data['price'] * f_data['vol']
        per_stamp = f_data.groupby('date')[['amount', 'vol']].sum()
        price = per_stamp['amount'] / per_stamp['vol']
        price.name = 'price'
        return price

    def get_trade_dates(self):
        """Return the sorted, de-duplicated trading days found in ``fc.kline_data``.

        Only days with ``bc.start_date < day <= bc.end_date`` are kept.
        """
        # The date is read from characters 8-15 of every file name
        # (presumably a 7-character contract code plus '_' -- TODO confirm).
        date_strings = pd.Series([n_[8: 16] for n_ in os.listdir(fc.kline_data)], dtype=object)
        file_dates = pd.to_datetime(date_strings, format="%Y%m%d").drop_duplicates()

        # os.listdir returns entries in arbitrary order; the replay must be chronological.
        file_dates = file_dates.sort_values()

        file_dates = file_dates[(file_dates > bc.start_date) & (file_dates <= bc.end_date)]
        return file_dates.reset_index(drop=True)

    def change_current_head(self, today_head):
        """Remember ``today_head`` as the previously traded contract."""
        if self.pre_head != today_head:
            self.pre_head = today_head

    def _aligned_prices(self, day, future, day_seconds):
        """Load ``future`` for ``day``, align it to the per-second grid and forward-fill gaps."""
        prices = self.get_data(day, future)
        prices = pd.merge(day_seconds, prices, right_index=True, left_index=True, how='left')
        return prices.ffill()

    def _trade_and_settle(self, trade_methods, minute_end, today_data, pre_data, today_head):
        """Execute ``trade_methods`` and settle the minute that ends at ``minute_end``.

        Orders fill at the price 15 seconds into the minute; settlement marks
        positions from the start to the end of the minute.

        Returns:
            Today's contract price at ``minute_end``.
        """
        minute_start = minute_end - pd.Timedelta(minutes=1)
        fill_time = (minute_start + pd.Timedelta(seconds=15)).strftime(self._TIME_FMT)
        t_start = minute_start.strftime(self._TIME_FMT)
        t_end = minute_end.strftime(self._TIME_FMT)

        # Adjust the position.
        today_price_15s = today_data.loc[fill_time, 'price']
        pre_price_15s = pre_data.loc[fill_time, 'price']
        for trade_method in trade_methods:
            self.account.min_action(trade_method, pre_price_15s, today_price_15s, fill_time, self.pre_head, today_head)

        # Settle the minute.
        today_price_0s = today_data.loc[t_start, 'price']
        today_price_60s = today_data.loc[t_end, 'price']
        pre_price_0s = pre_data.loc[t_start, 'price']
        pre_price_60s = pre_data.loc[t_end, 'price']
        self.account.min_settlement(pre_price_0s, pre_price_60s, today_price_0s, today_price_60s, t_end)

        return today_price_60s

    def run(self):
        """Run the backtest over every selected trading day and save the results."""
        for today in tqdm(self.trade_dates, desc='Backtest...'):

            # Per-second price grid and per-minute decision grid for the session.
            today_str = today.strftime('%Y-%m-%d ')
            session_open = today_str + '09:30:00'
            session_close = today_str + '15:00:00'
            day_seconds = pd.DataFrame(index=pd.date_range(start=session_open, end=session_close, freq='s'))
            day_minutes = pd.date_range(start=session_open, end=session_close, freq='min')
            start = pd.Timestamp(today_str + self.start_time)
            end = pd.Timestamp(today_str + self.end_time)
            pause = pd.Timestamp(today_str + '11:30:00')
            pauseend = pd.Timestamp(today_str + '13:00:00')

            # NOTE(review): each minute m settles the interval [m - 1min, m] and
            # the morning filter is `m < pause`, so the 11:29 -> 11:30 interval is
            # never settled -- confirm whether that is intended.
            in_morning = (day_minutes >= start) & (day_minutes < pause)
            in_afternoon = (day_minutes >= pauseend) & (day_minutes < end)
            trade_minutes = day_minutes[in_morning | in_afternoon]

            # Today's head contract; missing seconds are forward-filled.
            today_head = self.get_head(today)
            today_data = self._aligned_prices(today, today_head, day_seconds)

            # Previously traded contract; seconds before its first record become 0.
            pre_data = self._aligned_prices(today, self.pre_head, day_seconds).fillna(0)

            # Intraday: ask the strategy for orders each minute, trade, settle.
            for minute in trade_minutes:
                trade_methods = self.strategy.signal_min(minute.strftime(self._TIME_FMT), self.account)
                self._trade_and_settle(trade_methods, minute, today_data, pre_data, today_head)

            # Day end: apply the day-end signal and settle the final minute.
            trade_methods = self.strategy.signal_dayend(self.account)
            today_price_60s = self._trade_and_settle(trade_methods, end, today_data, pre_data, today_head)

            # Day end: settle the account for the day.
            current_cash = self.account.day_settlement(today_price_60s, today)

            # Day end: today's contract becomes the previously traded one.
            self.change_current_head(today_head)

            # Day end: resize the lot limit from the new cash balance.
            self.strategy.change_limit(current_cash, today_price_60s)

        self.account.save_result()
        

class Strategy:
    """Turns model predictions and the account's position into one-lot orders."""

    def __init__(self):
        """Read the strategy parameters from ``bc`` and load the prediction table."""
        self.leverage = bc.leverage
        self.limits = bc.limits
        self.multiplier = bc.multiplier  # contract multiplier
        self.secu_rate = bc.secu_rate    # margin ratio
        self.preds = self.get_preds()
        self.pred_list = list(self.preds.columns)

    def get_preds(self):
        """Load the five prediction tables and outer-join them on their index.

        Returns:
            One DataFrame holding the columns of all five tables.
        """
        paths = (bc.pred1_path, bc.pred2_path, bc.pred3_path, bc.pred4_path, bc.pred5_path)

        # NOTE: read_pickle executes arbitrary code; only load trusted files.
        preds = pd.read_pickle(paths[0])
        for path in paths[1:]:
            preds = pd.merge(preds, pd.read_pickle(path), right_index=True, left_index=True, how='outer')
        return preds

    def _methods_for(self, final_sig, account):
        """Translate ``final_sig`` into orders using the account's current position."""
        pos = account.position
        return self.sig2method(final_sig, pos['yesterday_long'], pos['today_long'],
                               pos['yesterday_short'], pos['today_short'])

    def signal_min(self, time, account):
        """Return the list of orders to place for the minute labelled ``time``."""
        pos = account.position
        pos_long = pos['yesterday_long'] + pos['today_long']
        pos_short = pos['yesterday_short'] + pos['today_short']

        # sig start
        sig1 = self.sig_strategy1(time, pos_long, pos_short, pos['today_long'], pos['today_short'])

        final_sig = sig1  # final view
        # sig end

        return self._methods_for(final_sig, account)

    def signal_dayend(self, account):
        """Return the list of orders to place at the end of the day."""
        # sig start
        sig1 = self.sig_endlock()

        final_sig = sig1  # final view
        # sig end

        return self._methods_for(final_sig, account)

    def sig2method(self, final_sig, yesterday_long, today_long, yesterday_short, today_short):
        """Convert a target view into the one-lot orders that reach it.

        Args:
            final_sig: Target net state: ``1`` long, ``0`` flat, ``-1`` short.
                Any other value produces no orders.
            yesterday_long / yesterday_short: Lots carried over from the
                previous session.
            today_long / today_short: Lots opened today.
                All four are assumed to be non-negative integers.

        Returns:
            A list of ``'close_long'`` / ``'close_short'`` / ``'go_long'`` /
            ``'go_short'`` strings, one per lot.  Carried-over lots on the
            opposite side are closed first; any remainder is opened as a new
            position today.
        """
        if final_sig not in (1, 0, -1):
            return []

        pos_long = yesterday_long + today_long
        pos_short = yesterday_short + today_short

        # Current net state on the same -1 / 0 / 1 scale as the signal.
        state = int(pos_long > pos_short) - int(pos_long < pos_short)

        # Lots to buy (positive) or sell (negative) to move from state to target.
        lots = final_sig - state

        if lots > 0:
            closes = min(lots, max(yesterday_short, 0))
            return ['close_short'] * closes + ['go_long'] * (lots - closes)

        if lots < 0:
            lots = -lots
            closes = min(lots, max(yesterday_long, 0))
            return ['close_long'] * closes + ['go_short'] * (lots - closes)

        return []

    def sig_strategy1(self, time, pos_long, pos_short, today_long, today_short):
        """Produce a long / flat / short view (``1`` / ``0`` / ``-1``) for ``time``.

        Rules, with ``n`` the number of prediction columns:

        * No prediction row for ``time``: ``0``.
        * Net flat: open in a direction only when all ``n`` predictions agree
          and that side has room for one more lot today.
        * Net long: reverse to short when all predictions are negative and
          there is room for two short lots today; go flat when more than half
          are negative; otherwise stay long.
        * Net short: mirror image of the net-long rule.
        """
        if time not in self.preds.index:  # no prediction for this minute
            return 0

        pred_return = self.preds.loc[time]
        n_preds = len(self.pred_list)
        n_up = (pred_return > 0).sum()
        n_down = (pred_return < 0).sum()
        all_up = n_up == n_preds
        all_down = n_down == n_preds

        if pos_long == pos_short:  # currently flat
            if all_up and today_long <= self.limits - 1:
                return 1
            if all_down and today_short <= self.limits - 1:
                return -1
            return 0

        if pos_long > pos_short:  # currently long
            if all_down and today_short <= self.limits - 2:
                return -1
            if n_down > 0.5 * n_preds:
                return 0
            return 1

        # currently short
        if all_up and today_long <= self.limits - 2:
            return 1
        if n_up > 0.5 * n_preds:
            return 0
        return -1

    def sig_endlock(self):
        """Day-end view: always ``0`` (bring the net position to flat)."""
        return 0

    def change_limit(self, cash, price):
        """Recompute the lot limit from the cash balance.

        The limit is ``int(cash * leverage / margin_per_lot)`` where
        ``margin_per_lot = price * multiplier * secu_rate``.
        """
        margin_per_lot = price * self.multiplier * self.secu_rate
        self.limits = int(cash * self.leverage / margin_per_lot)

In [3]:
if __name__ == '__main__':

    # Run the backtest: build the Backtest driver and replay every selected
    # trading day; `run()` ends by calling the account's `save_result()`.
    mybacktest = Backtest()
    mybacktest.run()

Backtest...: 100%|██████████| 99/99 [03:30<00:00,  2.12s/it]
