# Turtle Trading Strategy

## References
- Quant Classroom: The Turtle Strategy (JoinQuant). https://www.joinquant.com/view/community/detail/b061c8738b509ec318c03e7af040bb9b
- Debug: Local variable referenced before assignment: The UnboundLocalError in Python. https://blog.arrowhitech.com/local-variable-referenced-before-assignment/
- Python Object-Oriented Programming (OOP), Part 02: The Five Cases of Single and Double Underscores in Classes. https://blog.csdn.net/u010701274/article/details/122317489
- A Detailed Explanation of NumPy's stack(), hstack(), and vstack() Functions. https://blog.csdn.net/csdn15698845876/article/details/73380803
- How to Calculate Volatility as Average True Range (ATR) with Python DataFrames and NumPy. https://www.learnpythonwithrune.org/how-to-calculate-volatility-as-average-true-range-atr-with-python-dataframes-and-numpy/


## Explanation
- The Turtle trading strategy is a trend-following system that profits from sustained price momentum.
- It is a mature and complete trading system: it specifies the markets, position sizing, entries, stops, and exits.

## Trading Logic
### 1. Trading Market Rule
- Trade in markets with high liquidity and high volatility.
- Typical markets include commodities, energy, Chinese A-shares, metals, and cryptocurrencies.

### 2. Position Sizing Rule
- True Range ($TR$)
  - Formula: $TR=\max(H-L,\ H-PDC,\ PDC-L)$
  - Notation:
    - $H$: current high
    - $L$: current low
    - $PDC$: previous day's closing price
- N
  - Formula: $N=\frac{(n-1) \cdot PDN+TR}{n}$, where $PDN$ is the previous day's $N$
  - Normally, we set $n=20$
  - To simplify the calculation, we approximate $N \approx ATR(n)=MA(TR, n)$, the $n$-day simple moving average of $TR$
- Dollar Volatility ($DV$)
  - Formula: $DV=N \cdot Dollars \space per \space Point$
- Unit
  - Formula: $Unit=\frac{1\% \cdot Account}{DV}$, where $DV$ is the Dollar Volatility of the market being traded
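
The sizing chain above ($TR$ → $N$ → $DV$ → $Unit$) can be sketched in plain Python. This is a minimal illustration, not the notebook's implementation: the helper names (`true_range`, `simple_atr`, `unit_size`), the price bars, and `dollars_per_point=1.0` (one share of stock) are assumptions made for the example, and $N$ uses the simple-moving-average approximation.

```python
def true_range(high, low, prev_close):
    """TR = max(H - L, |H - PDC|, |PDC - L|)."""
    return max(high - low, abs(high - prev_close), abs(prev_close - low))


def simple_atr(bars, n):
    """Approximate N as the mean of the last n true ranges.

    bars: list of (high, low, close) tuples, oldest first (hypothetical layout).
    At least n + 1 bars are needed because each TR uses the previous close.
    """
    if len(bars) < n + 1:
        raise ValueError("need at least n + 1 bars")
    trs = [
        true_range(high, low, bars[i - 1][2])
        for i, (high, low, _close) in enumerate(bars)
        if i > 0
    ]
    return sum(trs[-n:]) / n


def unit_size(account, n_value, dollars_per_point=1.0, risk_fraction=0.01):
    """Unit = (1% of account) / (N * dollars per point), rounded down."""
    dollar_volatility = n_value * dollars_per_point
    return int(account * risk_fraction // dollar_volatility)


# Made-up bars: (high, low, close)
bars = [(10.0, 9.0, 9.0), (11.0, 9.0, 10.0), (10.5, 9.5, 10.0), (13.0, 10.0, 12.0)]
n_value = simple_atr(bars, n=3)
units = unit_size(account=1_000_000, n_value=n_value)
print(n_value, units)
```

A larger $N$ (a more volatile market) gives a smaller Unit, so each Unit carries roughly the same dollar risk across markets.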

### 3. Entry Rule
- Use Donchian Channel breakouts to capture momentum and generate trading signals.
- Divide the cash into two parts: one for System1 (the short-term system) and the other for System2 (the long-term system).
- **System1**:
  - Long entry: if price > 20-day high → Buy 1 Unit
  - Adding to a long position: if the current price is at least $0.5N$ above the last trade price → Buy 1 Unit
  - Short entry: if price < 20-day low → Sell 1 Unit
  - Adding to a short position: if the current price is at least $0.5N$ below the last trade price → Sell 1 Unit
- **System2**:
  - Long entry: if price > 55-day high → Buy 1 Unit
  - Adding to a long position: if the current price is at least $0.5N$ above the last trade price → Buy 1 Unit
  - Short entry: if price < 55-day low → Sell 1 Unit
  - Adding to a short position: if the current price is at least $0.5N$ below the last trade price → Sell 1 Unit
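
The entry and position-adding rules above can be sketched as two small functions. This is an illustration under stated assumptions: the names `breakout_signal` and `should_add_unit` are hypothetical, the channel is built from the closes that precede the latest bar, and the sample prices are made up.

```python
def breakout_signal(closes, lookback):
    """Return 'long', 'short', or None for the latest close.

    The latest close is compared with the high and low of the `lookback`
    closes that precede it (20 for System1, 55 for System2).
    """
    if len(closes) < lookback + 1:
        return None  # not enough history to form the channel
    window = closes[-lookback - 1:-1]
    price = closes[-1]
    if price > max(window):
        return "long"
    if price < min(window):
        return "short"
    return None


def should_add_unit(price, last_trade_price, n_value, is_long, step=0.5):
    """Add one Unit when price has moved step * N in the position's favor."""
    if is_long:
        return price >= last_trade_price + step * n_value
    return price <= last_trade_price - step * n_value


# Made-up closes with a 4-bar channel instead of 20 or 55
print(breakout_signal([10, 11, 12, 11, 13], lookback=4))
print(should_add_unit(101.0, 100.0, n_value=2.0, is_long=True))
```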

### 4. Stop-Loss Rule
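- For a long position: if the current price falls $2N$ below the last trade price → Sell all. For a short position the stop is mirrored at $2N$ above the last trade price.
  - No single trade may risk more than 2% of the account (`risk_max=0.02` in the code).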
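
A short worked sketch of the $2N$ stop. The function names and the numbers are hypothetical. It also checks the arithmetic that links the stop to position sizing: a Unit sized at 1% of the account per $N$ loses about 2% of the account if it is stopped out $2N$ away.

```python
def stop_price(last_trade_price, n_value, is_long, risk_multiple=2.0):
    """Stop sits risk_multiple * N against the position."""
    offset = risk_multiple * n_value
    return last_trade_price - offset if is_long else last_trade_price + offset


def stop_hit(price, stop, is_long):
    """A long is stopped at or below the stop, a short at or above it."""
    return price <= stop if is_long else price >= stop


# Made-up example: account of 1,000,000, N = 2.5, entry at 100
account = 1_000_000
n_value = 2.5
shares = int(account * 0.01 // n_value)        # one Unit for a stock
long_stop = stop_price(100.0, n_value, is_long=True)
loss_at_stop = shares * (100.0 - long_stop)    # dollars lost if stopped out
print(long_stop, loss_at_stop / account)
```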

### 5. Take Profit Rule
- System1:
  - Long position: if price < 10-day low → Sell all
  - Short position: if price > 10-day high → Buy back all
- System2:
  - Long position: if price < 20-day low → Sell all
  - Short position: if price > 20-day high → Buy back all
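
The exit rule mirrors the entry rule with a shorter channel. A minimal sketch, assuming a hypothetical helper `channel_exit` and a channel built from the closes that precede the latest bar:

```python
def channel_exit(closes, lookback, is_long):
    """True when the latest close breaks the exit channel against the position.

    lookback is 10 for System1 and 20 for System2.
    """
    if len(closes) < lookback + 1:
        return False  # not enough history to form the channel
    window = closes[-lookback - 1:-1]
    price = closes[-1]
    if is_long:
        return price < min(window)
    return price > max(window)


# Made-up closes with a 3-bar channel instead of 10 or 20
print(channel_exit([12, 11, 12, 10], lookback=3, is_long=True))
print(channel_exit([12, 11, 12, 13], lookback=3, is_long=False))
```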



In [1]:
!pip install yfinance
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
!pip install pyfolio
import pyfolio as pf
from copy import deepcopy, copy




In [35]:
# get the benchmark trading prices
SP500=yf.download('^GSPC','2021-01-01','2023-04-30', progress=False)[['Open','High','Low','Close','Adj Close']]

In [40]:
#calculate log returns as log(P_t / P_{t-1}); the log of pct_change() is NaN on down days
SP500['log_returns']=np.log(SP500['Close']/SP500['Close'].shift(1))
SP500['cum_returns']=SP500['log_returns'].cumsum()


In [71]:
#define the Turtle Trading System
class TurtleTradingSystem:
  def __init__(self,
               tickers,
               init_account=1000000,
               risk_level=2,
               risk_max=0.02,
               system1_entry=20,
               system1_exit=10,
               system2_entry=55,
               system2_exit=20,
               ATR_periods=20,
               system1_allocation=0.5,
               risk_reduction_rate=0.1,
               risk_reduction_level=0.2,
               unit_limit=5,
               adj_units=1,
               start_date='2019-01-01',
               end_date='2023-04-30',
               shorts=True):
    '''
    tickers: list of stock tickers to trade;
    init_account: initial trading cash;
    risk_level: stop-loss distance as a multiple of N;
    risk_max: maximum fraction of the account that a single trade can put at risk;
    system1_entry: breakout window (days) that generates a System1 entry signal;
    system1_exit: breakout window (days) that generates a System1 exit signal;
    system2_entry: breakout window (days) that generates a System2 entry signal;
    system2_exit: breakout window (days) that generates a System2 exit signal;
    ATR_periods: number of days in the moving average of True Range used for N;
    system1_allocation: fraction of capital allocated to System1 (the rest goes to System2);
    risk_reduction_rate: fraction by which the trading size is cut for each loss step;
    risk_reduction_level: size of one loss step, as a fraction of the initial account;
    unit_limit: maximum number of units per position;
    adj_units: stored on the instance but not used elsewhere in this class;
    start_date, end_date: date range of the price history;
    shorts: boolean flag that allows short positions;
    '''
    self.tickers=tickers
    self.init_account=init_account
    self.cash=init_account
    self.portfolio_value=init_account
    self.risk_level=risk_level
    self.risk_max = risk_max
    self.system1_entry = system1_entry
    self.system1_exit = system1_exit
    self.system2_entry = system2_entry
    self.system2_exit = system2_exit
    self.system1_allocation = system1_allocation
    self.system2_allocation = 1 - system1_allocation
    self.start = start_date
    self.end = end_date
    self.ATR_periods = ATR_periods
    self.shorts = shorts
    self.unit_limit = unit_limit
    self.risk_reduction_level = risk_reduction_level
    self.risk_reduction_rate = risk_reduction_rate
    self.adj_units = adj_units
    self.last_s1_win = {t: False for t in self.tickers}
    self.system_list = ['S1', 'S2']

    self._prep_data()

  def _prep_data(self):
    self.data=self._get_data()
    self._calculate_breakouts_signal()
    self._calculate_N()

  def _get_data(self):
    yf_stocks=yf.Tickers(self.tickers)
    df=yf_stocks.history(start=self.start, end=self.end)
    df.drop(['Open', 'Dividends', 'Stock Splits', 'Volume'], inplace=True, axis=1)
    df.ffill(inplace=True) #forward-fill missing values with the last available observation
    return pd.DataFrame(df.swaplevel(axis=1))
  
  def _calculate_breakouts_signal(self):
    for t in self.tickers:
      self.data[t, 'S1_EntryLong']=self.data[t]['Close'].rolling(self.system1_entry).max()
      self.data[t, 'S1_ExitLong']=self.data[t]['Close'].rolling(self.system1_exit).min()
      self.data[t, 'S2_EntryLong']=self.data[t]['Close'].rolling(self.system2_entry).max()
      self.data[t, 'S2_ExitLong']=self.data[t]['Close'].rolling(self.system2_exit).min()
      if self.shorts:
        self.data[t, 'S1_EntryShort']=self.data[t]['Close'].rolling(self.system1_entry).min()
        self.data[t, 'S1_ExitShort']=self.data[t]['Close'].rolling(self.system1_exit).max()
        self.data[t, 'S2_EntryShort']=self.data[t]['Close'].rolling(self.system2_entry).min()
        self.data[t, 'S2_ExitShort']=self.data[t]['Close'].rolling(self.system2_exit).max()

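  def _calculate_N(self):
    '''
    Approximate N by the simple moving average of True Range over ATR_periods days
    '''
    for t in self.tickers:
      prev_close=self.data[t]['Close'].shift()
      #True Range = max(H-L, |H-PDC|, |PDC-L|), computed per ticker
      TR=pd.concat([self.data[t]['High']-self.data[t]['Low'],
                    (self.data[t]['High']-prev_close).abs(),
                    (self.data[t]['Low']-prev_close).abs()], axis=1).max(axis=1)
      self.data[t, 'N']=TR.rolling(self.ATR_periods).mean()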

  def _check_account_balance(self, shares, price):
    '''
    Check if we have enough capital to purchase.
    If not, we should reset the position to lowest feasible level.
    '''
    if self.cash <= shares*price:
      shares=np.floor(self.cash/price)
    return shares
  
  def _adjust_risk_units(self, units):
    capital_loss=1- self.portfolio_value/self.init_account
    if capital_loss > self.risk_reduction_level:
      scale=np.floor(capital_loss/self.risk_reduction_level)
      units *= (1- scale*self.risk_reduction_rate)
    return units

  def _calculate_portfolio_value(self, portfolio):
    '''
    Return total portfolio value
    '''
    portfolio_value=sum([v1['value'] for v0 in portfolio.values() if type(v0) is dict
                         for k1,v1 in v0.items() if v1 is not None])
    portfolio_value+=self.cash
    if np.isnan(portfolio_value):
      raise ValueError(f'PortfolioValue = {portfolio_value}\n{portfolio}')
    return portfolio_value

  def _get_units(self, system):
    system_all=self.system1_allocation if system==1 else self.system2_allocation
    dollar_units=self.risk_max * self.portfolio_value * system_all
    dollar_units=self._adjust_risk_units(dollar_units)
    return dollar_units
  
  def _size_position(self, data, dollar_units):
    shares=np.floor(dollar_units/(self.risk_level * data['N'] *data['Close']))
    return shares

  def _run_system(self, ticker, data, position, system=1):
    S = system
    price=data['Close']
    N=data['N']
    if np.isnan(price) or np.isnan(N):
      #return the current position unchanged when the price or N is missing
      return position
    dollar_units=self._get_units(S)
    shares=0
    if position is None:
      if price==data[f'S{S}_EntryLong']:
        if S==1 and self.last_s1_win[ticker]:
          #skip this System1 breakout because the last System1 trade was a winner
          self.last_s1_win[ticker]=False
          return None
        shares=self._size_position(data, dollar_units)
        stop_price= price - self.risk_level*N
        long=True
      elif self.shorts and price==data[f'S{S}_EntryShort']:
        if S==1 and self.last_s1_win[ticker]:
          self.last_s1_win[ticker]=False
          return None
        shares=self._size_position(data, dollar_units)
        stop_price= price + self.risk_level*N
        long=False
      else:
        return None
      if shares==0:
        return None
      #make sure we have enough capital to trade
      shares=self._check_account_balance(shares, price)
      value=price*shares
      self.cash-=value
      position={'units':1,
                'shares':shares,
                'entry_price':price,
                'stop_price': stop_price,
                'entry_N':N,
                'value':value,
                'long':long}
      if np.isnan(self.cash) or self.cash<0:
        raise ValueError(f"Cash Error\n{S}-{ticker}\n{data}\n{position}")
    else:
      if position['long']:
        #check whether we need to exit the previous long position
        if price==data[f'S{S}_ExitLong'] or price<=position['stop_price']:
          self.cash+=position['shares']*price
          if S==1:
            #record whether the last System1 trade was a winner
            self.last_s1_win[ticker]=bool(price>=position['entry_price'])
          position=None
        #check whether to add a unit to the long position
        elif position['units']<self.unit_limit:
          if price>=position['entry_price']+position['entry_N']:
            shares=self._size_position(data, dollar_units)
            shares=self._check_account_balance(shares, price)
            self.cash -= shares*price
            stop_price=price - self.risk_level*N
            average_price=(position['entry_price']*position['shares']+shares*price)/(position['shares']+shares)
            position['entry_price']= average_price
            position['shares'] += shares
            position['stop_price']=stop_price
            position['units']+=1
      else:
        #check whether we need to exit the previous short position
        if price==data[f'S{S}_ExitShort'] or price>=position['stop_price']:
          self.cash+=position['shares']*price
          if S==1:
            if price<=position['entry_price']:
              self.last_s1_win[ticker]=True
            else:
              self.last_s1_win[ticker]=False
          position=None
        #check whether to add a unit to the short position
        elif position['units']<self.unit_limit:
          if price <= position['entry_price']-position['entry_N']:
            shares=self._size_position(data, dollar_units)
            shares=self._check_account_balance(shares, price)
            self.cash -= shares*price
            stop_price=price + self.risk_level*N
            average_price=(position['entry_price']*position['shares']+shares*price)/(position['shares']+shares)
            position['entry_price']= average_price
            position['shares'] += shares
            position['stop_price']=stop_price
            position['units']+=1
      if position is not None:
        #update value at each time unit
        position['value']=position['shares']*price
    return position
  
  #run the turtle trading strategy backtest
  def run(self):
    self.portfolio={}
    position={system: {ticker: None for ticker in self.tickers}
              for system in self.system_list}
    for i, (ts, row) in enumerate(self.data.iterrows()):
      for t in self.tickers:
        for s,system in enumerate(self.system_list):
          #system_list is ['S1', 'S2'], so s+1 selects System1 or System2
          position[system][t]=self._run_system(t,
                                               row[t],
                                               position[system][t],
                                               system=s+1)
      #snapshot the portfolio once per bar, after all tickers and systems are processed
      self.portfolio[i]=deepcopy(position)
      self.portfolio[i]['date']=ts
      self.portfolio[i]['cash']=copy(self.cash)
      self.portfolio_value = self._calculate_portfolio_value(self.portfolio[i])

  def get_portfolio_value(self):
    vals=[]
    for v in self.portfolio.values():
      portfolio_value=sum([v1['value'] for v0 in v.values() if type(v0) is dict
                           for k1, v1 in v0.items() if v1 is not None])
      portfolio_value+=v['cash'] #add cash to the value of the open positions
      vals.append(portfolio_value)
    return pd.Series(vals, index=self.data.index)

  def get_system_data_dict(self):
    system_dict={}
    cols=['units','shares','entry_price','stop_price','entry_N','value','long']
    #row of NaNs used for the dates on which there is no open position
    X=np.full(len(cols), np.nan)
    index = [v['date'] for v in self.portfolio.values()]
    for S in self.system_list:
      for t in self.tickers:
        rows=[]
        for v in self.portfolio.values():
          d = v[S][t]
          rows.append(X if d is None else np.array([float(d[c]) for c in cols]))
        #stack the per-date rows vertically into one (dates x cols) array
        df = pd.DataFrame(np.vstack(rows), columns=cols, index=index)
        system_dict[(S, t)] = df
    return system_dict
  
  def get_transactions(self):
    ddict = self.get_system_data_dict()
    transactions = pd.DataFrame()
    for k, v in ddict.items():
      df = pd.concat([v, self.data[k[1]].copy()], axis=1)
      df.fillna(0, inplace=True)
      rets = df["Close"] / df["entry_price"].shift(1) - 1
      trans = pd.DataFrame(rets[df['shares'].diff() < 0], columns=['Returns'])
      trans['System'] = k[0]
      trans['Stock'] = k[1]
      trans["Long"] = df["long"].shift(1).loc[df["shares"].diff() < 0]
      trans["Units"] = df["units"].shift(1).loc[df["shares"].diff() < 0]
      trans["Entry_Price"] = (df["entry_price"].shift(1).loc[df["shares"].diff() < 0])
      trans["Sell_Price"] = df["Close"].loc[df["shares"].diff() < 0]
      trans["Shares"] = df["shares"].shift(1).loc[df["shares"].diff() < 0]
      trans.reset_index(inplace=True)
      trans.rename(columns={"index": "Date"}, inplace=True)
      transactions = pd.concat([transactions, trans.copy()])

    transactions.reset_index(inplace=True)
    transactions.drop("index", axis=1, inplace=True)
    return transactions
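
The drawdown-based scaling in `_adjust_risk_units` is easier to follow in isolation. The standalone function below is a sketch that mirrors that method with the class defaults (`risk_reduction_level=0.2`, `risk_reduction_rate=0.1`); the function name and the sample account values are assumptions for illustration.

```python
import math


def adjust_risk_units(units, portfolio_value, init_account,
                      reduction_level=0.2, reduction_rate=0.1):
    """Cut the dollar risk per trade as the account falls below its start.

    For every full `reduction_level` of capital lost, the size is reduced
    by `reduction_rate` of its original value.
    """
    capital_loss = 1 - portfolio_value / init_account
    if capital_loss > reduction_level:
        steps = math.floor(capital_loss / reduction_level)
        units *= 1 - steps * reduction_rate
    return units


# Made-up account values against a starting account of 1,000,000
for value in (1_000_000, 750_000, 450_000):
    print(value, adjust_risk_units(10_000, value, 1_000_000))
```

With these defaults nothing changes until more than 20% of the starting capital is lost; a 25% loss cuts the size by one step and a 55% loss by two.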

In [69]:
tickers=['AAPL','AMZN','AMD','TSLA','MMM','CRM','DELL','META','GOOG','MSFT','IBM','INTC','NVDA','QCOM','CSCO','HPQ','QUBT','DBX','SNAP']
trading = TurtleTradingSystem(tickers)
trading.run()


In [None]:
#get returns of portfolio after applying the turtle trading strategy
portfolio_values = trading.get_portfolio_value()
Algo_log_returns = np.log(portfolio_values/portfolio_values.shift(1)).dropna()
Algo_cum_rets = Algo_log_returns.cumsum()
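
A log return is $\ln(V_t / V_{t-1})$, the log of the value ratio between consecutive bars. Log returns add up over time, so their cumulative sum converts back to a total simple return with $e^{x}-1$. A minimal sketch with made-up portfolio values (the helper name `log_returns` is hypothetical):

```python
import math


def log_returns(values):
    """Log return between consecutive values: ln(V_t / V_{t-1})."""
    return [math.log(curr / prev) for prev, curr in zip(values, values[1:])]


values = [100.0, 110.0, 99.0, 105.0]   # made-up portfolio values
rets = log_returns(values)
cum_log_return = sum(rets)
total_simple_return = math.exp(cum_log_return) - 1
print(rets, total_simple_return)
```

The second return is negative (110 → 99) and is still well defined, because the logarithm is taken of the ratio 0.9 rather than of the percentage change -0.1.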

In [None]:
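#backtesting
#-y skips the confirmation prompt; restart the runtime after reinstalling so the new pyfolio is imported
!pip uninstall -y pyfolio
!pip install git+https://github.com/quantopian/pyfolio
#pass the benchmark by keyword: the second positional argument of create_returns_tear_sheet is positions
performance=pf.create_returns_tear_sheet(Algo_log_returns, benchmark_rets=SP500['log_returns'])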