<a href="https://colab.research.google.com/github/achrafbenss/algo-quant/blob/main/Backtesting_Pair_Trading_Strategy_Using_Hurst_Exponent_and_LO_Mackinley_Random_Walk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Backtesting Pair Trading Strategy Using Hurst Exponent and LO-Mackinley Random Walk

Install packages that allow us to download historical market data, perform statistical modeling and analysis, create interactive data visualizations, build and test trading strategies, and conduct backtesting of those strategies.

In [71]:
! pip install yfinance statsmodels plotly  backtesting



 # Importing packages Needed For the Project

In [72]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller, coint
from sklearn.linear_model import LinearRegression
import yfinance as yf
from scipy.stats import norm
from backtesting import Backtest , Strategy
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
%matplotlib inline
warnings.filterwarnings("ignore")

## A list of tickers representing futures commodities chosen for this project

In [73]:
comodities_tickers = ['GC=F','SI=F','HG=F','CL=F','BZ=F','NG=F','PA=F','PL=F','HO=F','RB=F',
                   'ZC=F','ZO=F','KE=F','ZR=F','ZS=F','GF=F','HE=F','LE=F','KC=F','CT=F','LBS=F','OJ=F','SB=F']

# Data preprocessing and cleaning

In [74]:
data = pd.DataFrame()
for i in comodities_tickers:
  data[i]=  yf.Ticker(i).history(period = '2y',
                           interval = '1d',
                           actions = True,
                           auto_adjust = True).Close
data.head()

Unnamed: 0_level_0,GC=F,SI=F,HG=F,CL=F,BZ=F,NG=F,PA=F,PL=F,HO=F,RB=F,...,ZR=F,ZS=F,GF=F,HE=F,LE=F,KC=F,CT=F,LBS=F,OJ=F,SB=F
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2021-07-06 00:00:00-04:00,1793.5,26.153999,4.258,73.370003,74.529999,3.637,2794.300049,1081.900024,2.1049,2.2282,...,1252.0,1363.75,160.625,109.925003,122.400002,147.899994,86.379997,792.900024,,17.870001
2021-07-07 00:00:00-04:00,1801.5,26.110001,4.329,72.199997,73.43,3.596,2846.5,1080.199951,2.0891,2.206,...,1260.5,1386.5,159.0,109.150002,120.599998,149.75,86.629997,780.0,,17.75
2021-07-08 00:00:00-04:00,1799.599976,25.966999,4.2715,72.940002,74.120003,3.688,2808.399902,1073.199951,2.1204,2.2552,...,1248.5,1390.25,157.324997,110.099998,119.275002,152.050003,85.879997,718.0,,17.450001
2021-07-09 00:00:00-04:00,1810.0,26.211,4.3525,74.559998,75.550003,3.674,2810.199951,1094.400024,2.1552,2.292,...,1260.0,1404.0,159.175003,111.0,119.224998,151.300003,87.989998,702.5,,17.280001
2021-07-12 00:00:00-04:00,1805.5,26.216999,4.323,74.099998,75.160004,3.749,2857.5,1121.699951,2.1498,2.2772,...,1266.0,1432.5,158.149994,112.474998,119.824997,153.850006,88.470001,685.0,,16.99


In [75]:
data.isnull().sum().tolist()

[0, 2, 0, 0, 0, 0, 2, 1, 0, 0, 3, 3, 3, 3, 3, 3, 7, 3, 1, 1, 34, 45, 1]

In [76]:
for i in data.columns:
  if data[i].isnull().sum()/len(data[i])> 0.01:    # we delete data that has more that 10% percent of missing value
    del data[i]
data.interpolate(limit_direction="both",inplace=True)   # we fill the rest of data by using interploation
data.tail()

