# 海龟ATR仓位管理
信号沿用布林带指标策略

In [1]:
# 数据接口 
import akshare as ak
import baostock as bs
import tushare as ts

# 基础模块
import datetime as dt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# 回测框架
import backtrader as bt
import backtrader.indicators as btind
import backtrader.feeds as btfeed

# 基础函数
import utilsJ

## 策略主体

买入信号： 由下向上突破布林带上界和下界时买入。

卖出信号： 由上向下突破布林带上界和下界时卖出。

买卖仓位：每次买入和卖出时根据海龟ATR模型计算手数，若开仓直接买入最优化手数，若加仓/反向开仓则调整至最优持有手数。

### 单股版本

In [None]:
class ATR_s(bt.Strategy):
    params = (
        ('printlog', False),
        ('atr_period', 14),
        ('atr_percent', 1),
        ('atr_risk', 1),
        ('b_per', 20),
        ('b_dev', 2),
        ('allow_short', False)
    )
    
    def log(self, txt, dt=None, doprint=False):
        ''' Logging function fot this strategy'''
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print('%s: %s' % (dt.isoformat(), txt))
        
    def __init__(self):

        # Initialization
        self.order = None
        self.buyprice = None
        self.sellprice = None

        # Alias
        self.dataclose = self.data.close
        self.datahigh = self.data.high
        self.datalow = self.data.low

        # Indicators
        ## ATR
        self.atr_initial = self.broker.get_cash()
        self.tr = btind.Max((self.datahigh - self.datalow),
                            abs(self.dataclose(-1) - self.datahigh),
                            abs(self.dataclose(-1) - self.datalow))
        self.atr = btind.SMA(self.tr, period=self.p.atr_period)
        
        ## Bollinger Bands 
        self.Bolling = btind.BollingerBands(self.dataclose, period=self.p.b_per, devfactor=self.p.b_dev)
        self.Bolling_buy = bt.Or(bt.And(self.dataclose > self.Bolling.top,
                                        self.dataclose(-1) < self.Bolling.top(-1)),
                                 bt.And(self.dataclose > self.Bolling.bot,
                                        self.dataclose(-1) < self.Bolling.bot(-1)))
        self.Bolling_sell = bt.Or(bt.And(self.dataclose < self.Bolling.top,
                                        self.dataclose(-1) > self.Bolling.top(-1)),
                                  bt.And(self.dataclose < self.Bolling.bot,
                                        self.dataclose(-1) > self.Bolling.bot(-1)))


    def notify_order(self, order):
        if order.status in [order.Submitted, order.Accepted]:
            # Buy/Sell order submitted/accepted to/by broker - Nothing to do
            return

        # Check if an order has been completed
        # Attention: broker could reject order if not enough cash
        if order.status in [order.Completed]:
            if order.isbuy():
                self.log('BUY EXECUTED, Price: %.2f, Lot:%i, Position:%i, Cash: %i, Value: %i' %
                         (order.executed.price, order.executed.size,
                          self.getposition(self.data).size,
                          self.broker.get_cash(), self.broker.get_value()))
                self.buyprice = order.executed.price


            else:  # Sell
                self.log('SELL EXECUTED, Price: %.2f, Lot:%i, Position:%i, Cash: %i, Value: %i' %
                         (order.executed.price, -order.executed.size,
                          self.getposition(self.data).size,
                          self.broker.get_cash(), self.broker.get_value()))
                self.sellprice = order.executed.price

        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log('Order Canceled/Margin/Rejected')

        # Write down: no pending order
        self.order = None

    def notify_trade(self, trade):
        if not trade.isclosed:
            return

        self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' %
                 (trade.pnl, trade.pnlcomm))


    def next(self):

        # Check if an order is pending ... if yes, we cannot send a 2nd one
        if self.order:
            return
 
        # Buy Singal
        if self.Bolling_buy[0]:
            opt_position = np.round(self.atr_initial * self.p.atr_percent / 100 / self.atr[0])
            if opt_position > self.getposition(self.data).size:
                self.log('BUY CREATE, Price: %.2f, Lots: %i, ATR: %.2f' % 
                         (self.dataclose[0], opt_position-self.getposition(self.data).size, self.atr[0]))
                self.order = self.buy(size=opt_position-self.getposition(self.data).size)

            elif opt_position < self.getposition(self.data).size:
                self.log('SELL CREATE, Price: %.2f, Lots: %i, ATR: %.2f' % 
                            (self.dataclose[0], self.getposition(self.data).size-opt_position, self.atr[0]))
                self.order = self.sell(size = self.getposition(self.data).size - opt_position)                    

        # Sell Singal
        elif self.Bolling_sell[0]:
            if self.p.allow_short:
                opt_position = np.round(self.atr_initial * self.p.atr_percent / 100 / self.atr[0])
                if -opt_position < self.getposition(self.data).size:
                    self.log('SELL CREATE, Price: %.2f, Lots: %i, ATR: %.2f' % 
                             (self.dataclose[0], opt_position+self.getposition(self.data).size, self.atr[0]))
                    self.order = self.sell(size=opt_position+self.getposition(self.data).size)
                elif -opt_position > self.getposition(self.data).size:
                    self.log('BUY CREATE, Price: %.2f, Lots: %i, ATR: %.2f' % 
                             (self.dataclose[0], -self.getposition(self.data).size-opt_position, self.atr[0]))
                    self.order = self.buy(size=-self.getposition(self.data).size-opt_position)


    def stop(self):
        self.log('Ending Position %i. Ending Value %.2f, Net Profit: %.2f%%, (%i, %.1f)' %
                 (self.getposition(self.data).size, self.broker.getvalue(), (self.broker.getvalue()/self.atr_initial-1)*100, 
                  self.params.atr_period, self.params.atr_percent), doprint=True)

