<a href="https://colab.research.google.com/github/KitHaywood/Crypto-Backtester/blob/main/VectorisedBacktesting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as _np
from scipy.optimize import curve_fit
from scipy.misc import derivative
import yfinance as yf
import random as rd
from datetime import datetime, timedelta
from tqdm import tqdm
import time
import itertools
import numpy as np
from autograd import grad, jacobian


In [2]:
def _get_1y_hourly_data(instr: str) -> pd.DataFrame:

    t = yf.Ticker(instr)
    dr = pd.date_range(
        start=datetime.today()-timedelta(days=365),
        end=datetime.today(),
        freq='5D'
    )

    intervals = [(dr[i],dr[i+1]) for i in range(len(dr)-1)]
    df = pd.DataFrame()

    for s,e in tqdm(intervals):

        _df = yf.download(
            tickers=instr,
            start=s,
            end=e,
            interval='1h'
        )

    #  period="5d", interval="1m"
        df = pd.concat([df,_df])
        time.sleep(0)

    return df

instr = 'TSLA'
df = _get_1y_hourly_data(instr)
df = df.drop(['Adj Close','Volume'],axis=1)
df.index = df.index.tz_localize(None)

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%*******

In [3]:
import itertools
outdf = pd.DataFrame()

#################################################################################
# Creating Random 1 Minute Data using highs and lows at random minutes in the hour
#################################################################################

for i,r in tqdm(df.iterrows()):

    hidx,lidx = rd.sample(list(range(2,59)),2)

    if hidx > lidx:
        early = sorted([round(rd.uniform(r['Open'],r['Low']),4) for _ in range(lidx-1)])[::-1]
        mid = sorted([round(rd.uniform(r['Low'],r['High']),4) for _ in range(hidx-lidx-1)])
        late = sorted([round(rd.uniform(r['Low'],r['Close']),4) for _ in range(59-hidx)])[::-1]
    else:
        early = sorted([round(rd.uniform(r['Open'],r['High']),4) for _ in range(hidx-1)])
        mid = sorted([round(rd.uniform(r['Low'],r['High']),4) for _ in range(lidx-hidx-1)])[::-1]
        late = sorted([round(rd.uniform(r['High'],r['Close']),4) for _ in range(59-lidx)])

    new_vals = list(itertools.chain(early, mid, late))

    new_vals.insert(lidx,r['Low'])
    new_vals.insert(hidx,r['High'])

    _df = pd.DataFrame(
        new_vals,
        columns=['Close'],
        index=pd.date_range(
            start=r.name,
            end=r.name + timedelta(minutes=58),
            freq='1min'
        ))

    outdf = pd.concat([outdf,_df])



1746it [00:10, 165.51it/s]


In [5]:
outdf.to_csv("/content/drive/MyDrive/DATA/tsla_data.csv")

In [None]:
from autograd import jacobian
import numba as nb
import time
import warnings
from typing import Callable, Tuple
warnings.filterwarnings('ignore')



def func(x: float ,*a: list[float]) -> float:
    return a[0] + a[1] * np.exp(x) + a[2] * np.exp(x)

