In [1]:
from typing import List
from functools import wraps

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import yfinance as yf
import datetime as dt

jtplot.style(figsize=(15, 9))

In [2]:
# tickers = ['GLD', 'GDX', 'AAPL', 'SPY']
# # tickers = ['MSFT', 'NVDA', 'AMD', 'META']
# d_start = dt.datetime(2010, 1, 1)
# d_final = dt.datetime(2020, 12, 31)

# df = yf.download(tickers, start=d_start, end=d_final, period='1d', auto_adjust=True)
# df.to_csv(f"../../../data/bt/{'_'.join(tickers)}__1D.csv")

In [2]:
import os

DATA_PATH = '../../data/bt/'

def read_csv_bt(csv:str)->pd.DataFrame:
    file_path = f"{DATA_PATH}{csv}"
    tickers, _ = csv.split('__')
    if len(tickers)>1:
        return pd.read_csv(file_path, parse_dates=True, header=[0, 1], index_col=0)['Close']
    else:
        return pd.read_csv(file_path, parse_dates=True, index_col=0)['Close']

def get_backtest_data()->List[list]:
    bt_files = os.listdir(DATA_PATH)
    csv_files = []
    for csv in bt_files:
        try:
            tickers, period = csv.split('__')
            df = read_csv_bt(csv) # send to some dict with data???
            csv_files.append({
                'csv': csv,
                'tickers': list(map(str.upper, tickers.split('_'))),
                'period': period.replace('.csv', ''),
                'start': df.iloc[0].name.strftime('%d/%m/%Y'),
                'end': df.iloc[-1].name.strftime('%d/%m/%Y')
            })
        except:
            continue
    return csv_files


In [3]:
get_backtest_data()

[{'csv': 'GLD_GDX_AAPL_SPY__1D.csv',
  'tickers': ['GLD', 'GDX', 'AAPL', 'SPY'],
  'period': '1D',
  'start': '04/01/2010',
  'end': '30/12/2020'},
 {'csv': 'gld_gdx_aapl_spy__1H.csv',
  'tickers': ['GLD', 'GDX', 'AAPL', 'SPY'],
  'period': '1H',
  'start': '07/10/2022',
  'end': '07/10/2024'},
 {'csv': 'MSFT_NVDA_AMD_META__1D.csv',
  'tickers': ['MSFT', 'NVDA', 'AMD', 'META'],
  'period': '1D',
  'start': '04/01/2010',
  'end': '30/12/2020'},
 {'csv': 'MSFT_NVDA_AMD_META__1H.csv',
  'tickers': ['MSFT', 'NVDA', 'AMD', 'META'],
  'period': '1H',
  'start': '04/01/2010',
  'end': '30/12/2020'}]

In [3]:
SMA1 = [42, 24, 18, 6]
SMA2 = [252, 180, 64, 22]

def visualize_bt(data):
    data[strategy+['returns']].cumsum().apply(np.exp).plot(title=ticker)

def start_backtesting()->list:
    bt_results_dfs = []
    bt_data_files = get_backtest_data()
    for bt in bt_data_files:
        csv, tickers, period, _, _ = bt.values()
        data = read_csv_bt(csv)

        result_df = pd.DataFrame()
        single_ticker_result = []
        for ticker in tickers:
            result, full_data = sma(
                ticker,
                d=pd.DataFrame(data[ticker]),
                sma1=SMA1, sma2=SMA2,
            )
            single_ticker_result.append(result)

        single_ticker_result = pd.concat(single_ticker_result, axis=1)
        single_ticker_result.index.name = period
        bt_results_dfs.append(single_ticker_result)
    return bt_results_dfs
        
def run_bt(files_for_bt=[], strategy='SMA')->list:
    if files_for_bt:
        for file in files_for_bt:
            pass
    else:
        return start_backtesting()
            

In [8]:
bt_result = run_bt()
bt_result[1]

