![图片描述](./image/两资产配对交易策略.png)

策略逻辑说明

这个策略的逻辑有些简单，一般而言，配对交易策略属于相对价值策略，通过一定的方法(基于基本面逻辑或者基于统计分析)选择具有相关性的两个股票，当两个股票的价差（A-B)比较低的时候,选择做多A，做空B；当价差比较高的时候，做空A，做多B ；基于的基本原理就是价差是均值回归的，涨的太高了，会下跌；下跌的太多了，会升高。

选定两支股票，建设银行与工商银行，因为都是属于银行股票，经营业务有一定的相似性，假设两者可以做配对交易(可以做一定的统计检验，暂时忽略)；

计算建设银行减去工商银行的价差（正常情况下，至少要做个回归，分析下两个股票计算价差的时候的比例，我们默认是1；而且需要检验一下价差是否是均值回归的，我们也默认是均值回归的）
如果价差小于60天均值减去二倍的60天标准差，做多价差，即做多建设银行，做空工商银行；如果价差大于60天均值加上2倍的60天标准差，做空价差，即做空建设银行，做多工商银行。

如果做多了价差，当价差从60天均值上方死叉均线，平掉做多价差的仓位；如果做空了价差，当价差从60日均线下方金叉60日均线，平掉做空价差的仓位；

工商银行和建设银行，每次交易的时候，资金都使用二分之一，即资金平分。

交易手续费按照万分之二计算。

策略简单分析

这是一份toy strategy ，真实的配对交易策略，可能比这复杂的多，

做配对交易，很可能实际交易的时候，需要有很多的多个配对资产同时进行交易，以便能够分散，获得比较稳定的收益。

In [2]:
import backtesting as bt
import statsmodels.api as sm
import datetime
import pandas as pd
import numpy as np
import os,sys
import copy
import talib
import math 
import warnings
warnings.filterwarnings("ignore")
import pyfolio as pf

# 我们使用的时候，直接用我们新的类读取数据就可以了。
class test_two_ma_strategy(bt.Strategy):
   
    params = (('period',30),
              ('mult',2)
             )

    def log(self, txt, dt=None):
        ''' Logging function fot this strategy'''
        dt = dt or self.datas[0].datetime.date(0)
        print('{}, {}'.format(dt.isoformat(), txt))

    def __init__(self):
        # Keep a reference to the "close" line in the data[0] dataseries
        self.bar_num=0    
        # 保存现有持仓的股票
        self.position_dict={}
        # 当前有交易的股票
        self.stock_dict={}
        # 当天持仓状态
        self.marketposition = 0
        # 计算价差
        self.diff = self.datas[0].close - self.datas[1].close
        # 计算布林带
        bbbands = bt.indicators.BollingerBands(self.diff,period = self.p.period,devfactor=self.p.mult)
        self.mid = bbbands.mid
        self.top = bbbands.top
        self.bot = bbbands.bot
        
       
    def prenext(self):
        
        self.next()
        
        
    def next(self):
        # 假设有100万资金，每次成份股调整，每个股票使用1万元
        self.bar_num+=1
        # 前一交易日和当前的交易日
        pre_date = self.datas[0].datetime.date(-1).strftime("%Y-%m-%d")
        current_date = self.datas[0].datetime.date(0).strftime("%Y-%m-%d")
        # 总的价值
        total_value = self.broker.get_value()
        total_cash  = self.broker.get_cash()
        self.log(f"total_value : {total_value}")
        diff = self.diff
        # 做多价差
        if self.marketposition==0 and diff[-1]<self.bot[-1] and diff[0]>self.bot[0]:
            
            self.order_target_percent(self.datas[0],target=0.5)
            self.order_target_percent(self.datas[1],target=-0.5)
            self.marketposition=1
        # 做空价差
        if self.marketposition==0 and diff[-1]>self.top[-1] and diff[0]<self.bot[0]:
            
            self.order_target_percent(self.datas[0],target=-0.5)
            self.order_target_percent(self.datas[1],target=0.5)
            self.marketposition=-1
        # 平仓多头价差
        if self.marketposition==1 and diff[-1]>self.mid[-1] and diff[0]<self.mid[0]:
            
            self.order_target_percent(self.datas[0],target=0)
            self.order_target_percent(self.datas[1],target=0)
            self.marketposition=0
            
        # 平仓空头价差
        if self.marketposition==-1 and diff[-1]<self.mid[-1] and diff[0]>self.mid[0]:
            
            self.order_target_percent(self.datas[0],target=0)
            self.order_target_percent(self.datas[1],target=0)
            self.marketposition=0
        
                  
    def notify_order(self, order):
        
        if order.status in [order.Submitted, order.Accepted]:
            return
        
        if order.status == order.Rejected:
            self.log(f"Rejected : order_ref:{order.ref}  data_name:{order.p.data._name}")
            
        if order.status == order.Margin:
            self.log(f"Margin : order_ref:{order.ref}  data_name:{order.p.data._name}")
            
        if order.status == order.Cancelled:
            self.log(f"Concelled : order_ref:{order.ref}  data_name:{order.p.data._name}")
            
        if order.status == order.Partial:
            self.log(f"Partial : order_ref:{order.ref}  data_name:{order.p.data._name}")
         
        if order.status == order.Completed:
            if order.isbuy():
                self.log(f" BUY : data_name:{order.p.data._name} price : {order.executed.price} , cost : {order.executed.value} , commission : {order.executed.comm}")

            else:  # Sell
                self.log(f" SELL : data_name:{order.p.data._name} price : {order.executed.price} , cost : {order.executed.value} , commission : {order.executed.comm}")
    
    def notify_trade(self, trade):
        # 一个trade结束的时候输出信息
        if trade.isclosed:
            self.log('closed symbol is : {} , total_profit : {} , net_profit : {}' .format(
                            trade.getdataname(),trade.pnl, trade.pnlcomm))
            
        if trade.isopen:
            self.log('open symbol is : {} , price : {} ' .format(
                            trade.getdataname(),trade.price))
    def stop(self):
        
        pass 
                
        