Unnamed: 0_level_0,GC=F,SI=F,HG=F,CL=F,BZ=F,NG=F,PA=F,PL=F,HO=F,RB=F,ZC=F,ZO=F,KE=F,ZR=F,ZS=F,GF=F,LE=F,KC=F,CT=F,SB=F
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
2023-06-26 00:00:00-04:00,1923.699951,22.805,3.7865,69.370003,74.18,2.791,1291.800049,936.700012,2.4388,2.5375,637.25,391.25,866.5,1794.0,1521.0,233.675003,177.699997,167.949997,77.07,23.719999
2023-06-27 00:00:00-04:00,1914.0,22.938999,3.7815,67.699997,72.260002,2.763,1283.199951,936.599976,2.399,2.5168,623.0,385.0,837.75,1767.5,1495.0,238.5,179.300003,169.449997,77.639999,23.030001
2023-06-28 00:00:00-04:00,1912.300049,22.868,3.7205,69.559998,74.029999,2.603,1236.0,923.400024,2.4067,2.6034,590.0,377.0,800.75,1748.5,1451.0,240.25,179.625,165.649994,80.120003,22.57
2023-06-29 00:00:00-04:00,1909.199951,22.591,3.6775,69.860001,74.339996,2.701,1218.800049,897.799988,2.4156,2.6177,581.0,367.75,794.25,1770.5,1483.0,242.375,179.75,163.600006,81.25,22.07
2023-07-04 00:00:00-04:00,1937.599976,23.219999,3.784,70.660004,75.519997,2.686,1246.5,928.599976,2.4112,2.4966,581.0,367.75,794.25,1770.5,1483.0,242.375,179.75,163.600006,81.25,22.07


### Identification of non-stationary assets to fulfill the cointegration property requirement

In [77]:
train_data , test_data = data[:round(len(data)*0.7)],data[round(len(data)*0.7):]
non_stationary = []
for i in train_data.columns:
  if adfuller(data[i])[1]> 0.01:
    non_stationary.append(i)
non_stationary

['GC=F',
 'SI=F',
 'HG=F',
 'CL=F',
 'BZ=F',
 'NG=F',
 'PA=F',
 'PL=F',
 'HO=F',
 'RB=F',
 'ZC=F',
 'ZO=F',
 'KE=F',
 'ZR=F',
 'ZS=F',
 'GF=F',
 'LE=F',
 'KC=F',
 'CT=F',
 'SB=F']

## Selection of pairs exhibiting cointegration


In [78]:
pair_coint = pd.DataFrame({'asset1':[], 'asset2':[], 'score':[], 'pvalue':[]}) # creating dataframe to store pairs that are cointegrated
for y in non_stationary:
  for x in non_stationary:
    if y != x:
      score, pvalue, _ = coint(train_data[y],train_data[x])
      if pvalue <= 0.05:
        pair_coint.loc[len(pair_coint.index)] = [y , x, score, pvalue]

list1 = [sorted([i,y]) for i , y in zip(pair_coint.asset1, pair_coint.asset2)]
index = []
list2 = []
# the code below consist of removing duplicated pairs
for i in list1:
  if list1.count(i)>1:
    list1.remove(i)

for i,y in zip(np.array(list1)[:,0], np.array(list1)[:,1]):
  row = pair_coint[(pair_coint['asset1'] == i) & (pair_coint['asset2'] == y)].index.tolist()
  index.append(row)

for i in index:
  for y in i:
    list2.append(y)
new_pair = pair_coint.loc[list2]
new_pair

Unnamed: 0,asset1,asset2,score,pvalue
2,HG=F,SI=F,-4.003251,0.007105
4,CL=F,ZC=F,-3.597962,0.024644
5,CL=F,ZS=F,-3.417062,0.04049
6,BZ=F,CL=F,-3.367997,0.046031
8,BZ=F,ZS=F,-3.498144,0.03256
9,PL=F,SI=F,-3.569934,0.026679
4,CL=F,ZC=F,-3.597962,0.024644
7,BZ=F,ZC=F,-3.824103,0.012585
12,HO=F,ZC=F,-3.71825,0.017359
17,KE=F,ZC=F,-3.949711,0.008459


### We define a fucntion that calculate the spread between two cointegrated pairs using linear regression

In [79]:
def spread(y,x):
  constant = pd.DataFrame(pd.Series([1]*len(train_data[x]),index = train_data[x].index),columns=['constant'])
  add_x_constant = pd.concat([train_data[x],constant], axis = 1)
  reg = LinearRegression().fit(add_x_constant,train_data[y])
  beta = reg.coef_[0]
  alpha = reg.intercept_
  spread = train_data[y] - beta*train_data[x]-alpha
  return spread

## Hurst Exponent Filter:
The Hurst exponent is a statistical measure used to characterize the long-term memory or persistence of a time series. The Hurst exponent ranges between 0 and 1, with values less than 0.5 indicating a mean-reverting behavior and values closer or above 0.5 indicating a persistent or trending behavior (more likely to continue).

