## Derivations of beta β
A linear regression takes the form of:  
               
$$ y = bx + ε $$ 

where x is the regressor; b is x's coefficient, and ε is noise. its estimate can be expressed by:  
               
$$ {y = \hat{b} x} $$ 
               
As the Ordinary Least Square method states, the best value of beta would minimize the difference between the estimate value $ \hat{y} $ and the actual y. Thus we have:  
               
$$ arg\underset{b}min (y - \hat{y})^2 $$ 

Let:  
        
$$ F =  (y - \hat{y})^2$$
$$F = (y - \hat{b} x)^2 $$
Take the derivative of F with respect to $\hat{b}$, and make it equal to 0 for optimization  
  
$${dF\over{d\hat{b}}} = 2(y - \hat{b} x)(-x) = 0 $$  

$$(y - \hat{b} x)(-x) = 0 $$  

$$ \hat{b} x x' = xy $$  

and expression of estimated beta would be:   
$$ \hat{b} = (xx')^{-1} xy $$

Now, for the standard deviation of beta, first take calculate the variance of beta:  
$$ Var(b) = Var((xx')^{-1} xy) $$
$$ Var(b) = Var((xx')^{-1} x (b x + ε)) $$
$$ Var(b) = Var((xx')^{-1} x b x + (xx')^{-1} x ε)) $$
$$ Var(b) = Var((xx')^{-1} x ε)) $$




<tbody><tr>
<th colspan="5">Critical values for Dickey–Fuller <i>t</i>-distribution.
</th></tr>
<tr>
<td></td>
<td colspan="2">Without trend</td>
<td colspan="2">With trend
</td></tr>
<tr>
<td>Sample size</td>
<td>1%</td>
<td>5%</td>
<td>1%</td>
<td>5%
</td></tr>
<tr>
<td>T = 25</td>
<td>−3.75</td>
<td>−3.00</td>
<td>−4.38</td>
<td>−3.60
</td></tr>
<tr>
<td>T = 50</td>
<td>−3.58</td>
<td>−2.93</td>
<td>−4.15</td>
<td>−3.50
</td></tr>
<tr>
<td>T = 100</td>
<td>−3.51</td>
<td>−2.89</td>
<td>−4.04</td>
<td>−3.45
</td></tr>
<tr>
<td>T = 250</td>
<td>−3.46</td>
<td>−2.88</td>
<td>−3.99</td>
<td>−3.43
</td></tr>
<tr>
<td>T = 500</td>
<td>−3.44</td>
<td>−2.87</td>
<td>−3.98</td>
<td>−3.42
</td></tr>
<tr>
<td>T = ∞</td>
<td>−3.43</td>
<td>−2.86</td>
<td>−3.96</td>
<td>−3.41
</td></tr>
<tr>
<td colspan="5">Source<sup id="cite_ref-Fuller1976_2-0" class="reference"><a href="#cite_note-Fuller1976-2">[2]</a></sup><sup class="reference" style="white-space:nowrap;">:<span>373</span></sup>
</td></tr></tbody>

# import section


In [1]:
import warnings
warnings.filterwarnings('ignore')
from colorit import *
init_colorit()

import numpy as np
import pandas as pd

import statsmodels
from sklearn.linear_model import LinearRegression as LR

from statsmodels.tsa.stattools import coint
import matplotlib.pyplot as plt
import seaborn as sns

from scipy.stats import norm
 

In [2]:
class LinearRegression():
    def __init__(self, x, y, add_const = True):
        self.x = x.copy()
        self.y = y.copy()
        if add_const == True:
            self.addconst()
        self.fit()
        
        
    def addconst(self):
        self.x['constant'] = np.ones(len(self.x))
        
    def fit(self):
        self.coef = np.linalg.inv(self.x.T.dot(self.x)).dot(self.x.T.dot(self.y))
        
        
        self.error = self.y.values - self.x.dot(self.coef).values
        self.residual_cov = self.error.T.dot(self.error)/ len(self.y)
        self.coef_cov = np.kron(np.linalg.inv(self.x.T.dot(self.x))
                                ,self.residual_cov )
        self.coef_std = np.sqrt(self.y.shape[0]/(self.y.shape[0]-self.x.shape[1])*np.diag(self.coef_cov))
        self.tstats = self.coef/self.coef_std.reshape(self.coef.shape)
        
        self.p_value = pd.DataFrame(2*(1-norm.cdf(abs(self.tstats))), 
                     index = self.x.columns, 
                     columns = self.y.columns)
        
    def return_coef(self):
        return self.coef
        
    def report(self):
        self.coef = pd.DataFrame(self.coef, 
                                 index = self.x.columns, 
                                 columns = self.y.columns)
        self.coef_std = pd.DataFrame(self.coef_std, 
                                     index = self.x.columns, 
                                     columns = self.y.columns)
        self.tstats = pd.DataFrame(self.tstats, 
                                   index = self.x.columns, 
                                   columns = self.y.columns)
        return pd.concat({'Estimate Coefficient':self.coef,
                          'SD of Estimate':self.coef_std,
                          't-Statistic':self.tstats,}, axis = 1)
        

In [3]:
class VAR():
    """
    This is class contains all the 
    statistical means I need for this
    pair trading project
    """
    def __init__(self, df, lag, ):
        """
        x and y are pandas dataframe
        """
        self.df = df
        self.lag =lag
        self.coef = np.array([])
        self.columns_name = self.name_lag()
        self.data = self.process_data()
        self.x = self.data[self.indepvar_name]
        self.y = self.data[df.columns]
        self.addconst()
        self.fit()
        
        
    def name_lag(self,):
        rlst = list(self.df.columns)
        self.indepvar_name = []
        for j in range(self.lag):
            for i in range(len(self.df.columns)):
                rlst.append('(Lag_'+str(j+1)+', '+self.df.columns[i]+')')
                self.indepvar_name.append('(Lag_'+str(j+1)+', '+self.df.columns[i]+')')
        return rlst
        
    def process_data(self,):
        lst = [self.df]
        for i in range(self.lag):
            lst.append(self.df.shift(i+1))
        r_df = pd.concat(lst,axis = 1)
        r_df.columns = self.columns_name
        return r_df.fillna(0).reset_index(drop=True)
    
    def addconst(self):
        self.x['constant'] = np.ones(len(self.x))
        self.indepvar_name.append('constant')

    def fit(self):
        self.coef = np.linalg.inv(self.x.T.dot(self.x)).dot(self.x.T.dot(self.y))
        self.coef = pd.DataFrame(self.coef, 
                                 index = self.x.columns,
                                columns = self.y.columns)
        self.error = self.y - self.x.dot(self.coef)
        self.residual_cov = self.error.T.dot(self.error)/ len(self.y)
        self.coef_cov = np.kron(np.linalg.inv(self.x.T.dot(self.x))
                                ,self.residual_cov )
        self.coef_std = np.sqrt(self.y.shape[0]/(self.y.shape[0]-self.x.shape[1])*np.diag(self.coef_cov))
        self.tstats = self.coef/self.coef_std.reshape(self.coef.shape)
        
        self.coef_std = pd.DataFrame(self.coef_std.reshape(self.coef.shape),
                                    index =self.x.columns,
                                   columns = self.y.columns)
        
        self.p_value = pd.DataFrame(2*(1-norm.cdf(abs(self.tstats))), 
                     index = self.indepvar_name, 
                     columns = self.df.columns)
        
    
    def report(self):
        return pd.concat({'Estimate Coefficient':self.coef,
                          'SD of Estimate':self.coef_std,
                          't-Statistic':self.tstats,}, axis = 1)
        
    def AIC(self):
        return np.log(np.linalg.det(self.residual_cov)) + 2*self.x.shape[1]*self.y.shape[1]/self.y.shape[0]
    
    def BIC(self):
        return np.log(np.linalg.det(self.residual_cov)) + np.log(self.y.shape[0])*self.x.shape[1]*self.y.shape[1]/self.y.shape[0]
        
    def IC(self, lag):
        ic = pd.DataFrame([[VAR(self.df, p+1).AIC(), VAR(self.df, p+1).BIC()] for p in range(lag)], 
             index=[p+1 for p in range(lag)],
             columns = ['AIC','BIC'],)
        ic.index.name = 'Lag'
        return ic.style.apply(self.color_min_red)
    
    
    def stability(self):
        eig_v,_ = np.linalg.eig(self.coef.drop('constant'))
        if False in (eig_v < 1):
            print('Stability status: '+ color('[x] UNSTABLE',Colors.red))
        else:
            print('Stability status: ' + color('[o] STABLE',(9, 86, 146)))
            
    def color_min_red(self, s):
        is_min = s== s.min()
        return ['color: %s' % 'red' if v else '' for v in is_min]
        
        


In [4]:
class ADF:
    def __init__(self, residual):
        self.residual = residual
        self.e_lag_1 = residual.shift()
        self.delta_e = residual.diff()
        self.delta_e.columns = ['(Δ{})'.format(residual.columns[0])]
        
        self.delta_e_lag_1 = self.e_lag_1.diff()
        
        self.x = pd.DataFrame(index = residual.index)
        self.x['(Lag 1, {})'.format(residual.columns[0])] = self.e_lag_1
        self.x['(Lag 1, Δ{})'.format(residual.columns[0])] = self.delta_e_lag_1
        
        self.x = self.x.dropna()
        self.y = self.delta_e.loc[self.x.index]
        
    def report(self):
        model = LinearRegression(self.x, self.y)
        t_value = model.report()['t-Statistic'].loc['(Lag 1, {})'.format(self.residual.columns[0])]
        if t_value.values < -3.45:
            print("The T-value is {} which is lower than -3.45".format(t_value.values))
            print('[*]We '+color('reject', (18, 96, 184))+
                  ' the  𝐻0  hypothesis of unit root.'+
                  color(' The residuals are stationary.',(18, 96, 184)))
        else:
            print("The T-value is {} which is higher than -3.45".format(t_value.values))
            print('[*]We '+color('fail to reject', (235, 30, 6))+
                  ' the  𝐻0  hypothesis of unit root.'+
                  color(' The residuals are NOT stationary.',(235, 30, 6)))
        return model.report().style.applymap(self.color_negative_red)

    
    def color_negative_red(self, val):
        color = 'red' if val < -3.45 else 'black'
        return 'color: %s' % color
    
 
            
        
        

In [5]:
class Backtest:
    def __init__(self, df, upper_bound ,lower_bound, mu ):
        self.df = df.copy()
        self.upper_bound = upper_bound
        self.lower_bound = lower_bound
        self.mu = mu
        
    def Signal_generation(self, daily = True):
        entry1 = False
        entry2 = False
        action_lst = []
        for i in range(len(self.df)):
            if self.df.iloc[i].res_past < self.upper_bound and self.df.iloc[i].res > self.upper_bound and entry1 == False and entry2 == False:
                action_lst.append('entry1')
                entry1 = True
            elif self.df.iloc[i].res_past > self.lower_bound and self.df.iloc[i].res < self.lower_bound and entry1 == False and entry2 == False:
                action_lst.append('entry2')
                entry2 = True
            elif self.df.iloc[i].res_past > self.mu and self.df.iloc[i].res < self.mu and entry1 == True:
                action_lst.append('exit')
                entry1 = False
            elif self.df.iloc[i].res_past < self.mu and self.df.iloc[i].res > self.mu and entry2 == True:
                action_lst.append('exit')
                entry2 = False
            else:
                if entry1 == True and entry2 == False:
                    action_lst.append('hold1')
                elif entry2 == True and entry1 == False:
                    action_lst.append('hold2')
                else:    
                    action_lst.append('none')
        self.df['action'] = np.array(action_lst,dtype = str)
        if daily == False:
            return self.df[self.df.action != 'none'][self.df.action != 'hold1'][self.df.action != 'hold2']
        return self.df
    
    def PL(self, trading_record, beta):
        PL = []
        profit = 0
        for i in range(trading_record.shape[0]):
            if trading_record.iloc[i].action == 'exit':
                A = trading_record['FB'].iloc[i] - trading_record['FB'].iloc[i-1]
                B = trading_record['BABA'].iloc[i] - trading_record['BABA'].iloc[i-1]
                if trading_record.iloc[i-1].action == 'entry2':
                    profit += (A - beta * B) 
                else:
                    profit += -A + beta*B
                PL.append(profit)

        PL = pd.Series(PL,)
        print('='*20+'Cumulative Profit'+'='*20)
        print(PL)
        plt.bar(x = PL.index, height = PL.values,)
        #plt.grid(True)
        
    def Returns(self, tr, beta):
        beta1 = np.array([beta,-1])
        beta2 = np.array([-beta,1])
        r = []
        flag = 0
        for i in range(tr.shape[0]):
            if tr.iloc[i].action == 'none':
                r.append(0)
                flag = 0
            elif tr.iloc[i].action == 'entry1' or tr.iloc[i].action == 'entry2':
                r.append(0)
            elif tr.iloc[i].action == 'hold1' or (tr.iloc[i].action == 'exit' and flag == 1):
                ret = np.log((np.array(tr.iloc[i][['BABA','FB']], dtype = float) / np.array(tr.iloc[i-1][['BABA','FB']], dtype = float)))
                r.append(ret.dot(beta1))
                flag = 1
            elif tr.iloc[i].action == 'hold2' or (tr.iloc[i].action == 'exit' and flag == 2):
                ret = np.log((np.array(tr.iloc[i][['BABA','FB']], dtype = float) / np.array(tr.iloc[i-1][['BABA','FB']], dtype = float)))
                r.append(ret.dot(beta2))
                flag = 2
        tr['returns'] = r
        tr['cumulative_returns'] = tr.returns.cumsum()
        return tr
    
    def drawback(self, c_ret, top_num = 5):
        top = 0
        top_i = 0
        bottom = 0
        bottom_i = 0
        drawdown_lst = []
        drawdown_i_lst = []
        drawdown_top = []

        for i in range(len(c_ret)):
            if c_ret[i] >= top:
                drawdown_lst.append(top-bottom)
                drawdown_i_lst.append((top_i,bottom_i))
                top = c_ret[i]
                top_i = i
                bottom = c_ret[i]
                bottom_i = i
            elif c_ret[i] <= bottom:
                bottom = c_ret[i]
                bottom_i = i
            elif i == len(c_ret):
                drawdown_lst.append(top-bottom)
                drawdown_i_lst.append((top_i,bottom_i))

        for each in np.sort(drawdown_lst)[-top_num:]:
            index = drawdown_lst.index(each)
            drawdown_top.append(drawdown_i_lst[index])
        drawdown_top.reverse()
        return drawdown_top
    
    def plot(self,tr):
        tr.cumulative_returns.plot( figsize = (16,8), fontsize = 12, color = '#E64848',)
        colors = ['#A569BD','#AF7AC5','#BB8FCE','#C39BD3','#D2B4DE']
        c_ret = tr.cumulative_returns
        c_ret.index = np.arange(len(c_ret))
        drawdown_top =  self.drawback(c_ret, top_num = 5)
        for i in range(len(colors)):
            plt.axvspan(drawdown_top[i][0],drawdown_top[i][1], alpha=0.5, color= colors[i], 
                        label = 'maximum drawdown top'+str(i+1))
        plt.legend()
        plt.title('Cumulative Log return with maxium drawdown',
                  fontsize = 18)
        plt.grid(True)
