In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.optimize
from pandas_datareader import data as pdr
from scipy.stats import norm,binom,chi2
from scipy.linalg import cholesky
import matplotlib.pyplot as plt 
import datetime as dt
import numpy as np

## OBTAINING THE DATA FROM YAHOO FINANCE

In [2]:
def getData(tickers_list,start,end):
    """inputs: function whose inputs are the list of the ticks for
    the assets we want to include in our portfolio,the start
    and end date of analysis.
    outputs:
     -data: DataFrame of market variables or asset prices
     -returns: DataFrame of the percentage change or returns of the market variable
     -meanReturns: the mean of the returns for each market variable
     -covMatrix: the covariance matrix of the market variables"""
    data=pdr.get_data_yahoo(tickers_list, start, end)['Close']
    data.reset_index(inplace=True)
    dates=data.loc[1:,'Date']
    data.drop(columns=['Date'],inplace=True)
    returns=data.pct_change()
    returns.dropna(inplace=True)
    returns.reset_index(inplace=True, drop=True)
    meanReturns=returns.mean(axis=0)
    covMatrix=returns.cov()

    return data,returns,dates,meanReturns,covMatrix




## VOLATILITY ESTIMATION 

### implementing Garch(1,1) and M-Garch(1,1) 

In [3]:

class Garch1_1:
    """This is a class that calculate GARCH(1,1) optimal parameters c, a, b for a market variable returns and calculate 
    the conditional variances with the optimal parameters """

    def __init__(self, returns):
        self.returns = np.array(returns)
        self.n=len(self.returns)
        self.params_opt = self.garch_opt()  #extracting the optimal parameters c, a, b
    

    def garch_variance(self, parameters):
        "Returns the variance of a GARCH(1,1) process with the parameters passed"
        
        # specifying parameters
        c = parameters[0]
        a = parameters[1]
        b = parameters[2]

       
        # Initializing an empty array for the varainces in each day
        var_list = np.zeros(self.n)

        # calculating the long-run variance
        Vl=c/(1 - a - b)   
        var_list[0]=Vl
        #calculating variance 
        for i in range(1,self.n):
                var_list[i] = c + a * self.returns[i-1]**2 + b * var_list[i-1]
        return var_list
        
    def garch_loglikelihood(self, parameters):
        " Here we define the log likelihood function to be optimize"

        var = self.garch_variance(parameters)
        logL = np.sum(-np.log(var) - self.returns**2 / var)
        return -logL
    
    def garch_opt(self):
        "returns the optimal parameters minimizing the log likelihood function"
        #defining constraints
        constraint=lambda parameters: (1-1e-8)-parameters[1]-parameters[2] 
        con={'type':'ineq','fun':constraint}
        opt_params=scipy.optimize.minimize(self.garch_loglikelihood,(np.var(self.returns),0.2,0.6),bounds=((1e-10,1),(1e-10,1),(1e-10,1)),constraints=con)
        return opt_params.x 