In [80]:
def rolling_hurst(df):
  hurst = []
  for i in range(100,len(df)):
    lags= range(2,40)
    tau = [np.sqrt(np.std(np.subtract(df[i-100 :i][lag:],df[i-100 :i][:-lag]))) for lag in lags]
    poly = np.polyfit(np.log(lags),np.log(tau),1)
    hurst.append(poly[0]*2)
  return hurst

#### In the context of selecting pairs spread , a low Hurst exponent value, less than 0.5 suggests mean-reverting behavior This means that the spread between the selected pairs tends to revert to its mean value over time,  wich aids to identifying pairs with mean-reverting properties.

In [81]:
def eligible_pair(pair):
  list1 = []
  for i, (x,y) in enumerate(zip(pair.asset1.tolist(),pair.asset2.tolist())):
    if np.mean(rolling_hurst(spread(x,y))) > 0.4:
      list1.append(i)
  return pair.drop(pair.index[list1])

eligible_pair = eligible_pair(new_pair)
eligible_pair

Unnamed: 0,asset1,asset2,score,pvalue
2,HG=F,SI=F,-4.003251,0.007105
4,CL=F,ZC=F,-3.597962,0.024644
5,CL=F,ZS=F,-3.417062,0.04049
6,BZ=F,CL=F,-3.367997,0.046031
8,BZ=F,ZS=F,-3.498144,0.03256
9,PL=F,SI=F,-3.569934,0.026679
4,CL=F,ZC=F,-3.597962,0.024644
7,BZ=F,ZC=F,-3.824103,0.012585
12,HO=F,ZC=F,-3.71825,0.017359
17,KE=F,ZC=F,-3.949711,0.008459


## Variance Ratio Test - Lo and MacKinlay:
The LO-Mackinlay test is used to evaluate the efficiency of a time series or determine if it follows a random walk process. we need to identify the minimum p-value and retrieves the corresponding maximum z-statistic, which represents the highest significance level for the LO-Mackinlay variance test ratio

In [82]:
def rolling_variance_ratio(df):
  spread  = df.pct_change()[1:]
  variance = spread.var()
  max_t = 40
  variance_ratio_test = pd.DataFrame({'variance':[], 'variance_increments': [], 'variance_ratio': [], 'standard_eroor':[],'z_stat':[],'pvalue':[]})
  for t in range(2,max_t):
    v_t = spread.rolling(t).sum().var()
    variance_ratio = v_t/(t*variance) -1
    standard_error = np.sqrt(2*(2*t - 1)*(t-1)/(3*t*len(spread)))
    z_stat = variance_ratio/standard_error
    p_value = 2*(1- norm.cdf(abs(z_stat)))
    variance_ratio_test.loc[len(variance_ratio_test.index)] = [variance, v_t, variance_ratio, standard_error, z_stat, p_value]

  min_pvalue = min(variance_ratio_test['pvalue'])
  max_z_stat =  variance_ratio_test[variance_ratio_test['pvalue'] == min_pvalue]['z_stat'].values # retreving the minimum pvalue with the corresponding z_stat

  return max_z_stat

In [83]:
# If any of the maximum z-statistics are greater than or equal to 0 (indicating mean-reverting behavior)
def eligible(pair):
  list1 = []
  for i, (x,y) in enumerate(zip(pair.asset1.tolist(),pair.asset2.tolist())):
    max_z_stat = rolling_variance_ratio(spread(x,y))
    if  (max_z_stat >= 0).any():
      list1.append(i)
  return pair.drop(pair.index[list1])
eligible = eligible(eligible_pair)
eligible

Unnamed: 0,asset1,asset2,score,pvalue
2,HG=F,SI=F,-4.003251,0.007105
4,CL=F,ZC=F,-3.597962,0.024644
9,PL=F,SI=F,-3.569934,0.026679
4,CL=F,ZC=F,-3.597962,0.024644
18,GF=F,SI=F,-3.545376,0.028579
19,GF=F,HG=F,-4.07183,0.005657
21,SB=F,SI=F,-3.72324,0.017102
31,SB=F,ZO=F,-3.576364,0.0262
33,SB=F,ZR=F,-3.628969,0.02255
34,SB=F,ZS=F,-3.63926,0.02189