Unnamed: 0_level_0,GLD,GDX,AAPL,SPY
1H,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
strategy_42_252,1.210578,0.547866,1.141445,1.205247
strategy_24_180,1.001059,0.695432,1.11215,1.183577
strategy_18_64,1.258914,1.151021,1.049129,1.315175
strategy_6_22,0.788741,0.80287,0.819486,1.026587
returns,1.325064,1.176649,1.572987,1.422543
drawdown_42_252,0.102017,0.587091,0.504713,0.140857
drawdown_24_180,0.179167,0.522789,0.306785,0.127559
drawdown_18_64,0.142815,0.557039,0.283586,0.096486
drawdown_6_22,0.329267,0.47127,0.313121,0.177073


## Strategies for testing

In [13]:
def sma(ticker:str, d:pd.DataFrame, sma1:List[int]=[42], sma2:List[int]=[252])->pd.DataFrame:
    d['returns'] = np.log(d[ticker] / d[ticker].shift(1))
    strategy = []
    drawdown = []
    for s1, s2 in zip(sma1, sma2):
        d[f'SMA1_{s1}'] = d[ticker].rolling(s1).mean()
        d[f'SMA2_{s2}'] = d[ticker].rolling(s2).mean()
        d.dropna(inplace=True)
        
        d[f'position_{s1}_{s2}'] = np.where(d[f'SMA1_{s1}'] > d[f'SMA2_{s2}'], 1, -1)
        d[f'strategy_{s1}_{s2}'] = d[f'position_{s1}_{s2}'].shift(1) * d['returns']
        strategy.append(f'strategy_{s1}_{s2}')
        
        d[f'cumret_{s1}_{s2}'] = d[f'strategy_{s1}_{s2}'].cumsum().apply(np.exp)
        d[f'cummax_{s1}_{s2}'] = d[f'cumret_{s1}_{s2}'].cummax()
        d[f'drawdown_{s1}_{s2}'] = d[f'cummax_{s1}_{s2}'] - d[f'cumret_{s1}_{s2}']
        drawdown.append(f'drawdown_{s1}_{s2}')
    
    strategy_result = d[strategy+['returns']].sum().apply(np.exp)
    drawdown_result = d[drawdown].max()
    result = pd.concat([strategy_result, drawdown_result]).to_frame()
    result.rename(columns={0: ticker}, inplace=True)
    
    return result, d.copy()
       

In [28]:
def mom(ticker:str, d:pd.DataFrame, moms:List[int]=[1])->pd.DataFrame:
    d['returns'] = np.log(d[ticker] / d[ticker].shift(1))
    strategy = []
    drawdown = []
    for mom in moms:
        d[f'MOM_{m}'] = d[ticker].rolling(mom).mean()
        d.dropna(inplace=True)
        
        d[f'position_{m}'] = np.sign(d['returns'].rolling(m).mean())
        d[f'strategy_{m}'] = d[f'position_{m}'].shift(1) * data['returns']
        strategy.append(f'strategy_{m}')
        
        d[f'cumret_{m}'] = d[f'strategy_{m}'].cumsum().apply(np.exp)
        d[f'cummax_{m}'] = d[f'cumret_{m}'].cummax()
        d[f'drawdown_{m}'] = d[f'cummax_{m}'] - d[f'cumret_{m}']
        drawdown.append(f'drawdown_{m}')
    
    strategy_result = d[strategy+['returns']].sum().apply(np.exp)
    drawdown_result = d[drawdown].max()
    result = pd.concat([strategy_result, drawdown_result]).to_frame()
    result.rename(columns={0: ticker}, inplace=True)
    
    return result, d.copy()


In [39]:
class Strategy:
    def __init__(self):
        pass
    
    def set_data(self, ticker:str, d:pd.DataFrame):
        self.d = d.copy()
        self.d['returns'] = np.log(d[ticker] / d[ticker].shift(1))
        self.ticker = ticker
        self.strategy = []
        self.drawdown = []