In [4]:
class DCC_garch:
    """This class implement the DCC m-GARCH(1,1) model for a multi-asset portfolio. This is the second step of the DCC model 
    where we optimize the value of  the parameters a and b. The first step of the DCC model is obtaining the
    parameters c,a,b for each of the assets in the portfolio"""
    
    def __init__(self,returns):
        self.returns=np.array(returns)
        
        
    def m_garch_vol(self):
        " fetch the conditional variance for each asset portfolio in each day n and storage the squared root of them in a matrix D of size\
          nxJ where n is the number of days in our sample and J is the number of assets in our portfolio. These are the conditional\
          volatilities"
        
        row,col=self.returns.shape #number of data points and assets respectively.
        S=np.zeros([row,col])  ##place holder of the variances for each market variable
        
        for i in range(col):
            garch=Garch1_1(self.returns[:,i])
            opt_params=garch.garch_opt()
            
            S[:,i]=np.sqrt(garch.garch_variance(opt_params))
        
        return S
    

    def m_garch_loglikilihood(self,parameters): 
        "Especification of the log-likelihood function to estimate \
        parameter a and b"
        
        a=parameters[0]
        b=parameters[1]
        row,col=self.returns.shape

        S=self.m_garch_vol() # creating the S matrix with all the standard deviations
        Q_bar=np.cov(self.returns.reshape(col,row))#unconditional covariance .We approximate it by the sample covariance
        
        Q=np.zeros((row,col,col))
        R=np.zeros((row,col,col))
        H=np.zeros((row,col,col))

        logL=0

        Q[0]=np.matmul(self.returns[0].T,self.returns[0])    

        for i in range(1,row): #for each day
            D_n=np.diag(S[i,:])
            D_n_inv=np.linalg.inv(D_n)
            resd=D_n_inv*self.returns[i,:].T  #calculating the standardized residuals
            Q[i]=(1-a-b)*Q_bar+a*resd*resd.T +b*Q[i-1]      #calculating matrix Q
            Q_star_inv=np.linalg.inv(np.sqrt(np.diag(np.diag(Q[i]))))
            R[i]=np.dot(Q_star_inv,np.dot(Q[i],Q_star_inv))
            H[i]=np.dot(D_n,np.dot(R[i],D_n))

            # log likelihood function to minimize 
            logL=logL+row*np.log(2*np.pi)+2*np.log(np.linalg.det(D_n))+np.log(np.linalg.det(R[i]))+np.matmul(self.returns[i], (np.matmul( np.linalg.inv(R[i]), self.returns[i].T)))
            

        return  logL  # we change the sign to minimize the function
        
    def m_garch_opt(self):
        "optimization of the parameters a and b minimzing the log-likelihood function\
        returning estimated parameters"
        
        constraint=lambda parameters: (1-1e-8)-parameters[0]-parameters[1] 
        con={'type':'ineq','fun':constraint}
        opt_params=scipy.optimize.minimize(self.m_garch_loglikilihood,(0.3,0.6),bounds=((1e-10,1),(1e-10,1)),constraints=con)
        return opt_params.x

    def m_garch_covariance(self,parameters):
        "give us the variance covariance matrix on the last day of our data sample\
        when we want to calculate volatility and Var"
        
        a=parameters[0]
        b=parameters[1]
        row,col=self.returns.shape
        S=self.m_garch_vol() # creating the S matrix with all the estandard deviations
        Q_bar=np.cov(self.returns.reshape(col,row))#unconditional covariance .We approximate it by the sample covariance
        
        Q=np.zeros((row,col,col))
        R=np.zeros((row,col,col))
        H=np.zeros((row,col,col))


        Q[0]=np.matmul(self.returns[0].T,self.returns[0])   

        for i in range(1,row): #for each day
            D_n=np.diag(S[i,:])
            D_n_inv=np.linalg.inv(D_n)
            resd=D_n_inv*self.returns[i,:].T  #calculating the standardized residuals
            Q[i]=(1-a-b)*Q_bar+a*resd*resd.T +b*Q[i-1]      #calculating matrix Q
            Q_star_inv=np.linalg.inv(np.sqrt(np.diag(np.diag(Q[i]))))
            R[i]=np.dot(Q_star_inv,np.dot(Q[i],Q_star_inv))
            H[i]=np.dot(D_n,np.dot(R[i],D_n))
        return H[-1]

    

### Class that will call different methods to calculate portfolio volatility  