### 多股版本

In [32]:
class ATR_m(bt.Strategy):

    params = (
        ('printlog', False),
        ('atr_period', 14),
        ('atr_percent', 1),
        ('atr_risk', 1),
        ('b_per', 20),
        ('b_dev', 2),
        ('pstake', 100),
    )


    def log(self, txt, dt=None, doprint=False):
        ''' Logging function fot this strategy'''
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print('%s: %s' % (dt.isoformat(), txt))
            with open('log.txt', 'a') as file:
                file.write('%s: %s \n' % (dt.isoformat(), txt))
        
    
    def __init__(self):

        # Global initialization
        self.inds = dict()
        self.tracker = None
        self.atr_initial = self.broker.get_cash()

        for d in self.datas:
            # Local initialization
            self.inds[d] = dict()
            self.inds[d]['order'] = None
            self.inds[d]['buyprice'] = None
            self.inds[d]['sellprice'] = None

            # Indicators
            ## Bollinger Bands
            self.inds[d]['Bollinger'] = btind.BollingerBands(d.close, 
                                                             period=self.p.b_per, 
                                                             devfactor=self.p.b_dev)

            ## ATR
            self.inds[d]['tr'] = btind.Max((d.high - d.low), 
                                           abs(d.close(-1) - d.high),
                                           abs(d.close(-1) - d.low))
            self.inds[d]['atr'] = btind.SMA(self.inds[d]['tr'], period=self.p.atr_period)


            # Signals
            self.inds[d]['Bollinger_buy'] = bt.Or(bt.And(d.close > self.inds[d]['Bollinger'].top,
                                                         d.close(-1) < self.inds[d]['Bollinger'].top(-1)),
                                                  bt.And(d.close > self.inds[d]['Bollinger'].bot,
                                                         d.close(-1) < self.inds[d]['Bollinger'].bot(-1)))
            self.inds[d]['Bollinger_sell'] = bt.Or(bt.And(d.close < self.inds[d]['Bollinger'].top,
                                                          d.close(-1) > self.inds[d]['Bollinger'].top(-1)),
                                                   bt.And(d.close < self.inds[d]['Bollinger'].bot,
                                                          d.close(-1) > self.inds[d]['Bollinger'].bot(-1)))                                                         


    def notify_order(self, order):
        if order.status in [order.Submitted, order.Accepted]:
            # Buy/Sell order submitted/accepted to/by broker - Nothing to do
            return

        # Check if an order has been completed
        # Attention: broker could reject order if not enough cash
        if order.status in [order.Completed]:
            if order.isbuy():
                self.log('BUY EXECUTED, Price: %.2f, Lot:%i, Cash: %i, Value: %i' %
                         (order.executed.price,
                          order.executed.size,
                          self.broker.get_cash(),
                          self.broker.get_value()))
                self.inds[self.tracker]['buyprice'] = order.executed.price

            else:  # Sell
                self.log('SELL EXECUTED, Price: %.2f, Lot:%i, Cash: %i, Value: %i' %
                        (order.executed.price,
                          -order.executed.size,
                          self.broker.get_cash(),
                          self.broker.get_value()))
                self.inds[self.tracker]['sellprice'] = order.executed.price

        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log('Order Canceled/Margin/Rejected')


    def notify_trade(self, trade):
        if not trade.isclosed:
            return

        self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' %
                 (trade.pnl, trade.pnlcomm))


    def next(self):
        
        for d in self.datas:
            self.tracker = d
            # Buy
            if self.inds[d]['Bollinger_buy'][0]:
                opt_pos = np.round(self.atr_initial*self.p.atr_percent/100/self.inds[d]['atr'][0]/self.p.pstake)               
                if opt_pos * self.p.pstake > self.getposition(d).size:
                    self.log('BUY CREATE, Stock code:%s, Price: %.2f, Lots: %i, ATR: %.2f, Current Position: %i' % 
                             (d._name, d.close[0], opt_pos*self.p.pstake-self.getposition(d).size, self.inds[d]['atr'][0], self.getposition(d).size))
                    self.inds[d]['order'] = self.buy(data=d, size=opt_pos*self.p.pstake-self.getposition(d).size, name=d._name)
                elif opt_pos * self.p.pstake < self.getposition(d).size and self.getposition(d).size > 0:
                    self.log('SELL CREATE, Stock code:%s, Price: %.2f, Lots: %i, ATR: %.2f, Current Position: %i' % 
                             (d._name, d.close[0], self.getposition(d).size-opt_pos*self.p.pstake, self.inds[d]['atr'][0], self.getposition(d).size))
                    self.inds[d]['order'] = self.sell(data=d, size=self.getposition(d).size-opt_pos*self.p.pstake, name=d._name)
            # Sell
            elif self.inds[d]['Bollinger_sell'][0]:
                    if self.getposition(d).size > 0:
                        self.log('SELL CREATE (Close), Stock code:%s, Price: %.2f, Lots: %i' % 
                                 (d._name, d.close[0], self.getposition(d).size))
                        self.inds[d]['order'] = self.close(data=d, name=d._name)            


    def stop(self):
        self.log('Ending Value %.2f, Net Profit: %.2f%%' %
                 (self.broker.getvalue(), (self.broker.getvalue()/self.params.initial-1)*100), doprint=True)