# 初始化cerebro,获得一个实例
cerebro = bt.Cerebro()
# cerebro.broker = bt.brokers.BackBroker(shortcash=True)  # 0.5%
data_root = "./stock/day/"

params=dict(
    
    fromdate = datetime.datetime(2009,1,4),
    todate = datetime.datetime(2020,7,31),
    timeframe = bt.TimeFrame.Days,
    dtformat = ("%Y-%m-%d"),
    # compression = 1,
    datetime = 0,
    open = 1,
    high = 2,
    low =3,
    close =4,
    volume =5,
    openinterest=-1)
# 加载建设银行
df = pd.read_csv("./stock/day/601939.XSHG.csv")
df.columns = ['datetime','open','high','low','close','volume','openinterest']
df.index = pd.to_datetime(df['datetime'])
df = df[['open','high','low','close','volume','openinterest']]
# df = df[(df.index<=params['todate'])&(df.index>=params['fromdate'])]
feed = bt.feeds.PandasDirectData(dataname = df)
# 添加数据到cerebro
cerebro.adddata(feed, name = "jsyh")
# 加载工商银行
df = pd.read_csv("./stock/day/601398.XSHG.csv")
df.columns = ['datetime','open','high','low','close','volume','openinterest']
df.index = pd.to_datetime(df['datetime'])
df = df[['open','high','low','close','volume','openinterest']]
# df = df[(df.index<=params['todate'])&(df.index>=params['fromdate'])]
feed = bt.feeds.PandasDirectData(dataname = df)
# 添加数据到cerebro
cerebro.adddata(feed, name = "gsyh")

print("加载数据完毕")
# 添加手续费，按照万分之二收取
cerebro.broker.setcommission(commission=0.0002,stocklike=True)
# 设置初始资金为1亿
cerebro.broker.setcash(1_0000_0000)
# 添加策略
cerebro.addstrategy(test_two_ma_strategy)
cerebro.addanalyzer(bt.analyzers.TotalValue, _name='_TotalValue')
cerebro.addanalyzer(bt.analyzers.PyFolio)
# 运行回测
results = cerebro.run()
# 打印相关信息
pyfoliozer = results[0].analyzers.getbyname('pyfolio')
returns, positions, transactions, gross_lev = pyfoliozer.get_pf_items()
pf.create_full_tear_sheet(
    returns,
    positions=positions,
    transactions=transactions,
    # gross_lev=gross_lev,
    live_start_date='2019-01-01',
    )


AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)

策略逻辑

选定了三只股票，建设银行、工商银行和农业银行，计算它们三个的日收益率

求出三个股票的平均收益率以及每个股票的超额收益率，以平均收益率为基准

每个股票分配的金额是：该股票的超额收益率*总的资产/(N个股票的超额收益率的绝对值的和)

该股票的超额收益率大于0,就做空；该股票的超额收益率小于0,就做多；

交易手续费按照万分之二计算。

策略简单分析

从测试的结果来看，策略是亏损的，策略表现不好，这个其实也在意料之中；这个策略逻辑中，对多资产配对交易，处理的不是很好，应该存在更好的逻辑

In [None]:
import backtesting as bt
import statsmodels.api as sm
import datetime
import pandas as pd
import numpy as np
import os,sys
import copy
import talib
import math 
import warnings
warnings.filterwarnings("ignore")
import pyfolio as pf

# 我们使用的时候，直接用我们新的类读取数据就可以了。
class test_two_ma_strategy(bt.Strategy):
   
    params = (('period',30),
              ('mult',2)
             )

    def log(self, txt, dt=None):
        ''' Logging function fot this strategy'''
        dt = dt or self.datas[0].datetime.date(0)
        print('{}, {}'.format(dt.isoformat(), txt))

    def __init__(self):
        # Keep a reference to the "close" line in the data[0] dataseries
        self.bar_num=0    
        # 保存现有持仓的股票
        self.position_dict={}
        # 当前有交易的股票
        self.stock_dict={}
        # 当天持仓状态
        self.marketposition = 0
        # 计算每个资产的收益率
        self.data_rate_dict = {data._name:bt.indicators.PercentChange(data.close,period = self.p.period) for data in self.datas}
        
       
    def prenext(self):
        
        self.next()
        
        
    def next(self):
        # 假设有100万资金，每次成份股调整，每个股票使用1万元
        self.bar_num+=1
        # 前一交易日和当前的交易日
        pre_date = self.datas[0].datetime.date(-1).strftime("%Y-%m-%d")
        current_date = self.datas[0].datetime.date(0).strftime("%Y-%m-%d")
        # 总的价值
        total_value = self.broker.get_value()
        total_cash  = self.broker.get_cash()
        self.log(f"total_value : {total_value}")
        # 平均收益
        mean_rate = sum([i[0] for i in self.data_rate_dict.values()])/len(self.data_rate_dict)
        # 每个资产的超额收益
        self.data_car_dict = {data._name:self.data_rate_dict[data._name][0]- mean_rate   for data in self.datas}
        # 数据准备不足，返回
        if self.bar_num<=self.p.period:
            return 
        # 计算仓位，下单
        if self.bar_num%self.p.period==0:
            for data in self.datas:
                data_name = data._name
                # 先平仓
                data_date = data.datetime.date(0).strftime("%Y-%m-%d")
                size = self.getposition(data).size
                # 如果有仓位
                if size!=0:
                    self.close(data)
                    if data._name in self.position_dict:
                        self.position_dict.pop(data._name)

                # 已经下单，但是订单没有成交
                if data._name in self.position_dict and size==0:
                    order = self.position_dict[data._name]
                    self.cancel(order)
                    self.position_dict.pop(data._name) 
                # 如果两个日期相等，说明股票在交易,就计算收益率，进行排序
                if current_date == data_date:
                    data_rate = self.data_rate_dict[data_name][0]
                    self.log(data_rate)
                    data_invest_value = abs(data_rate)*total_value/sum([abs(i[0]) for i in self.data_rate_dict.values()])
                    lots = int((data_invest_value/(data.close[0]))/100)*100
                    # 如果收益率大于0,做空
                    if data_rate>0:
                        order = self.sell(data,size = lots)
                        self.position_dict[data._name] = order

                    if data_rate<0:
                        order = self.buy(data,size = lots)
                        self.position_dict[data._name] = order
            
                  
    def notify_order(self, order):
        
        if order.status in [order.Submitted, order.Accepted]:
            return
        
        if order.status == order.Rejected:
            self.log(f"Rejected : order_ref:{order.ref}  data_name:{order.p.data._name}")
            
        if order.status == order.Margin:
            self.log(f"Margin : order_ref:{order.ref}  data_name:{order.p.data._name}")
            
        if order.status == order.Cancelled:
            self.log(f"Concelled : order_ref:{order.ref}  data_name:{order.p.data._name}")
            
        if order.status == order.Partial:
            self.log(f"Partial : order_ref:{order.ref}  data_name:{order.p.data._name}")
         
        if order.status == order.Completed:
            if order.isbuy():
                self.log(f" BUY : data_name:{order.p.data._name} price : {order.executed.price} , cost : {order.executed.value} , commission : {order.executed.comm}")

            else:  # Sell
                self.log(f" SELL : data_name:{order.p.data._name} price : {order.executed.price} , cost : {order.executed.value} , commission : {order.executed.comm}")
    
    def notify_trade(self, trade):
        # 一个trade结束的时候输出信息
        if trade.isclosed:
            self.log('closed symbol is : {} , total_profit : {} , net_profit : {}' .format(
                            trade.getdataname(),trade.pnl, trade.pnlcomm))
            
        if trade.isopen:
            self.log('open symbol is : {} , price : {} ' .format(
                            trade.getdataname(),trade.price))
    def stop(self):
        
        pass 
                
        