In [5]:
class PortVolatility:
    """ This class implement different methods to calculate portfolio volatility.
    Take as inputs the returns series, the weights, an a string with the name of the 
    method we want to use for the estimation.self.portVolatility is an attrivute of the class that return the portfolio 
    volatility estimated with the method we specified.
    """
    
    def __init__(self,returns,weights,method):
        self.method=method.lower()
        self.returns=returns
        self.weights=weights
        

        if not isinstance(self.returns,(pd.Series,pd.DataFrame)):
            raise TypeError('returns must be a pandas dataframe or series')
        
        if self.method=='eqwma':
            """Estimation of the portfolio volatility with the EQWMA method"""
            if isinstance(self.returns,pd.Series):
                self.portVolatility=np.std(self.returns)
            else:
                covMatrix=self.returns.cov()
                self.portVolatility=np.sqrt(np.dot(self.weights.T,np.dot(covMatrix,self.weights)))
            
        elif self.method=='ewma':
            "Calling the method 'EWMA' for the estimation of portfolio volatility"
            self.portVolatility=self.EWMA()
            
        else:  
            """Calculates the portfolio volatility with GARCH(1,1) or DCC m-GARCH(1,1) method."""
            if isinstance(self.returns,pd.Series):
                garch=Garch1_1(returns)
                opt_params=garch.garch_opt()  # getting the optimal parameters c,a,b
                self.portVolatility=np.sqrt(garch.garch_variance(opt_params)[-1])
            else:
                garch=DCC_garch(returns)
                opt_params= garch.m_garch_opt()
                covMatrix=garch.m_garch_covariance(opt_params)
                self.portVolatility=np.sqrt(np.dot(self.weights.T,np.dot(covMatrix,self.weights)))
                         
            
    def EWMA(self):
        """method that estimates the portfolio volatility with the EWMA approach"""
        
        lambda_=0.94
        squaredReturns=np.power(self.returns,2)
        if isinstance(self.returns,(pd.Series)):
            variance=np.var(self.returns)
            for i in range(1,len(self.returns)):
                var=lambda_*variance+(1-lambda_)*squaredReturns.iloc[i-1]
                variance=var 
            return np.sqrt(variance)

        else:
            covMatrix=self.returns.cov() # we initialize the variances and covariances as the sample variances and covariances at n=0
            row,col=squaredReturns.shape  #number of data points and assets 
            
            for i in range(1,row): # for each day in the sample
                for j in range(col):# for each asset in the portfolio

                    #updating variances for each asset
                    updatedVar=lambda_*covMatrix.iloc[j,j]+(1-lambda_)*squaredReturns.iloc[i-1]
                    covMatrix.iloc[j,j]=updatedVar[j]

                    #updating covariances
                    cov=covMatrix.iloc[j,covMatrix.iloc[j,:].name!=covMatrix.columns]
                    updatedCov=lambda_*cov+(1-lambda_)*self.returns.loc[i-1,cov.index]*self.returns.loc[i-1,covMatrix.iloc[j,:].name]
                    covMatrix.loc[covMatrix.iloc[j,:].name,covMatrix.iloc[j,:].name!=covMatrix.columns]=updatedCov
            portVol=np.sqrt(np.dot(self.weights.T,np.dot(covMatrix,self.weights)))
            return portVol




## VALUE AT RISK MODELS

### This class implement different value at risk models for a single and multi-asset portfolio