## Matrix of eligible pairs of assets, exhibiting mean-reverting behavior

In [84]:
# Extract the relevant columns
matrix_data = eligible[['asset1', 'asset2', 'pvalue']]

# Create matrix plot
fig = go.Figure(data=go.Scatter(
    x=matrix_data['asset2'],
    y=matrix_data['asset1'],   #
    mode='markers',
    marker=dict(
        color=matrix_data['pvalue'],
        colorscale='Viridis',
        size=20,
        colorbar=dict(title='P-value')
    ),
    text=matrix_data['pvalue'],
    hovertemplate='<b>Asset 1</b>: %{y}<br><b>Asset 2</b>: %{x}<br><b>P-value</b>: %{text:.4f}',
))

# Set axis labels
fig.update_xaxes(title_text='Asset 2')
fig.update_yaxes(title_text='Asset 1')

# Show figure
fig.show()

#### We retreive pairs that demonstrate a fundamental relationship, which will enable us to conduct backtesting

In [85]:
list_pairs = [['HG=F','SI=F'],['GF=F','HG=F'],['GF=F','SI=F'],['PL=F','SI=F']]

In [86]:
# #  calculates the z-score of the spread between two pairs
def zscore_spread(y,x):
  spread_mavg = spread(y,x)
  spread_mavg60 = spread(y,x).rolling(window=60, center=False).mean()
  spread_std_60 = spread(y,x).rolling(window=60, center=False).std()
  zscore_60 = (spread_mavg - spread_mavg60)/spread_std_60
  return zscore_60

In [87]:
# Calculate the threshold values
def get_thresholds(data):
    std_dev = np.std(data)
    threshold_upper = np.mean(data) + 1.7*std_dev
    threshold_lower = np.mean(data) - 1.7*std_dev
    return threshold_upper, threshold_lower

#### We generate trading signals for two pairs of assets. In this strategy we need to make sure that two pairs move differently in the same time. so if the spread exceeds the upper threshold and the corresponding asset's price has increased over the past three periods, a short position (-1) is assigned to that asset, and vice versa.

In [88]:
def signal(y,x):
  new_data = train_data[[y,x]]
  spread_data = zscore_spread(y,x)
  threshold_upper, threshold_lower = get_thresholds(spread_data)
  new_data['position_y'] = None
  for i in range(3,len(new_data)):
    if (spread_data[i] > threshold_upper) and (new_data[y][i] > new_data[y][i-3]):
      new_data['position_y'].iloc[i] = -1
    if (spread_data[i] < threshold_lower) and (new_data[y][i] < new_data[y][i-3]):
      new_data['position_y'].iloc[i] = 1

  new_data['position_x'] = None
  for i in range(3,len(new_data)):
    if (spread_data[i] < threshold_lower) and (new_data[x][i] > new_data[x][i-3]):
      new_data['position_x'].iloc[i] = -1
    if (spread_data[i] > threshold_upper) and (new_data[x][i] < new_data[x][i-3]):
      new_data['position_x'].iloc[i] = 1
  return new_data


