In [None]:
strategy_name = "IC"
verbose = True
plot_graph = True
recent_data_only=True

# equity_index_future_basis_analysis
股指期货基差套利分析

## 背景原理

### 关于股指期货

[https://wiki.mbalib.com/wiki/%E8%82%A1%E6%8C%87%E6%9C%9F%E8%B4%A7](https://wiki.mbalib.com/wiki/%E8%82%A1%E6%8C%87%E6%9C%9F%E8%B4%A7)

股指期货相当于对股票指数中所包含的一篮子股票打包作为期货进行买卖的概念。买入

### 关于贴水和升水

股指期货与现货指数价格的差被称为基差，当股指期货价格高于现货指数价格时，股指期货处于升水，基差为正;反之，股指期货处于贴水，基差为负。

### 关于股指期货贴水获利的原理

[https://xueqiu.com/1445029417/143207882](https://xueqiu.com/1445029417/143207882)



## 回测分析

回测的目的是：
1. 确认在过去的回测期间内，策略是有效可盈利的。
2. 在策略的大框架下选择标的证券。

### 安装环境依赖

### 策略1

最基础的策略为，选定一个标的合约，并不断展期持有。例如持有当月合约，在到期时展期至下一时间的合约。例如在4月份持有 IC2104，并在4月交割日展期为 IC2105。

在这个策略中可选参数为：
1. 合约的期限。当月、下月、当季、下季。
2. 标的。IC、IF、IH

回测后发现，持有下月的 IC 合约，基差获利最多。

### 策略2

在策略1的基础上，每次展期所获得的基差收益来源于两个合约的基差之差。而基差之差是会浮动变化的，具体表现为：

在上涨预期强或者上涨趋势中，基差缩小，甚至部分时候会出现升水。
在震荡预期中，基差保持不变。
在下跌预期中，基差扩大。

利用这个特性，可以对策略1进行优化：
1. 在过去N个交易日中，如果是上涨趋势，则暂缓展期，回退至持有当月合约的策略。
2. 在过去N个交易日中，如果是震荡预期或下跌预期，则持有次月合约，次月合约转当月合约时进行展期。

展期本质上是走某个时间点，将时间价值兑换为对应的基差价值。而对应时点的基差价值是由当时的做空成本决定的，择时展期本质上就是对做空成本的择时。

具体策略表现为，计算每个合约的年化基差率，并将年化基差率作为时间序列进行计算，同时生成技术指标布林通道，布林通道参数为 120 日均值，0.5 标准差。当目前的年化基差率大于布林通道上轨时进行展期。2017年至今回测可以提高20%收益率。

# Prepare Data

In [None]:
if strategy_name == "IC":
    from utils.tushare_data_download import prepare_ic_data_set
    ic_symbol_list = prepare_ic_data_set(recent_data_only)

if strategy_name == "IF":
    from utils.tushare_data_download import prepare_if_data_set
    if_symbol_list = prepare_if_data_set(recent_data_only)

# Plot import

In [None]:
from datetime import datetime
from typing import  Dict
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
from vnpy.app.cta_strategy import ArrayManager
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
import pyqtgraph as pg
from vnpy.trader.ui import create_qapp, QtCore, QtGui
from vnpy.trader.object import BarData
from vnpy.chart.manager import BarManager
from typing import List, Dict, Tuple

In [None]:
plot_app = None
if plot_graph:
    plot_app = create_qapp()

# Backtesting Import

In [None]:
import json
from datetime import datetime
from datetime import timedelta
from importlib import reload
from collections import defaultdict

import vnpy.app.portfolio_strategy
reload(vnpy.app.portfolio_strategy)

from vnpy.app.portfolio_strategy import BacktestingEngine
from vnpy.trader.constant import Interval
from vnpy.trader.utility import BarGenerator, ArrayManager

# Strategy

In [None]:
import re

from typing import List, Dict
from datetime import datetime

import numpy as np

from vnpy.app.portfolio_strategy import StrategyTemplate, StrategyEngine
from vnpy.trader.utility import BarGenerator
from vnpy.trader.object import TickData, BarData

from utils.email_util import send_notification

In [None]:
class IndexFutureBasisStrategy(StrategyTemplate):
    """"""

    author = "chendi"

    # params
    min_basis = 0
    underlying = "000905.SSE"
    basis_return_only = False
    future_prefix = "IC"

    # vars
    underlying_bars = []
    basis_bars = []
    basis_am = ArrayManager(2000)
    
    parameters = [
        "min_basis",
        "underlying",
        "basis_return_only",
        "future_prefix"
    ]
    variables = [
    ]

    def __init__(
        self,
        strategy_engine: StrategyEngine,
        strategy_name: str,
        vt_symbols: List[str],
        setting: dict
    ):
        """"""
        super().__init__(strategy_engine, strategy_name, vt_symbols, setting)

        self.targets: Dict[str, int] = {}

    def on_init(self):
        """
        Callback when strategy is inited.
        """
        self.write_log("策略初始化")

        self.load_bars(1)

    def on_start(self):
        """
        Callback when strategy is started.
        """
        self.write_log("策略启动")

    def on_stop(self):
        """
        Callback when strategy is stopped.
        """
        self.write_log("策略停止")

    def get_expired_date(self, code):
        if code.endswith(".CFFEX"):
            expire_month_str = re.match("{}(\d+).CFFEX".format(self.future_prefix), code).groups()[0]
            expire_month_date = datetime.strptime(expire_month_str + "15", "%y%m%d")
            expire_month_date = expire_month_date.replace(day=(15 + (4 - expire_month_date.weekday()) % 7))
            return expire_month_date
        else:
            return None
        
    def get_curr_pos(self):
        curr_pos = None
        for pos, vol in self.pos.items():
            if not pos.endswith(".CFFEX"):
                continue
            if vol == 0:
                continue
            assert(curr_pos == None)
            curr_pos = pos
        return curr_pos
        
    def on_bars(self, bars: Dict[str, BarData]):
        """""" 
        self.cancel_all()
        
        current_date = list(bars.values())[0].datetime
        curr_pos = self.get_curr_pos()

        # 当前有数据的合约列表
        code_list = sorted(filter(lambda x: x.endswith(".CFFEX"), bars.keys()))
        if verbose:
            print(f"{current_date}  code list: {code_list}")
        
        # Check if we have position, buy first future if no position
        if curr_pos == None:
            symbol_to_buy = code_list[0]
            self.write_log("Init buy:" + symbol_to_buy)
            price = bars[symbol_to_buy].close_price * 1.08
            self.buy(vt_symbol=code_list[0], price=price, volume=1)
            # If we only want to analyze basis return, sell an underlying to hedge the beta.
            if self.basis_return_only:
                self.sell(vt_symbol=self.underlying, price=bars[self.underlying].close_price * 0.92, volume=1)
            self.put_event()
            return 
        
        # If we find future basis more attractive, we roll
        # basis point = underlying price - future price
        basis_point_map = {}
        basis_point_percentage_map = {}
        # relative basis point = next month's basis point - current month's basis point
        relative_basis_point_map = {}
        # relative basis_percentage = (relative basis point / relative expire day / underlying price) * 365
        relative_basis_percentage_map = {}

        for code in code_list:
            basis_point = bars[self.underlying].close_price - bars[code].close_price
            basis_point_map[code] = basis_point
            
        # Anualized basis return percentage
        for code in code_list:
            days_to_expire = (self.get_expired_date(code) - current_date).days + 1
            basis_point_percentage_map[code] = basis_point_map[code] * 100 / bars[self.underlying].close_price / days_to_expire * 365
            
            relative_basis_point_map[code] = basis_point_map[code] - basis_point_map[code_list[0]]
            relative_days_to_expire = (self.get_expired_date(code) - self.get_expired_date(code_list[0])).days + 1
            adj_basis_point = relative_basis_point_map[code] * 100 / bars[self.underlying].close_price / relative_days_to_expire * 365
            relative_basis_percentage_map[code] = adj_basis_point
            

        
        # Update basis bar time series
        near_code = code_list[1]
        
        avg_adj_basis = sum([x for x in relative_basis_percentage_map.values() if x != 0]) / 3
        #print(avg_adj_basis)
        underlying_bar = bars[self.underlying]
        self.underlying_bars.append(underlying_bar)

        basis_bar = BarData(gateway_name='backtesting', 
                          symbol=underlying_bar.symbol + "basis",  
                          exchange=underlying_bar.exchange, 
                          datetime=underlying_bar.datetime,
                          interval=underlying_bar.interval,
                          volume=underlying_bar.volume,
                          open_price=avg_adj_basis,
                          high_price=avg_adj_basis,
                          low_price=avg_adj_basis,
                          close_price=avg_adj_basis
                         )
        self.basis_bars.append(basis_bar)
        
        self.basis_am.update_bar(basis_bar)

        indicator_sma120 = self.basis_am.sma(120)
        boll_up, boll_down = self.basis_am.boll(120, 0.5)

        if verbose:
            print(f"Relative basis map: {relative_basis_percentage_map} boll_up: {boll_up}")
        
        
        # Calculate which code to roll
        roll_to_code = None
        basis_map_str = json.dumps(basis_point_percentage_map, indent=2)
        relative_basis_percentage_map_str = json.dumps(relative_basis_percentage_map, indent=2)
        roll_reason = f"basis_map {basis_map_str} \n\n relative_basis_map {relative_basis_percentage_map_str} \n\n boll_up {boll_up}"
       
        days_to_expire = (self.get_expired_date(curr_pos).date() - current_date.date()).days
        if verbose:
            print(f"{current_date} 当前合约 {curr_pos} {days_to_expire} 日后到期")
        if days_to_expire == 1:
            roll_reason += "\n\n当前合约{curr_pos} {days_to_expire} 日后到期，即将到期，强制滚动"
            roll_to_code = code_list[1]
            
        
        for code in code_list:
            # 只向后滚动，获取额外基差
            if code <= curr_pos:
                continue
                
            # 基差为负数，不操作
            if relative_basis_percentage_map[code] < self.min_basis:
                continue

            # 只有在基差大于 boll_up 
            if relative_basis_percentage_map[code] < boll_up:
                roll_reason += f"\n\n{code} relative basis < boll_up"
                continue
            
            # 只有基差更大时滚动
            if basis_point_percentage_map[code] < basis_point_percentage_map[curr_pos]:
                roll_reason += f"\n\n{code} relative basis < {curr_pos}"
                continue
            
            roll_reason += f"\n\nRoll {curr_pos} to {code}"
            roll_to_code = code
            break
                
        # 强制挂钩实盘仓位
        if current_date.date() == datetime(2021,6,28).date() and "IC2107.CFFEX" in code_list:
            roll_to_code = "IC2107.CFFEX"
        if current_date.date() == datetime(2021,6,28).date() and "IF2107.CFFEX" in code_list:
            roll_to_code = "IF2107.CFFEX"
        
        msg = f"{current_date} {roll_reason}"
        if roll_to_code is not None:
            # Roll position:
            print(msg)
            self.sell(curr_pos, price=bars[curr_pos].close_price * 0.92, volume=1)
            self.buy(roll_to_code, price=bars[roll_to_code].close_price * 1.08, volume=1)
            
        # 如果日期是今日，发送操作邮件
        if current_date.date() == datetime.today().date():
            print("Sending action notification")
            if roll_to_code:
                send_notification(f"滚{strategy_name}策略 - 明日滚动", msg)
            else:
                send_notification(f"滚{strategy_name}策略 - 不滚动", msg)

        self.put_event()

In [None]:
engine = BacktestingEngine()

rates = defaultdict(lambda:0)
slippages = defaultdict(lambda:0)
sizes = defaultdict(lambda:300)
priceticks = defaultdict(lambda:0.1)

if strategy_name == "IC":
    start = datetime(2017,1,12)
    end = datetime.today()
    
    engine.set_parameters(
        vt_symbols=ic_symbol_list,
        interval=Interval.DAILY,
        start=start,
        end=end,
        rates=rates,
        slippages=slippages,
        sizes=sizes,
        priceticks=priceticks,
        capital=1_000_000,
    )

    setting = {
        "min_basis": 0,
        "underlying": '000905.SSE',
        "basis_return_only": False,
        "future_prefix": "IC"
    }
    engine.add_strategy(IndexFutureBasisStrategy, setting)

if strategy_name == "IF":
    start = datetime(2017,1,1)
    end = datetime.today()
    
    engine.set_parameters(
        vt_symbols=if_symbol_list,
        interval=Interval.DAILY,
        start=start,
        end=end,
        rates=rates,
        slippages=slippages,
        sizes=sizes,
        priceticks=priceticks,
        capital=1_000_000,
    )

    setting = {
        "min_basis": 0,
        "underlying": '000300.SSE',
        "basis_return_only": False,
        "future_prefix": "IF"
    }
    engine.add_strategy(IndexFutureBasisStrategy, setting)

In [None]:
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()

In [None]:
engine.calculate_statistics()
engine.show_chart()

# Plot candle graph

In [None]:
class BaseIndicatorLine(CandleItem):
    """自定义指标显示"""
    bar_attr_name="down_line"
    rgb_color = (100, 100, 255)
    
    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.pen: QtGui.QPen = pg.mkPen(color=self.rgb_color, width=2)
        self.indicator_data: Dict[int, float] = {}

    def get_indicator_value(self, ix: int) -> float:
        """"""
        if not self.indicator_data:
            bars = self._manager.get_all_bars()
            indicator_array = [getattr(bar, self.bar_attr_name) for bar in bars]

            for n, value in enumerate(indicator_array):
                self.indicator_data[n] = value

        return self.indicator_data.get(ix, 0)


    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        indicator_value = self.get_indicator_value(ix)
        last_indicator_value = self.get_indicator_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.pen)

        # Draw Line
        start_point = QtCore.QPointF(ix-1, last_indicator_value)
        end_point = QtCore.QPointF(ix, indicator_value)
        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        rect = QtCore.QRectF(
            0,
            0,
            len(self._bar_picutures),
            30
        )
        return rect
    
    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        return -10, 30
    
    def get_info_text(self, ix: int) -> str:
        """"""
        value = self.indicator_data.get(ix, "-")
        text = f"{self.bar_attr_name} {value}"
        return text



In [None]:
# Compute indicator
if plot_app is not None:
    underlying_bars = engine.strategy.underlying_bars
    basis_bars = engine.strategy.basis_bars
    assert(len(underlying_bars) == len(basis_bars))
    template_bar = underlying_bars[-1]

    basis_am = ArrayManager(2000)
    for attribute_name in dir(template_bar):
        if not attribute_name.startswith("indicator_"):
            continue
        for ix, bar in enumerate(underlying_bars):
            delattr(bar, attribute_name)

    for ix, bar in enumerate(underlying_bars):
        basis_am.update_bar(basis_bars[ix])

        bar.indicator_basis = basis_bars[ix].close_price
        bar.indicator_sma120 = basis_am.sma(120)
        bar.indicator_boll_up, bar.indicator_boll_down = basis_am.boll(120, 2)

    # Start plot
    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("indicator", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=250)

    color_list = [
        (255, 0, 0),
        (128, 0, 0),
        (255, 255, 0),
        (128, 128, 0),
        (0, 255, 0),
        (0, 128, 0),
        (0, 128, 128),
        (0, 0, 255)
    ]
    curr_color = 0
    for attribute_name in dir(template_bar):
        if not attribute_name.startswith("indicator_"):
            continue
        indicator_class = type('SubClass', (BaseIndicatorLine,), {'bar_attr_name': attribute_name, 'rgb_color': color_list[curr_color]})
        widget.add_item(indicator_class, attribute_name, "indicator")
        curr_color += 1

    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")


    widget.add_cursor()

    widget.update_history(underlying_bars)

    widget.show()
    app.exec_()