In [6]:
class ValueAtRisk():
    def __init__(self,data,returns,alpha,P0,weights,h,model):
        
        
        """This class is going to calculate the h-day (1-alpha) Var of a portfolio. It takes as inputs 'data'
        which is the series of the closing prices of the assets included in the eportfolio,the series of returns,'alpha' or 
        significante level, the initial investment 'P0', the weights or proportion of the investment money allocated to each asset,
        'h' or the risk horizon and a string witht he name of the model we want to use for Var estimation."""
        
        self.model=model
        self.data=data
        self.returns=returns
        self.alpha=alpha
        self.P0=P0
        self.weights=weights
        self.h=h
        
        #checking for valid format of data 
        if not isinstance(self.data,(pd.Series,pd.DataFrame)) or\
        not isinstance(self.returns,(pd.Series,pd.DataFrame)):
            raise TypeError('data and returns must be a pandas dataframe or series')
            
        # case that a pandas dataframe contains only one column. We pass it to pandas.Series
        if isinstance(self.returns,pd.DataFrame) and self.returns.shape[1]==1:
            self.returns=self.returns.squeeze()
        elif isinstance(self.data,pd.DataFrame) and self.data.shape[1]==1:
            self.data=self.data.squeeze()
            
        
        if self.model=='hs':
            self.Var= self.historicalSimVar()[1]
        
        elif self.model=='mc':
            self.Var=self.MCVar()
            
        else:
            self.Var=self.parametricVar()
        

    def historicalSimVar(self):
        """ This method computes the h-day (1-alpha) Var with historical simulation method """

        df=self.data.div(self.data.shift(1)) #calculating the change in prices of each asset
        df.dropna(inplace=True)
        value_port=df*self.P0*self.weights #calculating the value of porfolio for each scenario
        
        if isinstance(self.data,pd.Series):
            df=df.to_frame()
            df['Portafolio Value']=value_port
        else:
            df['Portafolio Value']=value_port.sum(axis=1)

        df['Loss']=self.P0-df['Portafolio Value'] 
        df.sort_values(by=['Loss'],ascending=False,inplace=True) # sorting losses. Gains are negative losses
        
        return df.Loss, df.Loss.quantile(q=1-self.alpha,interpolation='higher')*np.sqrt(self.h)
    
    
    def parametricVar(self):
        """This method computes the h-day (1-alpha) Var with parametric method and assuming normal distribution """
        portVol=PortVolatility(self.returns,self.weights,self.model).portVolatility
        meanReturns=0
        portMean=0 #np.sum(self.weights.T*meanReturns) 
        stat=norm.ppf(1-self.alpha)
        Var=(stat*portVol-portMean)*self.P0*np.sqrt(self.h) 
        return Var
    
    def MCVar(self):
        """Method that computes the h-day (1-alpha) Var with Monte Carlo """
         
        n_sims=10000 # we can change the number of simulations 
        meanReturns=0
        
        if isinstance(self.returns,pd.Series):
            vol=np.std(self.returns)
             # independent random standard normal vectors for risk factor 
            Z=norm.ppf(np.random.uniform(size=n_sims))
            # calculate the portfolio daily profit and losses
            portLosses=meanReturns+Z*vol*self.P0*np.sqrt(self.h/252) 
            
        else:
            #we calculate the covariance matrix using the equally weighted method, but we can use the other method to calculate the covariance matrix
            covMatrix=self.returns.cov()
            #cholesky decomposition of the returns covariance matrix
            L = cholesky(covMatrix, lower=True) 
            #independent random standard normal vectors for each risk factor 
            Z=norm.ppf(np.random.uniform(size=(n_sims,len(self.weights))))
            #transformation of the independent standard normal vectors to correlated multivariate normal vectors 
            simReturns=np.array(meanReturns)+np.dot(Z,L)
            # simulating the portfolio gains and losses
            portLosses=np.sum(simReturns*self.weights, axis=1)*self.P0 
            
        loss=pd.Series(portLosses)
        loss.sort_values(ascending=False,inplace=True) # sorting losses. Gains are negative losses
        Var=loss.quantile(q=1-self.alpha,interpolation='higher')*np.sqrt(self.h)
        return Var
            
            



## MODEL VALIDATION


### Implementing a backtest 

