# In[2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt




# In[3]:
class Backtest:
    """Z-score mean-reversion backtest on the spread between two price columns.

    The spread is ``df[instrument] - df[futures]``.  A long spread position
    (long instrument, short futures) is opened when the rolling z-score of the
    spread drops below ``-entry`` and closed when it rises back to ``-exit``;
    the short side mirrors this.  Constructing the object runs the complete
    pipeline and prints/plots the results.

    Attributes set by the pipeline:
        df: working copy of the input frame with the added columns
            ``spread``, ``zscore``, ``num_long``, ``num_short``,
            ``num_units``, ``<name>_pos``, ``<name>_pnl`` and ``pnl``.
        trades: number of position entries (long or short).
        ret: per-row return on gross exposure (Series).
        returns_df: frame with ``ret`` and the compounded ``cum_ret``
            (both as fractions, not percentages).
        sharpe: per-period (not annualised) Sharpe ratio.
        max_drawdown: worst drawdown as a non-positive fraction.
    """

    # Annual risk-free rate used in the Sharpe ratio, converted to a
    # per-period rate with DAGEN_PER_JAAR compounding periods.
    RISICOVRIJE_JAARRENTE = 0.0570
    DAGEN_PER_JAAR = 365

    def __init__(self, instrument, futures, df, rollovers, jaren):
        """Store the inputs and, when data is present, run the whole backtest.

        Args:
            instrument: column name of the instrument price in ``df``.
            futures: column name of the futures price in ``df``.
            df: price DataFrame containing both columns.
            rollovers: collection of index labels on which the pnl is
                forced to zero (``None`` is treated as "no rollovers").
            jaren: stored as ``self.jaren``; no calculation in this class
                uses it.
        """
        self.instrument = instrument
        self.futures = futures
        # Work on a private copy: the pipeline adds columns and drops rows,
        # which must not leak into the caller's frame (e.g. when the same
        # frame is reused for several backtests).
        self.df = df.copy()
        self.rollovers = rollovers
        self.jaren = jaren

        if self.df.empty:
            print("geen data geleverd")
        else:
            self.bereken_zscore()
            self.genereer_signalen()
            self.koppel_signalen()
            self.bereken_winst()
            self.geef_resultaten()

    def bereken_zscore(self, lookback=20):
        """Add ``spread`` and its rolling ``zscore``; drop rows containing NaN.

        Args:
            lookback: window length of the rolling mean and standard
                deviation.
        """
        spread = self.df[self.instrument] - self.df[self.futures]
        venster = spread.rolling(lookback)

        self.df['spread'] = spread
        self.df['zscore'] = (spread - venster.mean()) / venster.std()
        # Removes the warm-up rows of the rolling window (and any other row
        # that has a NaN in some column).
        self.df.dropna(inplace=True)

    def genereer_signalen(self, entry_zscore=2, exit_zscore=0):
        """Write raw entry/exit signals to ``num_long`` and ``num_short``.

        Each column holds the target position on rows where a rule fires
        (1 / -1 on entry, 0 on exit) and NaN where neither rule fires, so
        that the previous signal can be carried forward later.

        Args:
            entry_zscore: absolute z-score beyond which a position is opened.
            exit_zscore: z-score level at which a position is closed.
        """
        zscore = self.df['zscore']

        # np.nan instead of np.NaN: the latter alias was removed in NumPy 2.0.
        self.df['num_long'] = np.select(
            [zscore < -entry_zscore, zscore >= -exit_zscore],
            [1, 0],
            default=np.nan,
        )
        self.df['num_short'] = np.select(
            [zscore > entry_zscore, zscore <= exit_zscore],
            [-1, 0],
            default=np.nan,
        )

    def koppel_signalen(self):
        """Turn signals into held positions, count trades, size both legs.

        Sets ``num_units`` (-1, 0 or 1), ``self.trades`` and the two
        ``<name>_pos`` columns (signed position value per leg; the futures
        leg has the opposite sign of the instrument leg).
        """
        # Carry the last signal forward; before the first signal we are flat.
        for kolom in ('num_short', 'num_long'):
            self.df[kolom] = self.df[kolom].ffill().fillna(0)
        self.df['num_units'] = self.df['num_long'] + self.df['num_short']

        # A trade is an entry: the position is non-zero and differs from the
        # position on the previous row (flat before the first row).  This
        # counts each holding period once, including a direct long<->short
        # flip, instead of counting every bar spent in a long position.
        units = self.df['num_units']
        vorige_units = units.shift(fill_value=0)
        self.trades = int(((units != 0) & (units != vorige_units)).sum())

        self.df[f"{self.instrument}_pos"] = units * self.df[self.instrument]
        self.df[f"{self.futures}_pos"] = -units * self.df[self.futures]

    def bereken_winst(self):
        """Compute pnl, returns, cumulative return, Sharpe ratio and drawdown.

        Sets ``self.ret``, ``self.returns_df``, ``self.sharpe`` and
        ``self.max_drawdown`` and adds the pnl columns to ``self.df``.
        """
        # pnl of a leg = its price return times the position held on the
        # previous row.
        for naam in (self.instrument, self.futures):
            prijs = self.df[naam]
            vorige_prijs = prijs.shift()
            self.df[f'{naam}_pnl'] = (
                (prijs - vorige_prijs) / vorige_prijs
                * self.df[f'{naam}_pos'].shift()
            )
        self.df['pnl'] = (
            self.df[f'{self.instrument}_pnl'] + self.df[f'{self.futures}_pnl']
        )

        # Zero the pnl on rollover rows.  A single .loc assignment is needed:
        # chained indexing would write to a temporary copy and be lost.
        rollovers = [] if self.rollovers is None else list(self.rollovers)
        self.df.loc[self.df.index.isin(rollovers), 'pnl'] = 0

        # Return on the gross exposure held on the previous row; rows without
        # exposure give 0/0 = NaN and count as a zero return.
        bruto_positie = (
            self.df[f'{self.instrument}_pos'].shift().abs()
            + self.df[f'{self.futures}_pos'].shift().abs()
        )
        self.ret = (self.df['pnl'] / bruto_positie).fillna(0)

        self.returns_df = self.ret.to_frame('ret')
        # Compounded return, kept as a fraction: the drawdown formula below
        # and the percentage conversion in geef_resultaten both expect that.
        cum_ret = (1 + self.returns_df['ret']).cumprod() - 1
        self.returns_df['cum_ret'] = cum_ret

        # Per-period Sharpe ratio (not annualised).  The annual risk-free
        # rate is converted to a per-period rate by compounding.
        dag_rente = (
            (1 + self.RISICOVRIJE_JAARRENTE) ** (1 / self.DAGEN_PER_JAAR) - 1
        )
        spreiding = self.returns_df['ret'].std()
        if spreiding > 0:
            self.sharpe = (self.returns_df['ret'].mean() - dag_rente) / spreiding
        else:
            # Undefined for constant returns or fewer than two observations.
            self.sharpe = float('nan')

        # Drawdown relative to the running high-water mark of cum_ret.
        hoogste = cum_ret.cummax()
        drawdown = (cum_ret - hoogste) / (hoogste + 1)
        diepste = drawdown.min()
        # The comparison is False for NaN (no rows), which leaves 0.0.
        self.max_drawdown = float(diepste) if diepste < 0 else 0.0

    def geef_resultaten(self):
        """Plot the cumulative return and print the summary statistics."""
        if self.returns_df.empty:
            # Happens when no row survives the rolling-window warm-up.
            print("geen resultaten beschikbaar")
            return

        self.returns_df['cum_ret'].plot(figsize=(20, 6))
        eindrendement = self.returns_df['cum_ret'].iloc[-1]
        print(f"totale rendement: %{eindrendement*100}")
        print(f'sharpe-ratio: {self.sharpe}')
        print(f'maximale tijdelijke verlies: %{self.max_drawdown*100}')