### simulation - with transaction fee, taxes / without dividends

# 초기 조건 / 데이터 임포트 / 벤치마크 계산

## 라이브러리 임포트

In [54]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [55]:
import pandas as pd
import numpy as np
from datetime import datetime
import math

In [56]:
current_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

## 데이터 임포트

In [57]:
df = pd.read_csv('/content/drive/MyDrive/quant_projects/momentum_strategies/SPY.csv')[:-2] # 2021-09까지 결과 출력
closes = df.Close.to_numpy()
opens = df.Open.to_numpy()
df['Date'] = df['Date'].map(lambda x: x[:-3])

In [58]:
tb3_df = pd.read_csv('/content/drive/MyDrive/quant_projects/momentum_strategies/TB3MS.csv')
risk_free_rates = tb3_df.TB3MS.to_numpy() / 100

In [59]:
dividend_df = pd.read_csv('/content/drive/MyDrive/quant_projects/momentum_strategies/SPY_dividends.csv')
dividend_df['Date'] = dividend_df['Date'].map(lambda x: x[:-3])
dividend_df.sort_values(by=['Date'], inplace=True)

In [60]:
dividend_df['Month'] = dividend_df['Date'].map(lambda x: df.index[df['Date']==x].item())

In [61]:
dividends = np.zeros(len(closes))
# 배당이 없는 달은 0으로 처리
for month, div in zip(dividend_df.Month, dividend_df.Dividends.map(lambda x: round(x,3))):
  dividends[month] = div

# Backtesting

In [62]:
strategies = []
returns = []