In [7]:
class ModelValidation():
    """This class implement different methods and tests to validate the performance of our models"""
    
    def __init__(self,data,h,alpha,P0,weights,model,testSize=0.4):
        self.data=data
        self.h=h
        self.P0=P0
        self.alpha=alpha
        self.weights=weights
        self.model=model
        self.percentTestSample=testSize
  
    def backTest(self):
        """Method that implement backtesting of our models calculating the 1-day Var for each day in the test data and 
        comparing them with the actual profits and losses. Returns a dataframe containing three columns:the actual profit and losses,
        the 1-day (1-alpha) Var and the a column of exceedances or column indicating if the actual losses/gains are higher than Var (1) or lower (0).It also returns the 
        total numbe of exceedances"""
        
        dataSize=len(self.data)
        testSampleStart=int(dataSize-dataSize*self.percentTestSample) # observation where the test sample starts and the estimation sample ends

        #calculating the realized profits and losses. 
        returnsTest=self.data.iloc[testSampleStart:].pct_change()
        returnsTest.dropna(inplace=True)

        if isinstance(returnsTest,pd.DataFrame):
            realizedLosses=np.sum(returnsTest*self.P0*self.weights,axis=1)
        else: 
            realizedLosses=returnsTest*self.P0*self.weights
        realizedLosses=realizedLosses.reset_index(drop=True)
        #rolling window aproach: Estimation of Var for each of the days in the test sample.The estimation sample size is the same 
        #as we apply the rolling window approach.
        VarTest=np.full(shape=(len(realizedLosses)),fill_value=0.0) #place holder for Vars

        for i in range(len(realizedLosses)):

            estimationData=self.data.iloc[i:testSampleStart+i] 
            estimationReturns= estimationData.pct_change()
            estimationReturns.dropna(inplace=True)
            estimationReturns=estimationReturns.reset_index(drop=True)

            Var=ValueAtRisk(estimationData,estimationReturns,self.alpha,self.P0,self.weights,self.h,self.model).Var

            # so far we express Var with positive sing but for comparing with 
            #the realised losses we change the sign. Losses are negative and gains are positive
            VarTest[i]=-Var 
        df=pd.DataFrame({'Realized Losses':realizedLosses,'Var':pd.Series(VarTest)})
        df['Exceedances']=df.apply(lambda x:1 if x['Realized Losses']<x['Var'] else 0, axis=1) 
        exceedances=np.sum(df['Exceedances'])
        return df,exceedances

    def exactBinomCI(self,p,df,epsilon=0.05):
         
        """Estimating the lower and upper bound for the binomial confidence interval.
        'p' is the expected proportion of exceedances that equal to alpha,'n' is sample size,
        1-'epsilon' is the confidence that the expected proportion is in the interval."""
        n=len(df)
        lower=binom.ppf(epsilon/2,n,p)
        upper=binom.ppf(1-(epsilon/2),n,p)
        return lower,upper
    

    def ind_test(self,df,epsilon=0.05):
        """Computes the independence test. It takes as inputs the dataframe output in the backtesting step and the 
        significance level (epsilon) for the hipothesis testing. It returns 'independent' if we can not reject the null 
        HO that the exceedance in each day are independent of the past at the SL, otherwise the result is not independet"""
        n=len(df)
        n00=0
        n01=0
        n10=0
        n11=0
        for i in range(1,n-1):
            if df['Exceedances'].iloc[i-1]==0 and df['Exceedances'].iloc[i]==0:
                n00+=1
            elif df['Exceedances'].iloc[i-1]==0 and df['Exceedances'].iloc[i]==1:
                n01+=1
            elif df['Exceedances'].iloc[i-1]==1 and df['Exceedances'].iloc[i]==0:
                n10+=1
            else: n11+=1

        p01=n01/(n00+n01)
        p11=n11/(n10+n11)
        #probability of having a failure
        pUC=(n01+n11)/(n00+n01+n10+n11)

        if 0 in [n00,n01,n10,n11]:
            L=1
            if n00==0 or n01==0:
                L1=1
            else: L1=n00*np.log(1-p01)+n01*np.log(p01)
            
            if n10==0 or n11==0:
                L2=1
            else: L2=n10*np.log(1-p11)+n11*np.log(p11)
            lnRatio=L-L1-L2
        
        
        else:
            L=(n00+n10)*np.log(1-pUC)+(n01+n11)*np.log(pUC)
            L1=n00*np.log(1-p01)+n01*np.log(p01)
            L2=n10*np.log(1-p11)+n11*np.log(p11)
            lnRatio=L-L1-L2
      
        if -2*lnRatio>chi2.ppf(1-epsilon, df=1): #this p is the sl for the test too. we can change it 
            #h0: exceedances are independent at sl 
            return 'independent'
        else:
            #h1: exceedance are not independent at sl
            return 'not independent'

            
       
    def UCT(self,p,df,epsilon=0.05):
        """Performs the unconditional coverage test to compute how well the model estimates the actual portfolio risk.
        Takes as input p=alpha or probability of exceedance, the df output ffrom the backtesting step, and the significance 
        level (epsilon). The outputs is the result of the hipothesis testing: model no well calibrated if we have evidence 
        for rejecting the H0 that the exceedances are close to the expected value. Otherwise,we cannot reject the HO and
        we have evidence to conclude our model is well calibrated. """
        n=len(df)
        n1=np.sum(df['Exceedances']) # number exceedances
        n0=n-n1
        lnRatio=n1*np.log(p)+n0*np.log(1-p)-n1*np.log(n1/n) #n1/n= observed proportion of exceedances
        if -2*lnRatio>chi2.ppf(1-epsilon, df=1): # this p is the significance level for the h test too
            return 'Model no well calibrated'
        else:
            return 'Model well calibrated'
    