# 初始化cerebro,获得一个实例
cerebro = bt.Cerebro()
# cerebro.broker = bt.brokers.BackBroker(shortcash=True)  # 0.5%
data_root = "./stock/day/"

params=dict(
    
    fromdate = datetime.datetime(2012,1,4),
    todate = datetime.datetime(2020,7,31),
    timeframe = bt.TimeFrame.Days,
    dtformat = ("%Y-%m-%d"),
    # compression = 1,
    datetime = 0,
    open = 1,
    high = 2,
    low =3,
    close =4,
    volume =5,
    openinterest=-1)
# 加载建设银行
df = pd.read_csv("./stock/day/601939.XSHG.csv")
df.columns = ['datetime','open','high','low','close','volume','openinterest']
df.index = pd.to_datetime(df['datetime'])
df = df[['open','high','low','close','volume','openinterest']]
df = df[(df.index<=params['todate'])&(df.index>=params['fromdate'])]
feed = bt.feeds.PandasDirectData(dataname = df)
# 添加数据到cerebro
cerebro.adddata(feed, name = "jsyh")

# 加载工商银行
df = pd.read_csv("./stock/day/601398.XSHG.csv")
df.columns = ['datetime','open','high','low','close','volume','openinterest']
df.index = pd.to_datetime(df['datetime'])
df = df[['open','high','low','close','volume','openinterest']]
df = df[(df.index<=params['todate'])&(df.index>=params['fromdate'])]
feed = bt.feeds.PandasDirectData(dataname = df)
# 添加数据到cerebro
cerebro.adddata(feed, name = "gsyh")

# 加载农业银行
df = pd.read_csv("./stock/day/601288.XSHG.csv")
df.columns = ['datetime','open','high','low','close','volume','openinterest']
df.index = pd.to_datetime(df['datetime'])
df = df[['open','high','low','close','volume','openinterest']]
df = df[(df.index<=params['todate'])&(df.index>=params['fromdate'])]
feed = bt.feeds.PandasDirectData(dataname = df)
# 添加数据到cerebro
cerebro.adddata(feed, name = "nyyh")


print("加载数据完毕")
# 添加手续费，按照万分之二收取
cerebro.broker.setcommission(commission=0.0002,stocklike=True)
# 设置初始资金为1亿
cerebro.broker.setcash(1_0000_0000)
# 添加策略
cerebro.addstrategy(test_two_ma_strategy)
cerebro.addanalyzer(bt.analyzers.TotalValue, _name='_TotalValue')
cerebro.addanalyzer(bt.analyzers.PyFolio)
# 运行回测
results = cerebro.run()
# 打印相关信息
pyfoliozer = results[0].analyzers.getbyname('pyfolio')
returns, positions, transactions, gross_lev = pyfoliozer.get_pf_items()
pf.create_full_tear_sheet(
    returns,
    positions=positions,
    transactions=transactions,
    # gross_lev=gross_lev,
    live_start_date='2019-01-01',
    )