In [63]:
class Portfolio:
  """
  A) strategies  
    1) backtest 2) buy_and_hold
  B) details in a strategy 
    1) init_portfolio 2) intra_month 3) signal_trading 4) save_result
  C) activities in a strategy
    1) buy 2) sell 3) sell_trade_gains 4) pay_tax 5) capital_gains_tax
  D) results
    1) mdd 2) annual_returns 3) monthly_returns 4) statistics
  """
  transaction_fee_rate = 0.0025
  tax_rate = 0.22
  tax_threshold = 2500000 / 1100

  def __init__(self, benchmark_df, risk_free_rates, dividends, portfolio_config, buy_and_hold=False):
    """
    ex) portfolio_config = {"initial_cash":10000, "benchmark_name":"SPY", "rebalancing_period":6,"look_back":12, "exclude_period":0, "start_month":4,"apply_tax":True, "apply_fee":True}
    """
    self.benchmark_opens = benchmark_df.Open.to_numpy()
    self.benchmark_closes = benchmark_df.Close.to_numpy()
    self.benchmark_highs = benchmark_df.High.to_numpy()
    self.benchmark_lows = benchmark_df.Low.to_numpy()
    self.benchmark_dates = benchmark_df.Date.to_numpy()
    self.risk_free_rates = risk_free_rates
    self.benchmark_dividends = dividends
    self.columns = ["upwards", "cash", "investments", "shares", "buy_prices", "fees", "accum_fees", "dividend_incomes", "month_gains", "annual_gains", "annual_taxes", "accum_taxes", "portfolio_closes", "portfolio_opens", "portfolio_highs", "portfolio_lows"]
    for column in self.columns:
      setattr(self, column, np.zeros(len(self.benchmark_dates)))
    for key, value in portfolio_config.items():
      setattr(self, key, value)
    self.df = pd.DataFrame({'date': self.benchmark_dates, 'opens': self.benchmark_opens, 'closes': self.benchmark_closes, 'highs':self.benchmark_highs, 'lows':self.benchmark_lows})
    portfolio_name = "benchmark_b&h" if buy_and_hold else f"{self.benchmark_name}_{self.rebalancing_period}:{self.look_back}_{self.start_month}to{self.start_month + self.rebalancing_period}ex{self.exclude_period}"
    self.name = portfolio_name + f"{'_tax' if self.apply_tax else ''}{'_fee' if self.apply_fee else ''}{'_div' if len(self.benchmark_dividends) else ''}"

  """A) strategies"""
  def backtest(self):
    trade_month_index = (self.start_month - int(self.benchmark_dates[0][-2:])) % self.rebalancing_period
    while trade_month_index < 0:
      # ex) self.rebalancing_period == 1, trade_month_index == -7 -> trade_month_index becomes 0 
      # ex) self.rebalancing_period == 6, trade_month_index == -1 -> trade_month_index becomes 5 
      trade_month_index += self.rebalancing_period 
    tax_month_index = (12 - int(self.benchmark_dates[0][-2:])) % 12
    self.init_portfolio(self.look_back)
    # self.look_back 시점 이후 거래 시작
    for i in range(self.look_back, len(self.benchmark_dates)):
      # 월말 거래 전
      self.intra_month(i)
      # trade
      self.signal_trading(i, trade_month_index)
      # tax
      if self.apply_tax and (i % 12 == tax_month_index):
        self.capital_gains_tax(i)
      self.accum_taxes[i] = self.accum_taxes[i-1] + self.annual_taxes[i]
      # fees
      self.accum_fees[i] = self.accum_fees[i-1] + self.fees[i]
    self.save_result()
    strategies.append(self.statistics())
    returns.append({"name":self.name, "df":self.df['portfolio_closes'].to_numpy()})

  def buy_and_hold(self, start):
    self.init_portfolio(start)
    for i in range(start, len(self.benchmark_dates)):
      if i == 0:
        self.init_portfolio(1)
      else:
        self.intra_month(i)
      shares_plus = self.cash[i] / (1 + Portfolio.transaction_fee_rate) // self.benchmark_closes[i]
      self.buy(i, shares_plus)
      self.portfolio_closes[i] = self.shares[i] * self.benchmark_closes[i] + self.cash[i]
      # fees
      self.accum_fees[i] = self.accum_fees[i-1] + self.fees[i]
    self.save_result()
    strategies.append(self.statistics())
    returns.append({"name":self.name, "df":self.df['portfolio_closes'].to_numpy()})

  """B) details in a strategy"""
  def init_portfolio(self, init_period):
    self.cash[0:init_period] = self.initial_cash
    self.portfolio_opens[0:init_period] = self.initial_cash
    self.portfolio_closes[0:init_period] = self.initial_cash
    self.portfolio_highs[0:init_period] = self.initial_cash
    self.portfolio_lows[0:init_period] = self.initial_cash
  
  def intra_month(self, i):
    current_dividend = self.benchmark_dividends[i] * self.shares[i-1]
    self.cash[i] = self.cash[i-1] + current_dividend
    self.dividend_incomes[i] = current_dividend
    self.shares[i] = self.shares[i-1]
    self.portfolio_opens[i] = self.shares[i] * self.benchmark_opens[i] + self.cash[i]
    self.portfolio_highs[i] = self.shares[i] * self.benchmark_highs[i] + self.cash[i]
    self.portfolio_lows[i] = self.shares[i] * self.benchmark_lows[i] + self.cash[i]
    self.investments[i] = self.shares[i] * self.benchmark_closes[i]
    self.buy_prices[i] = self.buy_prices[i-1]

  def signal_trading(self, i, trade_month_index):
    # trade with upwards signal
    if i % self.rebalancing_period == trade_month_index:
      self.upwards[i] = 1 if (self.benchmark_closes[i-self.exclude_period] - self.benchmark_closes[i-self.look_back]) / self.benchmark_closes[i-self.look_back] >= self.risk_free_rates[i] else 0
      previus_i = i - self.rebalancing_period
      while previus_i < 0:
        previus_i += 1
      if self.upwards[i] == 1:
        # 0 - 1 (신규 매수) / 1 - 1 (추가 매수)
        shares_plus = self.cash[i] / (1 + Portfolio.transaction_fee_rate) // self.benchmark_closes[i]
        self.buy(i, shares_plus)
      elif self.upwards[previus_i] == 1 and self.upwards[i] == 0:
        # 1 - 0 (전량 매도)
        self.sell(i, self.shares[i])
    self.portfolio_closes[i] = self.shares[i] * self.benchmark_closes[i] + self.cash[i]

  def save_result(self):
    for column in self.columns:
      self.df[column] = getattr(self, column).tolist()
    # self.df['benchmark'] = self.benchmark.tolist()
    # self.df.to_csv(f'/content/drive/MyDrive/quant_projects/momentum_strategies/{current_date}_{self.name}.csv')

  """C) activities in a strategy"""
  def buy(self, i, buy_shares):
    self.shares[i] += buy_shares
    buy_amount = buy_shares * self.benchmark_closes[i]
    self.investments[i] += buy_amount
    self.cash[i] -= buy_amount * (1 + Portfolio.transaction_fee_rate)
    if self.apply_fee:
      self.fees[i] += buy_amount * Portfolio.transaction_fee_rate
    self.buy_prices[i] = (self.buy_prices[i-1] * self.shares[i-1] + buy_amount) / self.shares[i] 

  def sell(self, i, sell_shares):
    self.shares[i] -= sell_shares
    sell_amount = sell_shares * self.benchmark_closes[i]
    self.investments[i] -= sell_amount
    self.cash[i] += sell_amount * (1 - Portfolio.transaction_fee_rate)
    if self.apply_fee:
      self.fees[i] += sell_amount * Portfolio.transaction_fee_rate
    self.buy_prices[i] = 0 if self.shares[i] == 0 else self.buy_prices[i-1]
    self.month_gains[i] += self.sell_trade_gains(i, sell_shares)

  def sell_trade_gains(self, i, sell_shares):
    # returns expected_trade_gains (양도수익 - 매매대금 수수료)
    return (sell_shares * (self.benchmark_closes[i] - self.buy_prices[i-1]) - sell_shares * self.benchmark_closes[i] * Portfolio.transaction_fee_rate)

  def pay_tax(self, i, current_tax):
    # 올해 세금 확정 후 납부
    self.annual_taxes[i] = current_tax
    self.cash[i] -= current_tax

  def capital_gains_tax(self, i):
    self.annual_gains[i] = sum(self.month_gains[i-11:i+1])
    if self.annual_gains[i] > Portfolio.tax_threshold:
      # 1)세금을 내야 하는 경우
      current_tax = (self.annual_gains[i] - Portfolio.tax_threshold) * Portfolio.tax_rate
      if self.cash[i] >= current_tax:
        # 1-1)보유현금이 세금보다 많은 경우
        self.pay_tax(i, current_tax)
      else:
        # 1-2)추가로 매도하여 세금을 내야하는 경우
        shares_minus = math.ceil((current_tax - self.cash[i]) / (1 - Portfolio.transaction_fee_rate) / self.benchmark_closes[i])
        trade_gains = self.sell_trade_gains(i, shares_minus)
        if (self.annual_gains[i] + trade_gains > Portfolio.tax_threshold):
          # 1-2-A)세금을 내기 위해 매도했는데 1-2-A1)확정 수익이 추가로 발생하거나 / 1-2-A2)확정 손실이 작아 세금을 내야 하는 경우
          if trade_gains > 0:
            # 1-2-A1)확정 수익이 추가로 발생 -> 추가분까지 고려하여 매도수량 계산
            shares_minus = math.ceil(((self.annual_gains[i] - Portfolio.tax_threshold) * Portfolio.tax_rate - self.cash[i]) / (1 - Portfolio.transaction_fee_rate) / (self.benchmark_closes[i] * (1 - Portfolio.tax_rate) + self.buy_prices[i-1] * Portfolio.tax_rate))
            trade_gains = self.sell_trade_gains(i, shares_minus)
          # 매도 후 양도수익 업데이트
          self.sell(i, shares_minus)
          self.annual_gains[i] += trade_gains
          # 세금 업데이트 후 납부
          current_tax = (self.annual_gains[i] - Portfolio.tax_threshold) * Portfolio.tax_rate
          self.pay_tax(i, current_tax)
        else:
          # 1-2-B)세금을 내기 위해 매도했는데 세금보다 확정 손실이 커서 세금을 내지 않아도 되는 경우
          # 매도 후 양도수익 업데이트
          self.sell(i, shares_minus)
          self.annual_gains[i] += trade_gains
  
  """D) results"""
  def mdd(self):
    peak_lower = np.argmax(np.maximum.accumulate(self.portfolio_closes) - self.portfolio_closes)
    if peak_lower == 0:
      peak_upper = 0
    else:
      peak_upper = np.argmax(self.portfolio_closes[:peak_lower])
    # 월중 mdd
    # peak_lower = np.argmax((np.maximum.accumulate(self.portfolio_highs) - self.portfolio_lows) / np.maximum.accumulate(self.portfolio_highs))
    # peak_upper = np.argmax(self.portfolio_highs[:peak_lower])
    # print(peak_upper, peak_lower, (self.portfolio_lows[peak_lower] - self.portfolio_highs[peak_upper]) / self.portfolio_highs[peak_upper])
    return (self.portfolio_closes[peak_lower] - self.portfolio_closes[peak_upper]) / self.portfolio_closes[peak_upper]

  def annual_returns(self):
    month_lag = int(self.benchmark_dates[0][-2:]) - 1
    annual_returns = []
    for i in range(len(self.portfolio_closes) // 12 +1):
      if (i+1)*12 > len(self.portfolio_closes):
        annual_returns.append(round((self.portfolio_closes[-1] - self.portfolio_opens[i*12-month_lag]) / self.portfolio_opens[i*12-month_lag],4))
      elif i*12-month_lag < 0:
        annual_returns.append(round((self.portfolio_closes[i*12+11-month_lag] - self.portfolio_opens[0]) / self.portfolio_opens[0], 4))
      else:
        annual_returns.append(round((self.portfolio_closes[i*12+11-month_lag] - self.portfolio_opens[i*12-month_lag]) / self.portfolio_opens[i*12-month_lag],4))
    return annual_returns

  def monthly_returns(self):
    return[round((self.portfolio_closes[i] - self.portfolio_opens[i]) / self.portfolio_opens[i], 4) for i in range(len(self.benchmark_dates))]

  def statistics(self, is_annual=True, is_average=False):
    time_series_returns = self.annual_returns() if is_annual else self.monthly_returns()
    mean_risk_free_rate = np.mean(self.risk_free_rates) if is_average else 0.005
    cagr = round(pow(self.portfolio_closes[-1] / self.portfolio_closes[0], 1 / len(time_series_returns)) - 1, 4)
    mean_return = round(np.mean(time_series_returns),4)
    stddev = round(np.std(time_series_returns),4)
    sharpe = round((np.mean(time_series_returns)- mean_risk_free_rate) / np.std(time_series_returns),4)
    worst_time_series = round(min(time_series_returns), 4)
    best_time_series = round(max(time_series_returns), 4)
    mdd = round(self.mdd(), 4)
    return {
          'name':self.name, 
          'initial_balance': round(self.portfolio_closes[0], 2), 
          'final_balance':round(self.portfolio_closes[-1],2), 
          'cagr_annual':cagr, 
          'mean_annual':mean_return, 
          'stddev':stddev, 
          'sharpe':sharpe, 
          'worst_year':worst_time_series, 
          'best_year': best_time_series, 
          'mdd': mdd, 
          'accum_fees':round(self.accum_fees[-1], 2), 
          'accum_taxes':round(self.accum_taxes[-1], 2)
        } if is_annual else {
          'name':self.name, 
          'initial_balance': round(self.portfolio_closes[0], 2), 
          'final_balance':round(self.portfolio_closes[-1],2), 
          'cagr_monthly':cagr, 
          'mean_monthly':mean_return, 
          'stddev':stddev, 
          'sharpe':sharpe, 
          'worst_month':worst_time_series, 
          'best_month': best_time_series, 
          'mdd': mdd, 
          'accum_fees':round(self.accum_fees[-1], 2), 
          'accum_taxes':round(self.accum_taxes[-1], 2)
        }

In [64]:
# 단일 건 테스트
# portfolio_config = {"initial_cash":10000, "benchmark_name":"SPY", "rebalancing_period":6,"look_back":12, "exclude_period":0, "start_month":4,"apply_tax":True, "apply_fee":True}
# portfolio = Portfolio(df, risk_free_rates, dividends, portfolio_config)
# portfolio.backtest()

In [65]:
bnh_config = {"initial_cash":10000, "benchmark_name":"SPY", "apply_tax":True, "apply_fee":True}
benchmark_bnh = Portfolio(df, risk_free_rates, dividends, bnh_config, buy_and_hold=True)
benchmark_bnh.buy_and_hold(start=0)

In [66]:
benchmark_bnh.statistics()

{'accum_fees': 87.09,
 'accum_taxes': 0.0,
 'best_year': 0.3753,
 'cagr_annual': 0.1014,
 'final_balance': 164101.7,
 'initial_balance': 9975.13,
 'mdd': -0.1948,
 'mean_annual': 0.1119,
 'name': 'benchmark_b&h_tax_fee_div',
 'sharpe': 0.6322,
 'stddev': 0.1691,
 'worst_year': -0.3688}

In [67]:
rebalancing_periods = [1,3,6,12]
look_backs = [1,3,6,12]
for rebalancing_period in rebalancing_periods:
  for look_back in look_backs:
    for index in range(rebalancing_period):
      for exclude_period in range(min(3, look_back)):
        portfolio_details = {"initial_cash":10000, "benchmark_name":"SPY", "rebalancing_period":rebalancing_period,"look_back":look_back, "exclude_period":exclude_period, "start_month":2+index,"apply_tax":True, "apply_fee":True}
        portfolio = Portfolio(df, risk_free_rates, dividends, portfolio_details)
        portfolio.backtest()

# 결과 출력

## statistics / returns csv 출력

In [68]:
statistics_df = pd.DataFrame(strategies)
# print(statistics_df)
# print(statistics_df.iloc[statistics_df['final_balance'].idxmax()])

In [69]:
statistics_df.sort_values(by=['cagr_annual'], inplace=True, ascending=False)

In [70]:
statistics_df

Unnamed: 0,name,initial_balance,final_balance,cagr_annual,mean_annual,stddev,sharpe,worst_year,best_year,mdd,accum_fees,accum_taxes
0,benchmark_b&h_tax_fee_div,9975.13,164101.70,0.1014,0.1119,0.1691,0.6322,-0.3688,0.3753,-0.1948,87.09,0.00
91,SPY_6:12_4to10ex2_tax_fee_div,10000.00,149886.19,0.0979,0.1105,0.1158,0.9105,-0.1008,0.3307,-0.1936,938.34,12309.67
193,SPY_12:12_4to16ex2_tax_fee_div,10000.00,135842.56,0.0941,0.1077,0.1213,0.8467,-0.0536,0.3705,-0.1919,783.97,11840.88
89,SPY_6:12_4to10ex0_tax_fee_div,10000.00,135026.57,0.0939,0.1080,0.1121,0.9184,-0.0539,0.3308,-0.1935,1344.47,18157.35
197,SPY_12:12_6to18ex0_tax_fee_div,10000.00,133468.41,0.0935,0.1017,0.1254,0.7707,-0.1183,0.3297,-0.1928,307.50,2320.96
...,...,...,...,...,...,...,...,...,...,...,...,...
102,SPY_12:1_3to15ex0_tax_fee_div,10000.00,18267.51,0.0210,0.0274,0.1138,0.1971,-0.2207,0.4144,-0.4438,297.94,850.12
142,SPY_12:3_11to23ex2_tax_fee_div,10000.00,16973.31,0.0184,0.0249,0.1102,0.1805,-0.3678,0.2939,-0.4073,386.77,713.19
4,SPY_1:3_2to3ex2_tax_fee_div,10000.00,16549.74,0.0175,0.0186,0.0703,0.1932,-0.1219,0.1686,-0.2585,3800.33,0.00
41,SPY_6:1_2to8ex0_tax_fee_div,10000.00,14215.99,0.0122,0.0141,0.0772,0.1180,-0.1343,0.1911,-0.3178,496.69,0.00


In [71]:
statistics_df.to_csv(f'/content/drive/MyDrive/quant_projects/momentum_strategies/{current_date}_statistics.csv')

In [72]:
returns_df = pd.DataFrame({'Date': df['Date']})
for portfolio in returns:
  returns_df[portfolio['name']] = portfolio['df']
print(returns_df)
returns_df.to_csv(f'/content/drive/MyDrive/quant_projects/momentum_strategies/{current_date}_returns.csv')

        Date  ...  SPY_12:12_13to25ex2_tax_fee_div
0    1993-02  ...                     10000.000000
1    1993-03  ...                     10000.000000
2    1993-04  ...                     10000.000000
3    1993-05  ...                     10000.000000
4    1993-06  ...                     10000.000000
..       ...  ...                              ...
339  2021-05  ...                     50866.373006
340  2021-06  ...                     51993.891686
341  2021-07  ...                     53247.893126
342  2021-08  ...                     54813.891686
343  2021-09  ...                     52294.853726

[344 rows x 222 columns]


## 그래프 출력

In [None]:
chart = returns_df.plot.line(x='Date', grid=True, figsize=(40,25), xticks=np.arange(0, 346, 12), yticks=np.arange(0, 150000, 10000))
chart.tick_params(axis='x', rotation=90)
chart.figure.savefig(f'/content/drive/MyDrive/quant_projects/momentum_strategies/{current_date}_returns.jpg')

Output hidden; open in https://colab.research.google.com to view.