def strides(a, L, S=1 ):  # Window len = L, Stride len/stepsize = S
    nrows = ((a.size-L)//S)+1
    n = a.strides[0]
    return _np.lib.stride_tricks.as_strided(a, shape=(nrows,L), strides=(S*n,n))

def normalize_negative_one(a):
    div = np.max(a,axis=1) - np.min(a,axis=1)
    div = np.dstack([div]*a.shape[1])[0]
    normalized_input = (a - np.dstack([np.min(a,axis=1)] * a.shape[1])[0]) / div
    return (2 * normalized_input) - 1

def num_trades(s):
    return (np.diff(s)!=0).sum()

def _mdd(a):
    acc_max = np.maximum.accumulate(a)
    return (a - acc_max).min()

def _calc_nt_score(x: int, sensible_nt: int) -> float:
    if x == 0:
        return 0
    elif x < sensible_nt:
        return abs(1 - (sensible_nt/x))
    elif x > sensible_nt:
        return 1 / (x/sensible_nt)**2
    elif x == sensible_nt:
        return 1

def calc_all_indicators(prices: pd.DataFrame, windows: np.ndarray, btdepth: int) -> np.ndarray:
    """
    RETURNS:
    indicator: np.ndarray -> (len(windows) ** 2, 2, btdepth)
    thresholds: np.ndarray -> (len(windows) ** 2, 2 btdepth)
    """
    der1 = {}
    der2 = {}

    vals = prices['Close'].to_numpy()

    for a in tqdm(windows):

        y =  np.linspace(0,1,a)
        s = normalize_negative_one(strides(vals,int(a),1))
        coefs = [curve_fit(func, x,y,p0=[1.]*3,ftol=5e-1)[0] for x in s]
        der1[a] =  [derivative(func, 1, dx=0.001,args=c, n=1) for c in coefs][-btdepth:]
        der2[a] =  [derivative(func, 1, dx=0.001,args=c, n=2) for c in coefs][-btdepth:]

    d1 = np.array([list(der1.values())])[0]
    d2 = np.array([list(der2.values())])[0]

    ind = np.array(list(itertools.product(d1,d2)))
    threshold = np.linspace(0,np.max(ind.flat),100)
    threshold = np.array(list(itertools.product(threshold,threshold)))
    t = np.repeat(threshold[:,:,np.newaxis],ind.shape[-1],axis=2)
    return ind, t, coefs

def calc_sig(ind: np.ndarray, thresh: np.ndarray,axis: int = 1) -> np.array:
    sig = np.where(ind > thresh, 1, 0)
    sig = np.where(ind < -thresh, -1, sig)
    sig = np.sum(sig,axis=axis)
    conds = [sig == 2, sig == -2]
    opt = [1,-1]
    sig = np.select(conds, opt, default=0)
    return sig


def calc_rets(prices: np.ndarray, sig: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    p_returns = np.diff(prices)/prices[0]
    p_returns = np.pad(p_returns,(0,1))
    p_cum_returns = np.cumsum(p_returns)
    sig_rets = sig * p_returns
    cr = np.cumsum(sig_rets,axis=1)
    return sig_rets, cr


def run_opter_bt(prices: np.ndarray, ind: np.ndarray, thresh: np.ndarray, wghts: np.ndarray) -> np.ndarray:
    """
    TODO: Signal Lag not accounted for - 20 consecutive signals required -- tricky
    """

    # Signal Calculation
    sig = calc_sig(ind, thresh)
    print(sig.shape)
    print(prices.shape)
    sig_rets, cr = calc_rets(prices,sig)

    # BT Results
    sharpe = (np.mean(sig_rets,axis=1) / np.std(sig_rets,axis=1)) * np.sqrt(sig.shape[-1])
    sharpe = np.nan_to_num(sharpe)
    mdd = np.apply_along_axis(_mdd,1,cr)
    min_eq = np.apply_along_axis(np.min,1, cr)
    max_eq = np.apply_along_axis(np.max,1, cr)
    ntrad = np.apply_along_axis(num_trades,1,sig_rets)
    ntrad = ntrad.astype(int)

    # Individual optimums
    res = np.stack((sharpe,mdd,min_eq,max_eq,ntrad)).T
    best_sharpe = np.max(res[:,0])
    best_dd = np.max(res[:,1])
    best_min_eq = np.max(res[:,2])
    best_max_eq = np.max(res[:,3])
    sensible_numtrades = np.median(res[:,4])

    # individual scores
    sharpe_score = 1 - ((best_sharpe - sharpe)/best_sharpe)
    dd_score = np.abs(best_dd + mdd)
    min_eq_score = 1 - (best_min_eq + min_eq)
    max_eq_score = 1 - (best_max_eq - max_eq)
    sensible_nt = np.median(ntrad)
    _nt_scores = [_calc_nt_score(x,sensible_nt) for x in ntrad]

    scores = np.stack((sharpe_score, dd_score, min_eq_score, max_eq_score, _nt_scores)).T
    scores = np.sum(wghts * scores, axis=1)
    best_score = np.max(scores)

    return best_score, scores, res


In [None]:
from typing import Callable

def fit(x: np.ndarray,y: np.ndarray) -> np.ndarray:
    """Perform single curve_fit on single stride for windowlength x"""
    return curve_fit(func, x, y,p0=[0.]*3,ftol=5e-1)[0]

def get_deriv(cfs: np.array, num: int) -> float:
    return derivative(func,1,dx=0.001,args=cfs, n=num)

def calc_full_ind_future(prices: np.ndarray, win1: int, win2: int) -> np.ndarray:

    s = list(zip(
        normalize_negative_one(strides(prices,win1,1)),
        normalize_negative_one(strides(prices,win2,1))
    ))

    y_win1 = np.linspace(0,1,win1)
    y_win2 = np.linspace(0,1,win2)

    cfs = [(fit(s1,y_win1),fit(s2,y_win2)) for s1,s2 in tqdm(s)]
    inds = [(get_deriv(cfs1,num=1),get_deriv(cfs2,num=2)) for cfs1,cfs2 in cfs]

    return np.array(inds).T

def calc_perf_future(prices: np.ndarray, win1: int, win2: int, thresh1: float, thresh2: float) -> np.ndarray:

    ind = calc_full_ind_future(prices, win1, win2)
    ts = np.array([thresh1,thresh2])
    ts = np.repeat(ts[:,np.newaxis],ind.shape[-1],axis=1)
    sig = calc_sig(ind, ts, axis=0)
    sup_sig = np.roll(sig,20)
    sup_sig = np.hstack((np.zeros(20),sup_sig[20:]))
    trade_sig = sig+sup_sig
    conds = [trade_sig == 2, trade_sig == -2]
    opt = [1,-1]

    full_sig = np.select(conds, opt, default=0)

    # cr, rets = calc_rets(prices, full_sig)

    return prices, full_sig

def calc_indgrid_and_bt(prices: np.array, windows: np.array, btdepth):
    """
    Creates N x N windows (combs)

    Creates N**2 x 2 x btepth - indicator grid
    """
    win_combs = np.array(list(itertools.product(windows,windows)))
    bt_end = prices.index[-1]

    ind, thresh,coefs = calc_all_indicators(
        prices = prices,
        windows = windows,
        btdepth = btdepth,
        )

    bt_prices = outdf['Close'].asfreq('1H').ffill().iloc[max(windows):(max(windows)+1200)].values
    best_score, scores, res = run_opter_bt(bt_prices,ind, thresh,wghts=np.array([1,0,0,2,0]))
    win1, win2 = win_combs[np.where(scores == best_score)[0][0]]
    t1, t2 = thresh[np.where(scores == best_score)[0][0]][:,0]
    return  win1, win2, t1, t2, bt_end


def recursive_full_run(prices: pd.DataFrame) -> np.ndarray:
    # Takes the prices

    # Calculates the indicators and threshold grid for first run at index 0 -> (btdepth + max(win))

    btdepth = 1200
    windows = np.linspace(200,400,100,dtype=int)
    bt_prices = prices.asfreq('1H').ffill().iloc[:(btdepth+max(windows))]
    win1, win2, t1, t2, bt_end = calc_indgrid_and_bt(bt_prices,windows, btdepth)

    # Next Run
    prices = prices.loc[bt_end:bt_end+timedelta(hours=int(btdepth*0.35))].Close.values
    prices, full_sig = calc_perf_future(prices, win1, win2, t1, t2)

    return  prices, full_sig

prices, full_sig = recursive_full_run(prices)

100%|██████████| 100/100 [01:23<00:00,  1.20it/s]


(10000, 1200)
(1200,)


100%|██████████| 150871/150871 [02:52<00:00, 873.16it/s]


In [None]:
prices = outdf.asfreq("10S").interpolate()

btdepth = 1200
windows = np.linspace(200,400,100,dtype=int)

bt_prices = prices.asfreq('1H').ffill().iloc[:(btdepth+max(windows))]
win1, win2, t1, t2, bt_end = calc_indgrid_and_bt(bt_prices,windows, btdepth)

# Next Run
_ind_prices = prices.loc[bt_end:bt_end+timedelta(hours=int(btdepth*0.35))].iloc[:-max([win1,win2])].Close.values
_prices = prices.loc[bt_end:bt_end+timedelta(hours=int(btdepth*0.35))].Close.values
ind = calc_full_ind_future(_ind_prices, win1, win2)

ts = np.array([t1,t2])
ts = np.repeat(ts[:,np.newaxis],ind.shape[-1],axis=1)

sig = calc_sig(ind, ts, axis=0)
sup_sig = np.roll(sig,20)
sup_sig = np.hstack((np.zeros(20),sup_sig[20:]))
trade_sig = sig + sup_sig
conds = [trade_sig == 2, trade_sig == -2]
opt = [1,-1]
full_sig = np.select(conds, opt, default=0)


100%|██████████| 100/100 [01:26<00:00,  1.16it/s]


(10000, 1200)
(1200,)


100%|██████████| 150540/150540 [03:03<00:00, 822.07it/s]


In [None]:
print("_prices",_prices.shape)
print("ind.shape",ind.shape)
print("trade_sig",trade_sig.shape)
print("full_sig.shape",full_sig.shape)

_prices (150870,)
ind.shape (2, 150540)
trade_sig (150540,)
full_sig.shape (150540,)


(150540,)

In [None]:
cr, rets = calc_rets(_prices, full_sig)

ValueError: operands could not be broadcast together with shapes (150540,) (150870,) 