In [89]:
def visualize_pair(y, x):
    spread_data = zscore_spread(y, x)
    threshold_upper, threshold_lower = get_thresholds(spread_data)
    data = signal(y, x)

    fig_pair = make_subplots(rows=3, cols=1,vertical_spacing=0.025, row_heights=[1,1.4, 1])
    fig_pair.add_trace(go.Scatter(x=data.index, y=data[y], name=y), row=1, col=1)
    fig_pair.add_trace(go.Scatter(x=data.index, y=data[x], name=x), row=3, col=1)

    fig_pair.add_trace(go.Scatter(mode='markers', name='buy', x=data[data['position_y'] == 1].index,
                                  y=data[data['position_y'] == 1][y],
                                  marker=dict(color='green', size=10, symbol='triangle-up')), row=1, col=1)

    fig_pair.add_trace(go.Scatter(mode='markers', name='sell', x=data[data['position_y'] == -1].index,
                                  y=data[data['position_y'] == -1][y],
                                  marker=dict(color='red', size=10, symbol='triangle-down')), row=1, col=1)

    fig_pair.add_trace(go.Scatter(mode='markers', name='buy', x=data[data['position_x'] == 1].index,
                                  y=data[data['position_x'] == 1][x],
                                  marker=dict(color='green', size=10, symbol='triangle-up')), row=3, col=1)

    fig_pair.add_trace(go.Scatter(mode='markers', name='sell', x=data[data['position_x'] == -1].index,
                                  y=data[data['position_x'] == -1][x],
                                  marker=dict(color='red', size=10, symbol='triangle-down')), row=3, col=1)

    fig_pair.add_trace(go.Scatter(x=spread_data.index, y=spread_data, mode='lines', name='Spread',
                                  line=dict(color='skyblue', dash='dashdot')), row=2, col=1)
    fig_pair.add_trace(go.Scatter(x=spread_data.index, y=[spread_data.mean()] * len(spread_data),mode='lines',
                                  name='Mean',line=dict(color='black', dash='dash')), row=2, col=1)
    fig_pair.add_trace(go.Scatter(x=spread_data.index, y=[threshold_upper] * len(spread_data), mode='lines',
                                  name='Upper Threshold', line=dict(color='red', dash='dash')), row=2, col=1)
    fig_pair.add_trace(go.Scatter(x=spread_data.index, y=[threshold_lower] * len(spread_data), mode='lines',
                                  name='Lower Threshold', line=dict(color='green', dash='dash')), row=2, col=1)
    fig_pair.update_layout(title='Pair Trading Strategy: {} and {}'.format(y,x), hovermode='x unified',title_x=0.45, height=900, showlegend=True)
    return fig_pair.show()

In [90]:
#  Present Graph of each pair
for i in list_pairs:
  print(visualize_pair(i[0], i[1]))

None


None


None


None


# Backtesting Strategy  

#### We backtest only **Gold** and **Silver** in this project make sure to do it by yourself for the other pairs selected.

In [91]:
class generating_signal:
  def __init__(self,first_pair, second_pair,test_data):
    self.first_pair = first_pair
    self.second_pair = second_pair
    self.test_data = test_data
    self.test_data1 = yf.Ticker(self.first_pair).history(period = '2y',
                           interval = '1d',
                           actions = True,
                           auto_adjust = True)
    self.test_data2 = yf.Ticker(self.second_pair).history(period = '2y',
                           interval = '1d',
                           actions = True,
                           auto_adjust = True)

    self.test_data.index = pd.to_datetime(self.test_data.index,format="%Y-%m-%d",utc=True)
    self.test_data1.index = pd.to_datetime(self.test_data1.index,format="%Y-%m-%d",utc=True)
    self.test_data2.index = pd.to_datetime(self.test_data2.index,format="%Y-%m-%d",utc=True)
    self.test_data1 = self.test_data1.loc[self.test_data.index[0]:]
    self.test_data2 = self.test_data2.loc[self.test_data.index[0]:]
    self.common = self.test_data1.index.intersection(self.test_data2.index)
    self.test_data1 = self.test_data1.loc[self.common]
    self.test_data2 = self.test_data2.loc[self.common]
    if True:
      self.signal()

  def pair_spread(self):
    self.constant = pd.DataFrame(pd.Series([1]*len(self.test_data2),index = self.test_data2.index),columns=['constant'])
    self.add_x_constant = pd.concat([self.test_data2.Close,self.constant], axis = 1)
    self.reg = LinearRegression().fit(self.add_x_constant,self.test_data1.Close)
    self.beta = self.reg.coef_[0]
    self.alpha = self.reg.intercept_
    self.spread = self.test_data1.Close -(self.beta*self.test_data2.Close)-self.alpha
    return self.spread

  def zscore_spread(self):
    self.spread_mavg = self.pair_spread()
    self.spread_mavg60 = self.pair_spread().rolling(window=60, center=False).mean()
    self.spread_std_60 = self.pair_spread().rolling(window=60, center=False).std()
    self.zscore_60 = (self.spread_mavg - self.spread_mavg60)/self.spread_std_60
    return self.zscore_60

  def get_thresholds(self):
    self.std_dev = np.std(self.zscore_spread())
    self.threshold_upper = np.mean(self.zscore_spread()) + 1.7*self.std_dev
    self.threshold_lower = np.mean(self.zscore_spread()) - 1.7*self.std_dev
    return self.threshold_upper, self.threshold_lower

  def signal(self):

    self.spread_data = self.zscore_spread()
    self.threshold_upper, self.threshold_lower = self.get_thresholds()

    self.test_data1['position_y'] = None
    for i in range(1,len(self.test_data1)):
      if (self.spread_data[i] >= self.threshold_upper) and (self.test_data1.Close[i] > self.test_data1.Close[i-1]):
        self.test_data1['position_y'].iloc[i] = -1
      if (self.spread_data[i] <= self.threshold_lower) and (self.test_data1.Close[i] < self.test_data1.Close[i-1]):
        self.test_data1['position_y'].iloc[i] = 1
      self.test_data1['position_y'] = self.test_data1['position_y'].fillna(0)

    self.test_data2['position_x'] = None
    for i in range(1,len(self.test_data2)):
      if (self.spread_data[i] <= self.threshold_lower) and (self.test_data2.Close[i] > self.test_data2.Close[i-1]) :
        self.test_data2['position_x'].iloc[i] = -1
      if (self.spread_data[i] >= self.threshold_upper) and (self.test_data2.Close[i] < self.test_data2.Close[i-1]):
        self.test_data2['position_x'].iloc[i] = 1
      self.test_data2['position_x'] = self.test_data2['position_x'].fillna(0)