In [40]:
class SMA(Strategy):
    def __init__(self, sma1:List[int]=[42], sma2:List[int]=[252]):
        self.sma1 = sma1
        self.sma2 = sma2
    
    def run_strategy(self):
        for s1, s2 in zip(self.sma1, self.sma2):
            self.d[f'SMA1_{s1}'] = self.d[self.ticker].rolling(s1).mean()
            self.d[f'SMA2_{s2}'] = self.d[self.ticker].rolling(s2).mean()
            self.d.dropna(inplace=True)

            self.d[f'position_{s1}_{s2}'] = np.where(self.d[f'SMA1_{s1}'] > self.d[f'SMA2_{s2}'], 1, -1)
            self.d[f'strategy_{s1}_{s2}'] = self.d[f'position_{s1}_{s2}'].shift(1) * self.d['returns']
            self.strategy.append(f'strategy_{s1}_{s2}')

            self.d[f'cumret_{s1}_{s2}'] = self.d[f'strategy_{s1}_{s2}'].cumsum().apply(np.exp)
            self.d[f'cummax_{s1}_{s2}'] = self.d[f'cumret_{s1}_{s2}'].cummax()
            self.d[f'drawdown_{s1}_{s2}'] = self.d[f'cummax_{s1}_{s2}'] - self.d[f'cumret_{s1}_{s2}']
            self.drawdown.append(f'drawdown_{s1}_{s2}')

        strategy_result = self.d[self.strategy+['returns']].sum().apply(np.exp)
        drawdown_result = self.d[self.drawdown].max()
        result = pd.concat([strategy_result, drawdown_result]).to_frame()
        result.rename(columns={0: self.ticker}, inplace=True)

        return result, self.d.copy()
    

In [41]:
SMA1 = [42, 24, 18, 6]
SMA2 = [252, 180, 64, 22]

def start_backtesting(strategy_cls, *args)->list:
    bt_results_dfs = []
    bt_data_files = get_backtest_data()
    for bt in bt_data_files:
        csv, tickers, period, _, _ = bt.values()
        data = read_csv_bt(csv)

        result_df = pd.DataFrame()
        single_ticker_result = []
        STRATEGY = get_strategy(strategy_cls, *args)
        for ticker in tickers:
            STRATEGY.set_data(ticker, pd.DataFrame(data[ticker]))
            result, full_data = STRATEGY.run_strategy()
            
            single_ticker_result.append(result)

        single_ticker_result = pd.concat(single_ticker_result, axis=1)
        single_ticker_result.index.name = period
        bt_results_dfs.append(single_ticker_result)
    return bt_results_dfs
        
def get_strategy(strategy, *args):
    strategies = dict(sma=SMA(*args))
    return strategies[strategy]


In [42]:
start_backtesting('sma', SMA1, SMA2)

[                      GLD       GDX       AAPL       SPY
 1D                                                      
 strategy_42_252  1.308799  1.065529   6.424632  1.466223
 strategy_24_180  1.162980  1.102915   6.462446  1.668211
 strategy_18_64   1.611339  0.746799   3.299714  1.804156
 strategy_6_22    1.462546  2.269095   5.655584  0.803092
 returns          1.107165  0.732320  10.219678  3.437358
 drawdown_42_252  0.447434  1.041484   4.107096  0.623526
 drawdown_24_180  0.402130  0.787659   1.729420  1.023388
 drawdown_18_64   0.308936  1.408235   1.144106  0.520338
 drawdown_6_22    0.322727  2.556176   1.345458  0.557455,
                       GLD       GDX      AAPL       SPY
 1H                                                     
 strategy_42_252  1.210578  0.547866  1.141445  1.205247
 strategy_24_180  1.001059  0.695432  1.112150  1.183577
 strategy_18_64   1.258914  1.151021  1.049129  1.315175
 strategy_6_22    0.788741  0.802870  0.819486  1.026587
 returns          1