## 回测

### 单股回测
选用沪深300股指期货作为标的

In [None]:
startdate = dt.date(2020, 1, 1) - dt.timedelta(days=20)
enddate = dt.date(2020, 12, 31)
stock_index = '000300.SH'


if __name__ == '__main__':
    # Create a cerebro entity
    cerebro = bt.Cerebro()

    # Add a strategy
    strats = cerebro.addstrategy(ATR_s, printlog=True, initial=100000,  
                                 b_per=21, b_dev=2, atr_period=3, atr_percent=2) 

    # Create a Data Feed
    df = utilsJ.get_stock(stock_index, startdate, enddate)
    data = bt.feeds.PandasData(dataname=df,fromdate=startdate,todate=enddate)

    # Set cash inside the strategy
    cerebro.broker = bt.brokers.BackBroker(coc=True)
    cerebro.broker.setcash(100000)

    # Add the Data Feed to Cerebo
    cerebro.adddata(data)

    # Set leverage
    cerebro.broker.setcommission(mult=10)

    # Print out the starting conditions
    start_value = cerebro.broker.getvalue()
    #print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    # Run over everything
    cerebro.run()

    # Print out the final result
    final_value = cerebro.broker.getvalue()
    netp = (final_value / start_value -1)*100

### 多股回测

注：设置本金为5000000， 1ATR波动等同于总资金0.01%的波动是避免以下两种情况：
1. 资金不足无法执行买入信号。
2. 有买入信号，但是ATR计算后最优开仓不足100股导致不执行信号。

In [33]:
if __name__ ==  '__main__':
    # Create a cerebro entity
    cerebro = bt.Cerebro()

    # Add a strategy
    strats = cerebro.addstrategy(ATRStrategy_zz1000, printlog = False, initial = 5000000,  
                                 b_per = 20, b_dev = 2, atr_period = 14, atr_percent = 0.01) 

    s_date = dt.date(2020, 12, 31) - dt.timedelta(days = 365)
    e_date = dt.date(2020, 12, 31)
    stock_index = '000300.SH'

    # Download Data
    #utilsJ.index_to_csv_tushare('74f1379591c9d810854fa5891fffcacaba514b82bf17ec2e239025b6', stock_index,
    #                             0.5, s_date, e_date)

    # Create stock Data Feed
    pro = ts.pro_api('74f1379591c9d810854fa5891fffcacaba514b82bf17ec2e239025b6')
    index_list = np.unique(pro.index_weight(index_code=stock_index, 
                              start_date=s_date.strftime('%Y%m%d'), 
                              end_date=e_date.strftime('%Y%m%d')).con_code).tolist()

    for s_code in index_list:
        df = pd.read_csv('.\\Data\\' + s_code + '.csv')
        df.index=pd.to_datetime(df.trade_date)
        data = bt.feeds.PandasData(dataname=df,fromdate=s_date,todate=e_date)

        # Add the index Data Feed to Cerebo
        cerebro.adddata(data, name = s_code)

    # Single stock testing 
#    df = get_data_ts('74f1379591c9d810854fa5891fffcacaba514b82bf17ec2e239025b6', 
#                        '000001.SZ', s_date, e_date)
#    data = bt.feeds.PandasData(dataname=df,fromdate=s_date,todate=e_date)
#    cerebro.adddata(data, name = '600600.SH')
    
    # Set cash inside the strategy
    cerebro.broker = bt.brokers.BackBroker(coc=True)   
    cerebro.broker.setcash(5000000)

    # Set leverage
    #cerebro.broker.setcommission()

    # Print out the starting conditions
    start_value = cerebro.broker.getvalue()
    #print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    # Run over everything
    cerebro.run()

    # Print out the final result
    final_value = cerebro.broker.getvalue()

2020-12-31: Ending Value 5187656.00, Net Profit: 3.75%