## Backtest first asset (Gold)


In [92]:
asset1 = generating_signal('GF=F','SI=F',test_data).test_data1
def SIGNAL1():
  return asset1.position_y

In [93]:
class  Pairstrading(Strategy):
  def init(self):
    super().init()
    self.signal = self.I(SIGNAL1)
    self.cooldown = 0

  def next(self):
    super().next()
    price = self.data.Close[-1]

    if self.signal == 1 and self.cooldown == 0:
      self.buy(tp = 1.2*price , sl = 0.95*price, size = 0.1)
      self.cooldown = 3   # new trades can be taken only after 3 bars

    elif self.signal== -1 and self.cooldown == 0:
      self.sell(tp = 0.8*price , sl = 1.05*price, size = 0.1)
      self.cooldown = 3  # new trades can be taken only after 3 bars

    if self.cooldown > 0 :
      self.cooldown -= 1

In [94]:
bt = Backtest(asset1,Pairstrading,cash=100000,commission=0.001, exclusive_orders=True)
stat = bt.run()
evaluation = pd.DataFrame(stat)
evaluation

Unnamed: 0,0
Start,2022-11-23 05:00:00+00:00
End,2023-06-29 04:00:00+00:00
Duration,217 days 23:00:00
Exposure Time [%],24.832215
Equity Final [$],101273.033007
Equity Peak [$],101798.857882
Return [%],1.273033
Buy & Hold Return [%],35.216179
Return (Ann.) [%],2.16251
Volatility (Ann.) [%],1.768105


In [95]:
bt.plot()

## Backtest Second asset (Silver)

In [96]:
asset2 = generating_signal('GF=F','SI=F',test_data).test_data2
def SIGNAL2():
  return asset2.position_x

In [97]:
class  Pairstrading(Strategy):
  def init(self):
    super().init()
    self.signal = self.I(SIGNAL2) # for second assets
    self.cooldown = 0

  def next(self):
    super().next()
    price = self.data.Close[-1]

    if self.signal == 1 and self.cooldown == 0:
      self.buy(tp = 1.2*price , sl = 0.95*price, size = 0.1)
      self.cooldown = 3   # new trades can be taken only after 3 bars

    elif self.signal== -1 and self.cooldown == 0:
      self.sell(tp = 0.8*price , sl = 1.05*price, size = 0.1)
      self.cooldown = 3  # new trades can be taken only after 3 bars

    if self.cooldown > 0 :
      self.cooldown -= 1

In [98]:
bt = Backtest(asset2,Pairstrading,cash=100000,commission=0.001, exclusive_orders=True)
stat = bt.run()
evaluation = pd.DataFrame(stat)
evaluation

Unnamed: 0,0
Start,2022-11-23 05:00:00+00:00
End,2023-06-29 04:00:00+00:00
Duration,217 days 23:00:00
Exposure Time [%],22.147651
Equity Final [$],100683.243662
Equity Peak [$],101497.485151
Return [%],0.683244
Buy & Hold Return [%],5.777961
Return (Ann.) [%],1.15828
Volatility (Ann.) [%],1.194069


In [99]:
bt.plot()