# Week1-2

## t-test （stats - ttest）

In [None]:
# t-test
# stats -- generate distribution & sample

from scipy import stats
t = np.sqrt(num)*sample.mean/np.sqrt(sample.variance)
p = 2*(1-stats.t.cdf(abs(t),df=num-1))

## OLS


In [None]:
import statsmodels.api as sm
# parse the data into the dataframe
df=pd.read_csv("problem2.csv")
# seperate dependent variable and dependent variable
Y=df["y"]
x=df["x"]
# Our model needs an intercept so we add a column of 1s
X=sm.add_constant(x)
# build the regression model and fit
model = sm.OLS(Y,X)
res = model.fit()
# get some important statistics for the result
res.summary()

## MLE -- make it simple

In [None]:
from scipy.stats import t
from scipy.stats import norm
import statsmodels.api as sm

# Normal

# use MLE to do the regression
# using the norm distribution from scipy, we can write this likelihood simply as following
def _ll_norm(y, X, beta, sigma):
    resid=y-np.dot(X, beta)
    ll = norm.logpdf(resid, scale = sigma)
    return ll
# We create a new model class which inherits from GenericLikelihoodModel
class Normin(GenericLikelihoodModel):
    def __init__(self, endog, exog, **kwds):
        self.y=endog
        self.X=exog
        super().__init__(endog, exog, **kwds)

    def nloglikeobs(self, params):
        '''This function should return one evaluation of the negative log-likelihood function per observation in your dataset '''
        sigma = params[-1]
        beta = params[:-1]
        ll = _ll_norm(self.endog, self.exog, beta, sigma)
        return -ll

    def fit(self, start_params=None, maxiter=10000, maxfun=5000, **kwds):
        ''' start_params: a one-dimensional array of starting values needs to be provided. 
            The size of this array determines the number of parameters that 
            will be used in optimization
        '''
        # we have one additional parameter and we need to add it for summary
        self.exog_names.append('sigma')
        if start_params == None:
            # Reasonable starting values
            start_params = np.append(np.zeros(self.exog.shape[1]), 1)
            # intercept
            start_params[-2] = np.log(self.endog.mean())
        self.res = super().fit(start_params=start_params,
                                     maxiter=maxiter, maxfun=maxfun,
                                     **kwds)
        return self.res

    def predict(self):
        '''calculate the fitted y and residuals'''
        y_pre=np.dot(self.X,self.res.params[:2])
        err=self.y-y_pre
        y_pre=pd.DataFrame(y_pre)
        err=pd.DataFrame(err)
        df=pd.concat([self.y,self.X.x,y_pre,err],axis=1)
        df.columns = ['y','x','y_pre','resid']
        return df 
    
    def R2(self):
        y_pre=np.dot(self.X,self.res.params[:2])
        SSE=sum((self.y-y_pre)**2)
        Mean=self.y.mean()
        SST=sum((self.y-Mean)**2)
        R2=1-SSE/SST

        n=self.y.shape[0]
        p=1
        adj_R2=1-(1-R2)*(n-1)/(n-p-1)
        return R2,adj_R2


# T
def _ll_t(y, X, beta, df, sigma):
    resid=y-np.dot(X, beta)
    ll = t.logpdf(resid, df, scale=sigma)
    return ll
    
# We create a new model class which inherits from GenericLikelihoodModel
class Tin(GenericLikelihoodModel):
    def __init__(self, endog, exog, **kwds):
        self.y=endog
        self.X=exog
        super().__init__(endog, exog, **kwds)

    def nloglikeobs(self, params):
        '''This function should return one evaluation of the negative log-likelihood function per observation in your dataset '''
        df = params[-2]
        sigma = params[-1]
        beta = params[:-2]
        ll = _ll_t(self.endog, self.exog, beta, df, sigma)
        return -ll

    def fit(self, start_params=None, maxiter=10000, maxfun=5000, **kwds):
        ''' start_params: a one-dimensional array of starting values needs to be provided. 
            The size of this array determines the number of parameters that 
            will be used in optimization
        '''
        
        # we have three additional parameters and we need to add it for summary
        self.exog_names.append('df')
        self.exog_names.append('sigma')
        if start_params == None:
            # Reasonable starting values
            start_params = np.append(np.zeros(self.exog.shape[1]), np.array([50,1]))
            # intercept
            start_params[-3] = np.log(self.endog.mean())
        self.res = super().fit(start_params=start_params,
                                     maxiter=maxiter, maxfun=maxfun,
                                     **kwds)
        return self.res

    def predict(self):
        '''calculate the fitted y and residuals'''
        y_pre=np.dot(self.X,self.res.params[:2])
        err=self.y-y_pre
        y_pre=pd.DataFrame(y_pre)
        err=pd.DataFrame(err)
        df=pd.concat([self.y,self.X.x,y_pre,err],axis=1)
        df.columns = ['y','x','y_pre','resid']
        return df

    def R2(self):
        y_pre=np.dot(self.X,self.res.params[:2])
        SSE=sum((self.y-y_pre)**2)
        Mean=self.y.mean()
        SST=sum((self.y-Mean)**2)
        R2=1-SSE/SST

        n=self.y.shape[0]
        p=1
        adj_R2=1-(1-R2)*(n-1)/(n-p-1)
        return R2,adj_R2


In [None]:
# read data from problem2.csv
df=pd.read_csv("problem2.csv")
y = df.y
X = df[['x']].copy()
X["constant"] = 1

In [None]:
# build the model calss
mod_norm = Normin(y, X)
# condcut fitting
res_norm = mod_norm.fit()
# get the parameters and corresponding p-value
print(res_norm.summary())
# calulate the R2, adjusted R2, AIC and BIC
R2_norm=mod_norm.R2()
print("R2 = {} \nadj_R2 = {}".format(R2_norm[0],R2_norm[1]))
print("AIC = {}".format(res_norm.aic))
print("BIC = {}".format(res_norm.bic))


# build the model calss
mod_T = Tin(y, X)
# condcut fitting
res_T = mod_T.fit()
# get the parameters and corresponding p-value
print(res_T.summary())
# calulate the R2, adjusted R2, AIC and BIC
R2_T=mod_T.R2()
print("R2 = {} \nadj_R2 = {}".format(R2_T[0],R2_T[1]))
print("AIC = {}".format(res_T.aic))
print("BIC = {}".format(res_T.bic))

## AR,MA -- statsmodel ARIMA, sample

In [None]:
import statsmodels.api as sm
import numpy as np
from scipy import stats
import matplotlib.gridspec as gridspec

def simulate_AR1_process(alpha,beta,sigma,sample_size):
    ''' AR(1)
        y_t = alpha + beta*y_t-1 + e, e ~ N(0,sigma)
    '''
    x0=alpha/(1-beta)
    x=np.zeros(sample_size+1)
    x[0]=x0
    eps=stats.norm.rvs(size=sample_size,scale=sigma)
    for i in range(sample_size):
        x[i+1]=alpha+beta*x[i]+eps[i]
    return x

def simulate_AR2_process(alpha,beta,sigma,sample_size):
    ''' AR(2)
        y_t = alpha + beta[0]*y_t-1 + beta[1]*y_t-2 + e, e ~ N(0,sigma)
    '''
    x0=alpha/(1-sum(beta))
    eps=stats.norm.rvs(size=sample_size+1,scale=sigma)
    x=np.zeros(sample_size+2)
    x[0]=x0
    x[1]=x0+eps[0]
    for i in range(sample_size):
        x[i+2]=alpha+np.dot(beta,x[i:i+2])+eps[i+1]
    return x

def simulate_AR3_process(alpha,beta,sigma,sample_size):
    ''' AR(3)
        y_t = alpha + beta[0]*y_t-1 + beta[1]*y_t-2 + beta[2]*y_t-3 + e, e ~ N(0,sigma)
    '''
    x0=alpha/(1-sum(beta))
    eps=stats.norm.rvs(size=sample_size+2,scale=sigma)
    x=np.zeros(sample_size+3)
    x[0]=x0
    x[1]=x[0]+eps[0]
    x[2]=x[1]+eps[1]
    for i in range(sample_size):
        x[i+3]=alpha+np.dot(beta,x[i:i+3])+eps[i+2]
    return x


def simulate_MA1_process(alpha,beta,sigma,sample_size):
    ''' MA(1)
        y_t = alpha + e_t + beta*e_t-1, e ~ N(0,sigma)
    '''
    x0=alpha
    x=np.zeros(sample_size+1)
    x[0]=x0
    eps=stats.norm.rvs(size=sample_size+1,scale=sigma)
    for i in range(sample_size):
        x[i+1]=alpha+eps[i+1]+beta*eps[i]
    return x

def simulate_MA2_process(alpha,beta,sigma,sample_size):
    ''' MA(2)
        y_t = alpha + e_t + beta[0]*e_t-1 + beta[1]*e_t-2, e ~ N(0,sigma)
    '''
    x0=alpha
    eps=stats.norm.rvs(size=sample_size+2,scale=sigma)
    x=np.zeros(sample_size+2)
    x[0]=x0
    x[1]=x0
    for i in range(sample_size):
        x[i+2]=alpha+eps[i+2]+np.dot(beta,eps[i:i+2][::-1])
    return x

def simulate_MA3_process(alpha,beta,sigma,sample_size):
    ''' MA(3)
        y_t = alpha + e_t + beta[0]*e_t-1 + beta[1]*e_t-2 + beta[2]*e_t-3, e ~ N(0,sigma)
    '''
    x0=alpha
    eps=stats.norm.rvs(size=sample_size+3,scale=sigma)
    x=np.zeros(sample_size+3)
    x[0]=x0
    x[1]=x[0]
    x[2]=x[0]
    for i in range(sample_size):
        x[i+3]=alpha+eps[i+3]+np.dot(beta,eps[i:i+3][::-1])
    return x


In [None]:
# generate sample for AR(1)
alpha = 0.75
beta = 0.4
sigma = 0.8
sample_size = 2000
ar1_sample=simulate_AR1_process(alpha,beta,sigma,sample_size)

# generate sample for AR(2)
alpha = 0.75
beta = np.array([0.4,0.3])
sigma = 0.8
sample_size = 2000
ar2_sample=simulate_AR2_process(alpha,beta,sigma,sample_size)

# generate sample for AR(3)
alpha = 0.75
beta = np.array([0.2,0.3,0.2])
sigma = 0.8
sample_size = 2000
ar3_sample=simulate_AR3_process(alpha,beta,sigma,sample_size)

In [None]:
# plot for AR(1)
fig = plt.figure(tight_layout=True,figsize=[10,8])
gs = gridspec.GridSpec(2, 2)
ax = fig.add_subplot(gs[0, :])
ax.plot(ar1_sample)
ax.set_title("AR 1")
ax = fig.add_subplot(gs[1, 0])
sm.graphics.tsa.plot_acf(ar1_sample,ax=ax,lags=30)
ax = fig.add_subplot(gs[1, 1])
sm.graphics.tsa.plot_pacf(ar1_sample,method="ywm",ax=ax,lags=30)
plt.show()

# plot for AR(2)
fig = plt.figure(tight_layout=True,figsize=[10,8])
gs = gridspec.GridSpec(2, 2)
ax = fig.add_subplot(gs[0, :])
ax.plot(ar2_sample)
ax.set_title("AR 2")
ax = fig.add_subplot(gs[1, 0])
sm.graphics.tsa.plot_acf(ar2_sample,ax=ax,lags=30)
ax = fig.add_subplot(gs[1, 1])
sm.graphics.tsa.plot_pacf(ar2_sample,method="ywm",ax=ax,lags=30)
plt.show()

# plot for AR(3)
fig = plt.figure(tight_layout=True,figsize=[10,8])
gs = gridspec.GridSpec(2, 2)
ax = fig.add_subplot(gs[0, :])
ax.plot(ar3_sample)
ax.set_title("AR 3")
ax = fig.add_subplot(gs[1, 0])
sm.graphics.tsa.plot_acf(ar3_sample,ax=ax,lags=30)
ax = fig.add_subplot(gs[1, 1])
sm.graphics.tsa.plot_pacf(ar3_sample,method="ywm",ax=ax,lags=30)
plt.show()

In [None]:
# generate sample for MA(1)
alpha = 0.75
beta = 0.4
sigma = 0.8
sample_size = 2000
ma1_sample=simulate_MA1_process(alpha,beta,sigma,sample_size)

# generate sample for MA(2)
alpha = 0.75
beta = np.array([0.4,0.3])
sigma = 0.8
sample_size = 2000
ma2_sample=simulate_MA2_process(alpha,beta,sigma,sample_size)

# generate sample for MA(3)
alpha = 0.75
beta = np.array([0.2,0.3,0.2])
sigma = 0.8
sample_size = 2000
ma3_sample=simulate_MA3_process(alpha,beta,sigma,sample_size)

In [None]:
# plot for MA(1)
fig = plt.figure(tight_layout=True,figsize=[10,8])
gs = gridspec.GridSpec(2, 2)
ax = fig.add_subplot(gs[0, :])
ax.plot(ma1_sample)
ax.set_title("MA 1")
ax = fig.add_subplot(gs[1, 0])
sm.graphics.tsa.plot_acf(ma1_sample,ax=ax,lags=30)
ax = fig.add_subplot(gs[1, 1])
sm.graphics.tsa.plot_pacf(ma1_sample,method="ywm",ax=ax,lags=30)
plt.show()

# plot for MA(2)
fig = plt.figure(tight_layout=True,figsize=[10,8])
gs = gridspec.GridSpec(2, 2)
ax = fig.add_subplot(gs[0, :])
ax.plot(ma2_sample)
ax.set_title("MA 2")
ax = fig.add_subplot(gs[1, 0])
sm.graphics.tsa.plot_acf(ma2_sample,ax=ax,lags=30)
ax = fig.add_subplot(gs[1, 1])
sm.graphics.tsa.plot_pacf(ma2_sample,method="ywm",ax=ax,lags=30)
plt.show()

# plot for MA(3)
fig = plt.figure(tight_layout=True,figsize=[10,8])
gs = gridspec.GridSpec(2, 2)
ax = fig.add_subplot(gs[0, :])
ax.plot(ma3_sample)
ax.set_title("MA 3")
ax = fig.add_subplot(gs[1, 0])
sm.graphics.tsa.plot_acf(ma3_sample,ax=ax,lags=30)
ax = fig.add_subplot(gs[1, 1])
sm.graphics.tsa.plot_pacf(ma3_sample,method="ywm",ax=ax,lags=30)
plt.show()

# Week 3

## Cholesky Factorization

In [None]:
class NotPsdError(Exception):
    ''' 
    Used for expection raise if the input matrix is not sysmetric positive definite 
    '''
    pass

class chol_psd():
    '''
    Cholesky Decompstion: Sysmetric Positive Definite matrix could use Cholesky 
    algorithm to fatorize the matrix to the product between a lower triangle matrix and
    upper triangle matrix

    Parameter:
        matrix  --  Sysmetric Positive Definite (or Positive Semi-definite) 
                    matrix needed to do Cholesky Factorization.
    
    Formula: 
        matrix=L*L.T

    Usage:
        Chol_model=chol_psd(matrix)
        root=Chol_model.root
    '''
    # initialization
    def __init__(self,matrix):
        self.__psd=matrix
        self.run()

    # Perform the Cholesky Factorization
    def run(self):
        n=self.__psd.shape[0]
        root=np.zeros([n,n])
        for i in range(n):
            # diagonal
            root[i][i] = self.__psd[i][i] - root[i][:i] @ root[i][:i].T
            root[i][i]=0 if 0>=root[i][i]>=-1e-8 else root[i][i]
            # if the diagonal element is less than -1e-8, it might not be PSD
            if root[i][i]<0:
                raise NotPsdError("Not PSD!")
            root[i][i]=np.sqrt(root[i][i])
            
            #below the diagonal
            # if diagonal element is zero, set the following element of that column to be zero too
            if root[i][i]==0:
                continue
            for j in range(i+1,n):
                root[j][i]=(self.__psd[j][i]-root[i,:i] @ root[j,:i])/root[i][i]
        self.__root=root
        self.__ispsd=True

    @property
    def root(self):
        return self.__root   
    
    @property
    def ispsd(self):
        return self.__ispsd 

In [None]:
# Test

# PD
sigma=np.full([5,5],0.9)
np.fill_diagonal(sigma, 1)
root=chol_psd(sigma).root
print(np.allclose(root@root.T,sigma))

# PSD
sigma[0][1]=1
sigma[1][0]=1
v,c=np.linalg.eig(sigma)
root=chol_psd(sigma).root
print(np.allclose(root@root.T,sigma))

# not PSD
sigma[0][1]=0.7357
sigma[1][0]=0.7357
root=chol_psd(sigma).root
print(np.allclose(root@root.T,sigma))

## Use Cholesky Factorization to test whether a matrix is PSD or not

In [None]:
class NotSysmetricError(Exception):
    ''' 
    Used for expection raise if the input matrix is not sysmetric
    '''
    pass

class NegativeEigError(Exception):
    ''' 
    Used for expection raise if matrix has the negative eigvalue
    '''
    pass

class PSD:
    """
    PSD class is used for Positive Semi-Definite Matrix confirmation.
    """
    @staticmethod
    def confirm(psd):
        # make sure sysmetric
        if not np.allclose(psd,psd.T):
            raise NotSysmetricError("Matrix does not equal to Matrix.T")
        # Make sure no negative eigenvalues
        eig_val=np.linalg.eigvals(psd)
        neg_eig=len(eig_val[eig_val<0])
        # No negative eigenvalues or Pass the Cholesky algorithm
        if neg_eig==0 or chol_psd(psd).ispsd:
            print("Matrix is Sysmetric Positive Definite!")
            return True
        else:
            raise NegativeEigError("Matrix has negative eigenvalue.")

In [None]:
PSD.confirm(RJ_psd)

## Exponentially Weighted Covariance

In [None]:
#  Exponentially Weighted Moving Average for Volatility Model -- Exponentially Weighted Covariance
class EWMA:
    """
    Calculate the Exponentially Weighted Covariance & Correaltion Matrix
    
    Parameter: 
        data (np.array)  -- return data for calculating Covariance & Correaltion Matrix (array)
        lambda_  -- smoothing parameter (less than 1)
        flag (optional)  -- a flag to dertermine whether to subtract mean from data.
                            if it set False, data would not subtract its mean.

    fomula: \sigma_t^2=\lambda \sigma_{t-1}^2+(1-\lambda)r_{t-1}^2

    Usage:  
        model=EWMA(data,0.97)
        cov_mat=model.cov_mat
        corr_mat=model.corr_mat
    """
    # initialization 
    def __init__(self,data,lambda_,flag=False):
        self.__data=data if flag==False else data-data.mean(axis=0)
        self.__lambda=lambda_
        self.get_weight() 
        self.cov_matrix()
        self.corr_matrix()

    # calculate the weight matrix
    def get_weight(self):
        n=self.__data.shape[0]
        weight_mat=[(1-self.__lambda)*self.__lambda**(n-i-1) for i in range(n)]
        weight_mat=weight_mat/sum(weight_mat)
        self.__weight_mat=np.diag(weight_mat)

    # calculate cov_matrix
    def cov_matrix(self):
        self.__cov_mat=self.__data.T @ self.__weight_mat @ self.__data

    # calculate corr_matrix
    def corr_matrix(self):
        n=self.__data.shape[1]
        invSD=np.sqrt(1./np.diag(self.__cov_mat))
        invSD=np.diag(invSD)
        self.__corr_mat=invSD @ self.__cov_mat @ invSD
        return self.__corr_mat

    # plot the cumulative weight
    def plot_weight(self,k=None,ax=None,label=None):
        weight=np.diag(self.__weight_mat)[::-1]
        cum_weight=weight.cumsum()/weight.sum()
        sns.lineplot(cum_weight,ax=ax,label="{:.2f}".format(label) if label!=None else "")
        if ax!=None:
            ax.set_xlabel('Time Lags')
            ax.set_ylabel('Cumulative Weights')
            ax.set_title("Weights of differnent lambda")
        ax.legend(loc='best')

    @property
    def cov_mat(self):
        return self.__cov_mat    

    @property
    def corr_mat(self):
        return self.__corr_mat

In [None]:
# parse the .csv file into dataframe
df=pd.read_csv('DailyReturn.csv',index_col=0)
# transfrom dataframe to array to speed up matrix calculation
data=np.array(df)
# EWMA
lambdaValue=0.97
model=EWMA(data,lambdaValue) 
cov_mat=model.cov_mat 

## Generate Non-PSD Correlation Matrix

In [None]:
class Non_psd_mat:
    """
    Used to generate the non-positive semi-definite matrix
    """
    def non_psd_mat(self,n):
        corr = np.full((n, n), 0.9)
        np.fill_diagonal(corr, 1)
        corr[0, 1] = 0.7357  
        corr[1, 0] = 0.7357 
        return corr

In [None]:
non_psd_mat(10)

## Weighted Frobenius Norm

In [2]:
class Weighted_F_norm:
    '''
    Given the weight matrix, calculate the Weighted Frobenius Norm. (Assume it's diagonal)
    '''
    def compare_F(self,mat_a,mat_b,mat_w):
        '''Give two matrix, use Weighted Frobenius Norm to calculate how close they are'''
        err = mat_a-mat_b #difference
        weighted_err = np.sqrt(mat_w) @ err @ np.sqrt(mat_w) 
        w_F_norm = np.sqrt(np.square(weighted_err).sum())
        return w_F_norm
    
    def calculate_F(self,mat,mat_w):
        "Given one matrix, calculate its Weighted Frobenius Norm"
        weighted_err = np.sqrt(mat_w) @ mat @ np.sqrt(mat_w)
        w_F_norm = np.sqrt(np.square(weighted_err).sum())
        return w_F_norm

## Rebonato and Jackel's Method (Non-PSD -> PSD)

In [3]:
class near_psd(Weighted_F_norm):
    '''
    Rebonato and Jackel's Method to get acceptable PSD matrix 
    
    Parameters:
        not_psd -- the matrix which is not positive semi-definite matrix
        weight  -- used for calculating the Weighted Frobenius Norm (Now assume it's diagonal)

    Usage:
        near_psd_model=near_psd(non_psd,weight)
        psd=near_psd_model.psd
    '''
    # initialization
    def __init__(self,not_psd,weight):
        self.__not_psd=not_psd
        self.__weight=weight
        self.run() # main function
        self.F_compare_norm(weight) # Weighted Frobenius Norm
        
    def run(self):
        n=self.__not_psd.shape[0]
        # Set the weight matrix to be identity matrix
        invSD = np.eye(n)
        corr=self.__not_psd
        # if the matrix is not correlation matrix, convert it to the correlation matrix
        if not np.allclose(np.diag(self.__not_psd),np.ones(n)):
            invSD=np.diag(1/np.sqrt(np.diag(self.__not_psd)))
            corr=invSD @ self.__not_psd @ invSD
        eig_val,eig_vec=np.linalg.eigh(corr) # eigenvalues & eigenvectors 
        eig_val[eig_val<0]=0 # adjust the negative value to 0
        # get the scale matrix
        scale_mat = np.diag(1/((eig_vec * eig_vec) @ eig_val))
        B = np.sqrt(scale_mat) @ eig_vec @ np.sqrt(np.diag(eig_val))
        corr=B @ B.T
        # convert it back into original form
        SD=np.diag(1/np.diag(invSD))
        psd = SD @ corr @ SD
        self.__psd = psd

    # Weighted Frobenius Norm of the difference between near_psd and ono_psd
    def F_compare_norm(self,weight):
        self.__F = self.compare_F(self.__psd,self.__not_psd,weight)

    @property
    def psd(self):
        return self.__psd
    
    @property
    def F(self):
        return self.__F
        

In [None]:
mat = non_psd_mat(10) # np.array
weight=np.eye(10)
near_psd(mat,weight)

## Higham's Method to find nearest PSD correlation matrix

In [None]:
class Higham_psd(Weighted_F_norm,chol_psd):
    '''
    Higham's Method to get nearest PSD matrix under the measure of Weighted Frobenius Norm
    Now Assume the Weighted Matrix is diagonal
    
    Parameters:
        not_psd -- the matrix which is not positive semi-definite matrix
        weight  -- used for calculating the Weighted Frobenius Norm (Assume it's diagonal)
        epsilon -- the acceptable precision between near_psd and non_psd
        max_iter -- maximum iteration number

    Usage:
        Higham_psd_model=Higham_psd(non_psd,weight)
        psd=Higham_psd_model.psd
    '''
    # initialization
    def __init__(self,not_psd,weight,epsilon=1e-9,max_iter=1e10):
        self.__not_psd=not_psd
        self.__weight=weight
        self.run(epsilon=epsilon,max_iter=max_iter)
        self.F_compare_norm(weight)

    def Projection_U(self,A):
        # Projection to the Space U
        # we assume that the weight matrix is diagonal
        A_copy=A.copy()
        np.fill_diagonal(A_copy,1)
        return A_copy
        
    def Projection_S(self,A):
        # Projection to the Space S
        w_sqrt=np.sqrt(self.__weight)
        eig_val,eig_vec=np.linalg.eigh(w_sqrt @ A @ w_sqrt)
        eig_val[eig_val<0]=0
        A_plus=eig_vec @ np.diag(eig_val) @ eig_vec.T
        w_sqrt_inv=np.diag(1/np.diag(w_sqrt))
        ans = w_sqrt_inv @ A_plus @ w_sqrt_inv
        return ans
    
    def run(self,epsilon,max_iter):
        # iterating process
        Y=self.__not_psd
        F1=np.inf
        F2=self.calculate_F(Y,self.__weight)
        delta=0
        iteration=0
        neg_eig=0
        while abs(F1-F2)>epsilon or neg_eig>0:
            R=Y-delta
            X=self.Projection_S(R)
            delta=X-R
            Y=self.Projection_U(X)
            F1,F2=F2,self.calculate_F(Y,self.__weight)
            iteration+=1
            if iteration>max_iter:
                break
            eig_val=np.linalg.eigvals(Y)
            neg_eig=len(eig_val[eig_val<0])

        self.__F_norm=F2
        self.__psd=Y

    def F_compare_norm(self,weight):
        self.__F = self.compare_F(self.__psd,self.__not_psd,weight)
        
    @property
    def psd(self):
        return self.__psd 
    @property
    def F_norm(self):
        return self.__F_norm 
    
    @property
    def F(self):
        return self.__F

In [None]:
mat = non_psd_mat(10) # np.array
weight=np.eye(10)
Higham_psd(mat,weight)

## Principal Component Analysis (PCA)

In [None]:
class PCA:
    """
    Reducing the dimensionality of the dataset.
    
    Parameter:
        cov_mat -- covarinance matrix for dimensionality reduction
        threshold(optional) -- the threshold of cumulative variance explained
    
    Usage:
        PCA_model=PCA(cov_mat,threshold)
        PCA_model.plot()
        PCA_model.pct_explain_val_vec()
    """
    # initialization
    def __init__(self,cov_mat,threshold=None):
        self.__cov_mat=cov_mat
        self.__threshold=threshold
        self.run()
    
    # Conduct the PCA
    def run(self):
        self.__eig_val,self.__eig_vec = np.linalg.eigh(self.__cov_mat)
        # pick the eigenvalues which is bigger than 0 and corresponding eigenvector   
        idx=self.__eig_val>1e-8
        self.__eig_val = self.__eig_val[idx]
        self.__eig_vec = self.__eig_vec [:,idx]       
        # sort since the result given by numpy is lowest to highest, flip them up
        sorted_indices = np.argsort(self.__eig_val)
        self.__eig_val=self.__eig_val[sorted_indices[::-1]]
        self.__eig_vec=self.__eig_vec[:,sorted_indices[::-1]]
        

    # calculate the cumulative percent of variance explained
    def percent_expalined(self,k=None):
        k = self.__eig_val.shape[0] if k==None else k
        k_eig_val=self.__eig_val[:k]

        return k_eig_val.cumsum()/k_eig_val.sum()

    # plot the cumulative percent of variance explained
    def plot(self,k=None,ax=None,label=None):
        explain=self.percent_expalined(k)
        sns.lineplot(explain,ax=ax,label="{:.2f}".format(label) if label!=None else "" )
        if ax!=None:
            ax.set_xlabel('Number of Component')
            ax.set_ylabel('Cumulative Variance Explained')
            ax.set_title("Cumulative Variance Explained via different lambda")
        ax.legend(loc='best')
    
    # Given the threshold, calculate the needed eigenvalues and corresponding eigenvectors
    def pct_explain_val_vec(self):
        eig_val=self.__eig_val
        pct_cum_var=eig_val.cumsum()/eig_val.sum()
        pct_cum_var[-1]=1
        # if cumulative percent of variance explained is larger than threshold, then break
        k = bisect.bisect_left(pct_cum_var, self.__threshold) 
        return self.__eig_val[:k+1],self.__eig_vec[:,:k+1]

In [None]:
# parse the .csv file into dataframe
df=pd.read_csv('DailyReturn.csv',index_col=0)
# transfrom dataframe to array to speed up matrix calculation
data=np.array(df)
# PCA -- Use PCA and plot the cumulative variance explained by each eigenvalue for λ ∈ (0, 1) each chosen.
PCA_model=PCA(cov_mat) 
PCA_model.plot(ax=ax[0],label=i) 
model.plot_weight(ax=ax[1],label=i) 

## Simulator for Direct Simulation & PCA_Simulation

In [None]:
class Simulator(Weighted_F_norm):
    """
    Simulator for DirectSimulation & PCA_Simulation

    Parameter: 
        cov_mat -- covariance matrix
        draw_num -- the number of sample draw from simulation

    Usage:
        simulator = Simulator(cov_mat,draw_num)
        simulator.DirectSimulation()
        simulator.PCA_Simulation(pct)
    """

    def __init__(self,cov_mat,draw_num):
        self.__cov_mat=cov_mat
        self.__draw_num=draw_num
        self.__Direct_run_time=None
        self.__PCA_run_time=None
        self.__X=None # generated sample

    def DirectSimulation(self):
        """Cholesky"""
        t = time.time()
        root=chol_psd(self.__cov_mat).root
        n=root.shape[0]
        rand_norm=np.random.normal(0, 1, size=(n, self.__draw_num))
        X= root @ rand_norm
        self.__Direct_run_time = time.time()-t
        self.__X=X #simulated sample
        return X 

    def PCA_Simulation(self,threshold):
        """PCA"""
        t = time.time()
        PCA_model=PCA(self.__cov_mat,threshold=threshold)
        eig_val,eig_vec=PCA_model.pct_explain_val_vec()
        B= eig_vec @ np.diag(np.sqrt(eig_val))
        n=B.shape[1]
        rand_norm=np.random.normal(0, 1, size=(n, self.__draw_num))
        X= B @ rand_norm
        self.__PCA_run_time = time.time()-t
        self.__X=X #simulated sample
        return X

    # get the F norm of difference between covariance of simulation and original covariance
    def err_F_norm(self):
        n=self.__cov_mat.shape[0]
        w=np.eye(n)
        return self.compare_F(self.__cov_mat,np.cov(self.__X),w)
     
    @property
    def Direct_run_time(self):
        return self.__Direct_run_time

    @property
    def PCA_run_time(self):
        return self.__PCA_run_time

In [None]:
simulator = Simulator(cov_mat,draw_num)
sample1=simulator.DirectSimulation()
sample2=simulator.PCA_Simulation(pct)

# Week4

## Calculate Return

In [None]:
def return_calculate(Price,option="DISCRETE",rm_means=True):
    '''
        Provide two ways to calculate the return from Price dataframce.
        The date should be accumulative(e.g. 10 Oct [1st row], 11 Oct [2nd row]....)
    '''
    # calculate the log normal return 
    if option == 'CONTINUOUS':
        returns = np.log(df/df.shift()).dropna()
    # calculate the discrete return 
    elif  option == 'DISCRETE':
        returns = df.pct_change().dropna()
    # other undefined option will cause error
    else:
        raise Exception("Unknown Option!")
    # remove mean from the returns
    return returns if rm_means==False else returns-returns.mean()

## Generalized T distribution with mean (MLE Fitter)

In [None]:
class T_fitter:
    '''T distribution MEL fitter'''
    def ll_t(self,parameter,x):
        # log likelihood 
        ll=np.sum(stats.t.logpdf(x=x,df=parameter[0],loc=0,scale=parameter[1])) # assume mean to be 0
        return -ll

    def MLE(self,x):
        cons=[ {'type':'ineq', 'fun':lambda x:x[1]} ] # standard deviation is non-negative
        parameter = np.array([x.size-1,2])
        MLE = minimize(self.ll_t, parameter, args = x, constraints = cons) # MLE
        return MLE.x

## Single Asset VaR under different distribution (Normal, T, AR(1), Historical) --  需要修改

In [None]:
class VaR:
    '''Calculate the VaR of 1D array or dataframe of return due to specific distribution'''
    def __init__(self,data,option="Absolute",alpha=0.05):
        # Absolute Value at risk or Relative Value at risk
        if option != "Absolute" and option != "Relative":
            raise Exception('Unknown option!')
        self.__option=option
        # 1-alpha is confidence level
        self.__alpha=alpha
        # returns data (DataFrame)
        self.__data=data

    def normal(self,option='Normal',plot=False):
        '''Assume returns follow normal distribution'''
        # assume mean to be 0
        mu=0
        # calculate the standard deviation
        if option == 'Normal': # use normal standard deviation
            std=self.__data.std() 
        elif option == 'EWMA': # use EW standard deviation
            model=project3.EWMA(META,0.94) # assume lambda = 0.94
            std=np.sqrt(model.cov_mat)
        else: 
            raise Exception('Unknown option!')
         # calculate the VaR
        if self.__option=="Absolute":
            VaR=-stats.norm.ppf(self.__alpha,loc=mu,scale=std)
        else:
            VaR=self.__data.mean()-stats.norm.ppf(self.__alpha,loc=mu,scale=std)

        if plot:
            # plot the normal Probability density function
            max_val=self.__data.max()
            min_val=self.__data.min()
            x=np.linspace(min_val,max_val,1000)
            y=stats.norm.pdf(x=x,loc=mu,scale=std)
            plt.plot(x,y,color='brown')
            # fill VaR area
            plt.fill_between(x,y,where=x<-VaR,color="red", alpha=0.3)
            # plot the return data & its empirical kde
            sns.histplot(self.__data,kde=True,stat='density')
            # plot the VaR
            plt.axvline(-VaR,color='#FF6347')
            if option == 'Normal':
                plt.title("Normal")
                plt.legend(['Normal','VaR','historical'])
            else:
                plt.title("Normal with EW standard deviation")
                plt.legend(['Normal','VaR','historical'])
        return VaR
        
    def T_dist(self,plot=False):
        para=T_fitter().MLE(self.__data)
        mu=0 # assume mean to be 0
        df=para[0] # degree of freedom of T distribution
        std=para[1] # standard deviation
        # calculate the VaR
        if self.__option=="Absolute":
            VaR=-stats.t.ppf(self.__alpha,df=df,loc=mu,scale=std)
        else:
            VaR=self.__data.mean()-stats.t.ppf(self.__alpha,df=df,loc=mu,scale=std)
        if plot:
            # plot the T Probability density function
            max_val=self.__data.max()
            min_val=self.__data.min()
            x=np.linspace(min_val,max_val,1000)
            y=stats.t.pdf(x=x,df=df,loc=mu,scale=std)
            plt.plot(x,y,color='brown')
            # fill VaR area
            plt.fill_between(x,y,where=x<-VaR,color="red", alpha=0.3)
            # plot the return data & its empirical kde
            sns.histplot(self.__data,kde=True,stat='density')
            # plot the VaR
            plt.axvline(-VaR,color='#FF6347')
            plt.title("MLE fitted T distribution")
            plt.legend(['T distribution','VaR','historical'])
        return VaR

    def AR_1(self,plot=False):
        # Use AR(1) fitter to find the best cofficience
        mod = sm.tsa.arima.ARIMA(self.__data, order=(1, 0, 0))
        res = mod.fit()
        const=0 # constant number
        ar_L1=res.params[1] # cofficience of Lag 1
        sigma2=res.params[2] # variance of error term

        # AR(1) is also normal
        mu=0
        std=np.sqrt(sigma2/(1-ar_L1))

        # calculate the VaR
        if self.__option=="Absolute":
            VaR=-stats.norm.ppf(self.__alpha,loc=mu,scale=std)
        else:
            VaR=self.__data.mean()-stats.norm.ppf(self.__alpha,loc=mu,scale=std)
        
        if plot:
            # plot the AR(1) Probability density function
            max_val=self.__data.max()
            min_val=self.__data.min()
            x=np.linspace(min_val,max_val,1000)
            y=stats.norm.pdf(x=x,loc=mu,scale=std)
            plt.plot(x,y,color='brown')
            # fill VaR area
            plt.fill_between(x,y,where=x<-VaR,color="red", alpha=0.3)
            # plot the return data & its empirical kde
            sns.histplot(self.__data,kde=True,stat='density')
            # plot the VaR
            plt.axvline(-VaR,color='#FF6347')
            plt.title("fitted AR(1)")
            plt.legend(['fitted AR(1)','VaR','historical'])
        return VaR
    
    def historical_simulation(self,alpha,plot=False):
        size=self.__data.shape[0]
        rt=self.__data.sample(n=size,replace=True)
        # calculate the VaR
        if self.__option=="Absolute":
            VaR=-np.quantile(rt,alpha)
        else:
            VaR=rt.mean()-np.quantile(rt,alpha)
        if plot:
            # plot the historical kde
            sns.kdeplot(rt,color='brown')

            # plot the return data & its empirical kde
            ax = sns.histplot(self.__data,kde=True,stat='density')
            
            # fill VaR area
            # Get the two lines from the axes to generate shading
            l = ax.lines[0]
            # Get the xy data from the lines so that we can shade
            x = l.get_xydata()[:,0]
            y = l.get_xydata()[:,1]
            ax.fill_between(x,y,where=x<-VaR,color="red", alpha=0.3)

            # plot the VaR
            plt.axvline(-VaR,color='#FF6347')
            plt.title("historical simulation")
            plt.legend(['historical simulation','VaR','historical'])
        return VaR

In [None]:
# parse the DailyPrices data into dataframe
df=pd.read_csv('DailyPrices.csv',index_col='Date')
# calculate the returns
rt=return_calculate(df)
META=rt['META']

# Using a normal distribution
VaR_normal=VaR(META).normal(plot=True)
print("VaR (Normal Distribution): {}".format(VaR_normal))
# Using a normal distribution with an Exponentially Weighted variance 
VaR_normal_EWMA=VaR(META).normal('EWMA',plot=True)
print("VaR (Normal Distribution + EWMA): {}".format(VaR_normal_EWMA))
# Using a MLE fitted T distribution
VaR_T=VaR(META).T_dist(plot=True)
print("VaR (MLE Fitted T Distribution): {}".format(VaR_T))
# Using a fitted AR(1) model.
VaR_AR_1=VaR(META).AR_1(plot=True)
print("VaR (AR(1) Process): {}".format(VaR_AR_1))
# Using a Historic Simulation.
VaR_normal_historical=VaR(META).historical_simulation(0.05,plot=True)
print("VaR (Historical sampling): {}".format(VaR_normal_historical))

## Portfolio Var (Delta Normal, Normal MC, Historical Simulation) -- 也要改

In [None]:
class VaR_portfolio:
    '''Differnet Method to get the VaR of stock portfolio'''
    # initialization
    def __init__(self,portfolio,returns,price):
        '''The fromat of data should be same as the file in the current directory.'''
        # information about portfolio
        self.__portfolio=portfolio
        # returns information about companies
        self.__returns=returns
        # price about stock
        self.__price=price

    def delta_normal(self,alpha=0.05,option='EWMA'):
        ''' Delta Normal method to calculate the VaR of the Stock portfolio
            
            Parmeter:
                option='EWMA' or 'std'
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio
        current_position = holding * current_price # current position of each stock of the portfilio
        PV = current_position.sum() # portfolio value
        delta=current_position/PV # calculate the delta
        stocks = self.__portfolio.Stock.values
        rt=self.__returns[stocks]

        if option=='EWMA':
            # Calculate EWMA covariance
            model=project3.EWMA(rt.values,0.94) # assume lambda = 0.94
            cov_mat=model.cov_mat
        elif option=='std':
            # Calculate standard deviation
            cov_mat=rt.cov()

        # calculate the std of portfolio
        std = np.sqrt(delta @ cov_mat @ delta)
        # get the VaR of the portfolio
        VaR_p = PV*(-stats.norm.ppf(alpha))*std
        return VaR_p

    def normal_MC(self,alpha=0.05,option='EWMA',method='PCA',draw_num=100000,pct=1,plot=False,VaROption='Absolute',p_name=''):
        ''' Use Monte Carlo Methods to simulate the price of each stock of portfolio
            then calculate the value of the portfolio
            
            Parmeter:
                option: 'EWMA' or 'std'
                method: 'PCA' or 'Cholesky'
                VaROption: 'Absolute' VaR or 'Relative' VaR
                draw_num: number of path simulated
                pct: the percentage of variance expained by PCA
                plot: plot the simulated path or not
                alpha: 1-alpha is confidence level
                p_name: portfolio name
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio

        # get the covariance
        if option=='EWMA':
            # Calculate EWMA covariance
            model=project3.EWMA(rt.values,0.94) # assume lambda = 0.94
            cov_mat=model.cov_mat
        elif option=='std':
            # Calculate standard deviation
            cov_mat=rt.cov()
        else:
            raise Exception("Unknown option!")
        
        # MC to get the simulated return
        simulator = project3.Simulator(cov_mat,draw_num)
        if method=='PCA':
            simulated_rt=simulator.PCA_Simulation(pct)
        elif method=='Cholesky':
            simulated_rt=simulator.DirectSimulation()
        else:
            raise Exception("Unknown method!")
        # simulated price
        simulate_price = np.expand_dims(current_price,1).repeat(draw_num,axis=1) * simulated_rt
        # simulated position
        simulate_position=np.expand_dims(holding,1).repeat(draw_num,axis=1) * simulate_price
        # simulated portfolio value
        simulate_PV=pd.DataFrame(simulate_position).sum()
        # sort
        simulate_PV=pd.DataFrame(simulate_PV.sort_values(ascending=True))
    
        if VaROption=="Absolute":
            VaR_p=-np.quantile(simulate_PV,alpha)
        elif VaROption=='Relative':
            VaR_p=simulate_PV.mean()-np.quantile(simulate_PV,alpha)
        else:
            raise Exception("Unknown VaROption!")
        
        
        if plot:
            # add current Portfolio value
            simulate_PV[1]=simulate_PV
            simulate_PV[0]=0
            plot_data=simulate_PV.T
            # plot
            fig, ax = plt.subplots(1,2,figsize=(14,6))
            plot_data.plot(ax=ax[0],legend=False,xlabel='Time',ylabel='Price',title="Mote Carlo Simulation({} path) for portfolio {}".format(draw_num,p_name))
            sns.histplot(data=simulate_PV[1],kde=True,stat="density",ax=ax[1])

            # fill VaR area
            # Get the two lines from the axes to generate shading
            l = ax[1].lines[0]
            # Get the xy data from the lines so that we can shade
            x = l.get_xydata()[:,0]
            y = l.get_xydata()[:,1]
            ax[1].fill_between(x,y,where=x<-VaR_p,color="red", alpha=0.3)
            
            # plot the VaR
            plt.axvline(-VaR_p,color='#FF6347')
            plt.title("Monte Carlo Simulated VaR($) of Portfolio {}".format(p_name))
            plt.legend(['MC simulation kde','VaR'])
        return VaR_p

    def historical_simulation(self,alpha=0.05,draw_num=10000,plot=False,p_name='',VaROption='Absolute'):
        ''' Use historical returns as dataset, draw sample from it to simulate the 
            potential loss (VaR)

            Parameter:
                draw_num: number of path simulated
                p_name: portfolio name
                VaROption: 'Absolute' VaR or 'Relative' VaR
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio

        # sampling from the historical returns
        size=draw_num
        historical_rt=rt.sample(n=size,replace=True)

        # simulated price
        simulate_price = np.expand_dims(current_price,1).repeat(draw_num,axis=1) * (historical_rt.T)
        # simulated position
        simulate_position=np.expand_dims(holding,1).repeat(draw_num,axis=1) * simulate_price
        # simulated portfolio value
        simulate_PV=pd.DataFrame(simulate_position).sum()
        # sort
        simulate_PV=pd.DataFrame(simulate_PV.sort_values(ascending=True))
        
        if VaROption=="Absolute":
            VaR_p=-np.quantile(simulate_PV,alpha)
        elif VaROption=='Relative':
            VaR_p=simulate_PV.mean()-np.quantile(simulate_PV,alpha)
        else:
            raise Exception("Unknown VaROption!")

        if plot:
            # add a column of current Portfolio value
            simulate_PV[1]=simulate_PV
            simulate_PV[0]=0
            plot_data=simulate_PV.T
            # plot
            fig, ax = plt.subplots(1,2,figsize=(14,6))
            plot_data.plot(ax=ax[0],legend=False,xlabel='Time',ylabel='Price',title="Historical Simulation({} path) for portfolio {}".format(draw_num,p_name))
            sns.histplot(data=simulate_PV[1],kde=True,stat="density",ax=ax[1])

            # fill VaR area
            # Get the two lines from the axes to generate shading
            l = ax[1].lines[0]
            # Get the xy data from the lines so that we can shade
            x = l.get_xydata()[:,0]
            y = l.get_xydata()[:,1]
            ax[1].fill_between(x,y,where=x<-VaR_p,color="red", alpha=0.3)

            # plot the VaR
            plt.axvline( x=-VaR_p ,color='#FF6347')
            plt.title("Historical Simulated VaR($) of Portfolio {}".format(p_name))
            plt.legend(['Historical simulation kde','VaR'])

        return VaR_p

In [None]:
# parse the DailyPrices data into dataframe
price=pd.read_csv('DailyPrices.csv',index_col='Date')
# calculate the returns
rt=return_calculate(price)
# information about portfolio
portfolio=pd.read_csv('portfolio.csv',index_col='Portfolio')

# Delta Normal + EWMA
A=VaR_portfolio(portfolio.loc['A'],rt,price).delta_normal(0.05)
print("VaR of portfolio A is {}".format(A))
B=VaR_portfolio(portfolio.loc['B'],rt,price).delta_normal(0.05)
print("VaR of portfolio B is {}".format(B))
C=VaR_portfolio(portfolio.loc['C'],rt,price).delta_normal(0.05)
print("VaR of portfolio C is {}".format(C))
All=VaR_portfolio(portfolio,rt,price).delta_normal(0.05)
print("VaR of the whole portfolio is {}".format(All))

# Normal Monte Carlo Simulation 
A=VaR_portfolio(portfolio.loc['A'],rt,price).normal_MC(plot=True,p_name='A')
print("VaR of portfolio A is {}".format(A))
B=VaR_portfolio(portfolio.loc['B'],rt,price).normal_MC(plot=True,p_name='B')
print("VaR of portfolio B is {}".format(B))
C=VaR_portfolio(portfolio.loc['C'],rt,price).normal_MC(plot=True,p_name='C')
print("VaR of portfolio C is {}".format(C))
All=VaR_portfolio(portfolio,rt,price).normal_MC(plot=True,p_name='All')
print("VaR of the whole portfolio is {}".format(All))

# Historical Simulation 
A=VaR_portfolio(portfolio.loc['A'],rt,price).historical_simulation(plot=True,p_name='A')
print("VaR of portfolio A is {}".format(A))
B=VaR_portfolio(portfolio.loc['B'],rt,price).historical_simulation(plot=True,p_name='B')
print("VaR of portfolio B is {}".format(B))
C=VaR_portfolio(portfolio.loc['C'],rt,price).historical_simulation(plot=True,p_name='C')
print("VaR of portfolio C is {}".format(C))
All=VaR_portfolio(portfolio,rt,price).historical_simulation(plot=True,p_name='All')
print("VaR of the whole portfolio is {}".format(All))

# Week 5

## A Generalized Fitted Model (Contain Normal, T or other distribution you want)

In [None]:
class FittedModel:
    '''The prototype of fitted distribution.'''
    def __init__(self):
        self.dist=self.set_dist() # the distribution
        self.frz_dist=None # the distribution which has specific parameters

    def set_dist(self):
        '''Need to be implemented in subclass to set the dist.'''
        raise NotImplementedError
    
    def freeze_dist(self,parameters):
        '''Need to be implemented in subclass to set the parameters of different distribution.'''
        raise NotImplementedError

    def fit(self,data,x0,cons):
        '''
        the data is np.array
        Use MLE to fit the distribution
        x0 is initial paremeters which needed to be implemented in subclass
        cons is constraints of parameters which needed to be implemented in subclass
        '''
        def nll(parameters,x):
            '''Negative likelihood function'''
            self.freeze_dist(parameters)
            ll=self.frz_dist.logpdf(x=x).sum()
            return -ll
        MLE = minimize(nll, x0=x0, args=data, constraints = cons) # MLE 
        self.freeze_dist(MLE.x)
        self.fitted_parameters=MLE.x

    @property
    def fitted_dist(self):
        '''Return Fitted Distribution'''
        return self.frz_dist

class Norm(FittedModel):
    def set_dist(self):
        '''set the distribution to be normal'''
        return stats.norm
        
    def freeze_dist(self,parameters):
        '''set the parameters of norm: parameters[0]--mu, parameters[1]--std'''
        self.frz_dist=self.dist(loc=parameters[0],scale=parameters[1])

    def fit(self,data):
        '''set the initial parameters and cons to call the father's fit'''
        x0 = (data.mean(),data.std())  # initial paremeters
        cons = [ {'type':'ineq', 'fun':lambda x:x[1]} ] # standard deviation is non-negative
        super().fit(data,x0,cons)

class T(FittedModel):
    def set_dist(self):
        '''set the distribution to be normal'''
        return stats.t
        
    def freeze_dist(self,parameters):
        '''set the parameters of norm: parameters[0]--degree of freedom, parameters[1]--mu, parameters[2]--std'''
        self.frz_dist=self.dist(df=parameters[0],loc=parameters[1],scale=parameters[2])
        
    def fit(self,data):
        '''set the initial parameters and cons to call the father's fit'''
        # degree of freedom of t should be greater than 2; standard deviation is non-negative
        cons=[ {'type':'ineq', 'fun':lambda x:x[0]-2} , {'type':'ineq', 'fun':lambda x:x[2]} ] 
        x0 = np.array([2,0,1]) # initial parameter
        super().fit(data,x0,cons)

In [None]:
data=pd.read_csv('problem1.csv') # read the data
data=np.array(data.values).reshape(data.size) # transform to array

# Fit the normal distribution
norm=Norm()
norm.fit(data)
fitted_norm=norm.fitted_dist
# Fit the T distribution
t=T()
t.fit(data)
fitted_t=t.fitted_dist

## Risk Metrics (1. Var & ES using distribution 2. Var & ES using historical data)

In [None]:
class RiskMetrics:
    @staticmethod
    def VaR_dist(dist,alpha=0.05):
        '''Given a distribution and alpha, calculate the corresponding VaR'''
        return -dist.ppf(alpha)

    @staticmethod
    def ES_dist(dist,alpha=0.05):
        '''Given a distribution and alpha, calculate the corresponding Expected Shortfall'''
        lb=-np.inf   
        ub=dist.ppf(alpha)
        return -dist.expect(lb=lb,ub=ub)/alpha # integral
    
    @staticmethod
    def VaR_historical(data,alpha=0.05):
        '''Given a dataset(array), calculate the its historical VaR'''
        data.sort()
        n=round(data.size*alpha)
        return -data[n-1]
    
    @staticmethod
    def ES_historical(data,alpha=0.05):
        '''Given a dataset(array), calculate the its historical Expected Shortfall'''
        data.sort()
        n=round(data.size*alpha)
        return -data[:n].mean()

In [None]:
# Calculate the VaR & ES using distribution
VaR_normal=RiskMetrics.VaR_dist(fitted_norm)
ES_normal=RiskMetrics.ES_dist(fitted_norm)
VaR_t=RiskMetrics.VaR_dist(fitted_t)
ES_t=RiskMetrics.ES_dist(fitted_t)

## Model Fitter (Fit the dataframe with Model, return a 1-D dataframe of fitted distributions)

In [None]:
class ModelFitter:
    ''' Fit the data with Model, return a 1D Dataframe of fitted distributions

        Parameters:
            FittedModel(Class) ---- a subclass of FittedModel class

        Usage:
            dists=ModelFitter(FittedModel).fit(data)
    '''

    def __init__(self,FittedModel):
        ''' Initialize the model within the class to fit all the data.'''
        self.model=FittedModel()
    
    def fit(self,data):
        '''Fit all the data with the model inside the Fitter
            Data(Dataframe) -- return of stock
        '''
        dists=[]
        for name in data.columns:
            rt=np.array(data[name].values)
            self.model.fit(rt)
            dists.append(self.model.fitted_dist)
        dists=pd.DataFrame(dists).T
        dists.columns=data.columns
        dists.index=["distribution"]
        return dists

In [None]:
stocks = portfolio.Stock.values # stocks of portfilio
rt=returns[stocks] # return of each stock of the portfilio
portfolio=portfolio.set_index('Stock') # set the portfolio index to be Stock
holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
current_price=price[stocks].iloc[-1,:] # get current price of each stock of the portfilio

# Fit the data with Model, return a 1D Dataframe of fitted distributions
dists=ModelFitter(dist).fit(rt)

## Gaussian Copula (Simulate whatever distributions)

In [None]:
class GaussianCopula:
    ''' Construct the Gaussian Copula to simulate
    
        Parameters:
            dists(DataFrame) --- a group of distributions (1D Dataframe of fitted distributions)
            data(DataFrame) --- the data that fits the distributions will be used to generate the simulated sample

        Usage:
            copula=GaussianCopula(dists,data)
            sample=copula.simulate()
    '''
    def __init__(self,dists,data):
        self.models=dists
        self.data=data
    
    def simulate(self,NSim=5000):
        transform_data=pd.DataFrame()
        for name in self.data.columns:
            rt=np.array(self.data[name])
            # Use the CDF to transform the data to uniform universe
            # Use the standard normal quantile function to transform the uniform to normal 
            transform_data[name]=stats.norm.ppf(self.models[name][0].cdf(rt))
        # Spearman correlation
        corr_spearman = stats.spearmanr(transform_data,axis=0)[0]
        # Use PCA simulation
        simulator = Simulator(corr_spearman,NSim)
        # Simulate Normal & Transform to uniform
        SimU=stats.norm.cdf(simulator.PCA_Simulation(1),loc=0,scale=1)
        # Transform to Model Distribution
        simulatedResults = pd.DataFrame()
        for idx,name in enumerate(self.data.columns):
            simulatedResults[name] = self.models[name][0].ppf(SimU[idx,:])
        return simulatedResults.T

In [None]:
# Fit the data with Model to get a group of distributions
dists=ModelFitter(dist).fit(rt)
# Construct Copula
copula=GaussianCopula(dists,rt)
# Simulate
simulated_rt=copula.simulate(NSim=draw_num)

## VaR for Portfolio (升级版)

In [None]:
class VaR_portfolio:
    '''
    Differnet Method to get the VaR of stock portfolio
    
        1. delta_normal
        2. normal_MC
        3. historical_simulation

    The fromat of portfolio, returns, price should be same as the file in the current directory.

    Usage:
        All=VaR_portfolio(portfolio,rt,price).delta_normal(0.05)
        All=VaR_portfolio(portfolio,rt,price).normal_MC(plot=True,p_name='All')
        All=VaR_portfolio(portfolio,rt,price).historical_simulation(plot=True,p_name='All')
    '''
    # initialization
    def __init__(self,portfolio,returns,price):
        '''The fromat of data should be same as the file in the current directory.'''
        # information about portfolio
        self.__portfolio=portfolio
        # returns information about companies
        self.__returns=returns
        # price about stock
        self.__price=price

    def delta_normal(self,alpha=0.05,option='EWMA'):
        ''' Delta Normal method to calculate the VaR of the Stock portfolio
            
            Parmeter:
                option='EWMA' or 'std'
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio
        current_position = holding * current_price # current position of each stock of the portfilio
        PV = current_position.sum() # portfolio value
        delta=current_position/PV # calculate the delta
        stocks = self.__portfolio.Stock.values
        rt=self.__returns[stocks]

        if option=='EWMA':
            # Calculate EWMA covariance
            model=EWMA(rt.values,0.94) # assume lambda = 0.94
            cov_mat=model.cov_mat
        elif option=='std':
            # Calculate standard deviation
            cov_mat=rt.cov()

        # calculate the std of portfolio
        std = np.sqrt(delta @ cov_mat @ delta)
        # get the VaR of the portfolio
        VaR_p = PV*(-stats.norm.ppf(alpha))*std
        return VaR_p

    def normal_MC(self,alpha=0.05,option='EWMA',method='PCA',draw_num=100000,pct=1,plot=False,VaROption='Absolute',p_name=''):
        ''' Use Monte Carlo Methods to simulate the price of each stock of portfolio
            then calculate the value of the portfolio
            
            Parmeter:
                option: 'EWMA' or 'std'
                method: 'PCA' or 'Cholesky'
                VaROption: 'Absolute' VaR or 'Relative' VaR
                draw_num: number of path simulated
                pct: the percentage of variance expained by PCA
                plot: plot the simulated path or not
                alpha: 1-alpha is confidence level
                p_name: portfolio name
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio

        # get the covariance
        if option=='EWMA':
            # Calculate EWMA covariance
            model=EWMA(rt.values,0.94) # assume lambda = 0.94
            cov_mat=model.cov_mat
        elif option=='std':
            # Calculate standard deviation
            cov_mat=rt.cov()
        else:
            raise Exception("Unknown option!")
        
        # MC to get the simulated return
        simulator = Simulator(cov_mat,draw_num)
        if method=='PCA':
            simulated_rt=simulator.PCA_Simulation(pct)
        elif method=='Cholesky':
            simulated_rt=simulator.DirectSimulation()
        else:
            raise Exception("Unknown method!")
        # simulated price
        simulate_price = np.expand_dims(current_price,1).repeat(draw_num,axis=1) * simulated_rt
        # simulated position
        simulate_position=np.expand_dims(holding,1).repeat(draw_num,axis=1) * simulate_price
        # simulated portfolio value
        simulate_PV=pd.DataFrame(simulate_position).sum()
        # sort
        simulate_PV=pd.DataFrame(simulate_PV.sort_values(ascending=True))
        self.simulate_PV=simulate_PV

        if VaROption=="Absolute":
            VaR_p=-np.quantile(simulate_PV,alpha)
        elif VaROption=='Relative':
            VaR_p=simulate_PV.mean()-np.quantile(simulate_PV,alpha)
        else:
            raise Exception("Unknown VaROption!")
        
        
        if plot:
            # add current Portfolio value
            simulate_PV[1]=simulate_PV
            simulate_PV[0]=0
            plot_data=simulate_PV.T
            # plot
            fig, ax = plt.subplots(1,2,figsize=(14,6))
            plot_data.plot(ax=ax[0],legend=False,xlabel='Time',ylabel='Price',title="Mote Carlo Simulation({} path) for portfolio {}".format(draw_num,p_name))
            sns.histplot(data=simulate_PV[1],kde=True,stat="density",ax=ax[1])

            # fill VaR area
            # Get the two lines from the axes to generate shading
            l = ax[1].lines[0]
            # Get the xy data from the lines so that we can shade
            x = l.get_xydata()[:,0]
            y = l.get_xydata()[:,1]
            ax[1].fill_between(x,y,where=x<-VaR_p,color="red", alpha=0.3)
            
            # plot the VaR
            plt.axvline(-VaR_p,color='#FF6347')
            plt.title("Monte Carlo Simulated VaR($) of Portfolio {}".format(p_name))
            plt.legend(['MC simulation kde','VaR'])
        return VaR_p

    def historical_simulation(self,alpha=0.05,draw_num=10000,plot=False,p_name='',VaROption='Absolute'):
        ''' Use historical returns as dataset, draw sample from it to simulate the 
            potential loss (VaR)

            Parameter:
                draw_num: number of path simulated
                p_name: portfolio name
                VaROption: 'Absolute' VaR or 'Relative' VaR
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio

        # sampling from the historical returns
        size=draw_num
        historical_rt=rt.sample(n=size,replace=True)

        # simulated price
        simulate_price = np.expand_dims(current_price,1).repeat(draw_num,axis=1) * (historical_rt.T)
        # simulated position
        simulate_position=np.expand_dims(holding,1).repeat(draw_num,axis=1) * simulate_price
        # simulated portfolio value
        simulate_PV=pd.DataFrame(simulate_position).sum()
        # sort
        simulate_PV=pd.DataFrame(simulate_PV.sort_values(ascending=True))
        self.simulate_PV=simulate_PV

        if VaROption=="Absolute":
            VaR_p=-np.quantile(simulate_PV,alpha)
        elif VaROption=='Relative':
            VaR_p=simulate_PV.mean()-np.quantile(simulate_PV,alpha)
        else:
            raise Exception("Unknown VaROption!")

        if plot:
            # add a column of current Portfolio value
            simulate_PV[1]=simulate_PV
            simulate_PV[0]=0
            plot_data=simulate_PV.T
            # plot
            fig, ax = plt.subplots(1,2,figsize=(14,6))
            plot_data.plot(ax=ax[0],legend=False,xlabel='Time',ylabel='Price',title="Historical Simulation({} path) for portfolio {}".format(draw_num,p_name))
            sns.histplot(data=simulate_PV[1],kde=True,stat="density",ax=ax[1])

            # fill VaR area
            # Get the two lines from the axes to generate shading
            l = ax[1].lines[0]
            # Get the xy data from the lines so that we can shade
            x = l.get_xydata()[:,0]
            y = l.get_xydata()[:,1]
            ax[1].fill_between(x,y,where=x<-VaR_p,color="red", alpha=0.3)

            # plot the VaR
            plt.axvline( x=-VaR_p ,color='#FF6347')
            plt.title("Historical Simulated VaR($) of Portfolio {}".format(p_name))
            plt.legend(['Historical simulation kde','VaR'])

        return VaR_p

    def Copula_MC(self,dist,alpha=0.05,draw_num=10000,pct=1,plot=False,VaROption='Absolute',p_name=''):
        ''' Use Copula Monte Carlo Methods(PCA) to simulate the price of each stock of portfolio
            then calculate the value of the portfolio
            
            Parmeter:
                VaROption: 'Absolute' VaR or 'Relative' VaR
                draw_num: number of path simulated
                plot: plot the simulated path or not
                alpha: 1-alpha is confidence level
                p_name: portfolio name
        '''
        stocks = self.__portfolio.Stock.values # stocks of portfilio
        rt=self.__returns[stocks] # return of each stock of the portfilio
        portfolio=self.__portfolio.set_index('Stock') # set the portfolio index to be Stock
        holding=portfolio.loc[stocks].Holding # get holding of each stock of the portfilio
        current_price=self.__price[stocks].iloc[-1,:] # get current price of each stock of the portfilio

        # Fit the data with Model to get a group of distributions
        dists=ModelFitter(dist).fit(rt)
        # Construct Copula
        copula=GaussianCopula(dists,rt)
        # Simulate
        simulated_rt=copula.simulate(NSim=draw_num)
        # simulated price
        simulate_price = np.expand_dims(current_price,1).repeat(draw_num,axis=1) * simulated_rt
        # simulated position
        simulate_position=np.expand_dims(holding,1).repeat(draw_num,axis=1) * simulate_price
        # simulated portfolio value
        simulate_PV=pd.DataFrame(simulate_position).sum()
        # sort
        simulate_PV=pd.DataFrame(simulate_PV.sort_values(ascending=True))
        self.simulate_PV=simulate_PV

        if VaROption=="Absolute":
            VaR_p=-np.quantile(simulate_PV,alpha)
        elif VaROption=='Relative':
            VaR_p=simulate_PV.mean()-np.quantile(simulate_PV,alpha)
        else:
            raise Exception("Unknown VaROption!")
        
        
        if plot:
            # add current Portfolio value
            simulate_PV[1]=simulate_PV
            simulate_PV[0]=0
            plot_data=simulate_PV.T
            # plot
            fig, ax = plt.subplots(1,2,figsize=(14,6))
            plot_data.plot(ax=ax[0],legend=False,xlabel='Time',ylabel='Price',title="Copula Mote Carlo Simulation({} path) for portfolio {}".format(draw_num,p_name))
            sns.histplot(data=simulate_PV[1],kde=True,stat="density",ax=ax[1])

            # fill VaR area
            # Get the two lines from the axes to generate shading
            l = ax[1].lines[0]
            # Get the xy data from the lines so that we can shade
            x = l.get_xydata()[:,0]
            y = l.get_xydata()[:,1]
            ax[1].fill_between(x,y,where=x<-VaR_p,color="red", alpha=0.3)
            
            # plot the VaR
            plt.axvline(-VaR_p,color='#FF6347',linestyle='--')
            plt.title("Copula Monte Carlo Simulated VaR($) of Portfolio {}".format(p_name))
            plt.legend(['Copula MC simulation kde','VaR'])
        
        return VaR_p

In [None]:
AllPortfolio=VaR_portfolio(portfolio,rt,price)
VaR_All=AllPortfolio.Copula_MC(T_mean0,plot=True,p_name='All')
ES_All=RiskMetrics.ES_historical(np.array(AllPortfolio.simulate_PV[1]))

# Week 6

## Option valuation via integral -- Generalized Black Scholes Merton for European Option

In [None]:
def integral_gbsm(s,strike,t,vol,rf,c,tradingDayYear,call=True):
    ''' Option valuation via integral -- Generalized Black Scholes Merton
        European Style.  Assumed LogNormal Prices
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        call - Call valuation if set True
    '''

    ttm = t/tradingDayYear # time to maturity
    dailyVol = vol/np.sqrt(tradingDayYear) # daily volatility
    sigma = dailyVol*np.sqrt(t)
    mu = np.log(s)+ttm*c-sigma**2/2 

    dist = lognorm(sigma,0,np.exp(mu))
    f = lambda x: 2*dist.pdf(x)

    if call:
        f = lambda x: max(x-strike,0) * dist.pdf(x)
        value = np.exp(-ttm*rf)*integrate.quad(f,0,strike*100)[0]
    else:
        f = lambda x: max(strike-x,0)*dist.pdf(x)
        value = np.exp(-ttm*rf)*integrate.quad(f,0,strike*100)[0]
    return value

    '''
    # lognormal of numpy is a liitle bit strange. 

        # In mathematical notation if X is N(mu,sigma) then Y=exp(X) is LogN(mu, sigma). To get X in scipy I would use norm(mu,sigma) but to get Y I would use lognorm(sigma, 0, exp(mu))

        # X = exp(mu + sigma * Z), Z is standard normal
        # which is the same as: X = exp(mu) * exp(Z)**sigma      
        # This can be sneakily re-written as follows: X = exp(mu) * exp(Z-Z0)**sigma where Z0 = 0
        # This equation is of the form: f(x) = a * ( (x-x0) ** b ) = scale * ( (x-location) ** shape )
    '''
    '''
    Problem: 
        we could not directly use np.inf as upper bound because the quad would generate wrong result.
        
        Solutions and explanations:
        For quad, be aware that pulse shapes and other sharp features as compared to the size of the integration interval may not be integrated correctly using this method. 
        For example, the function is using only two (last=2) intervals. You can add points to the interval to force the routine to use points closer to the left limit. Note that points is a sequence of break points in the bounded integration interval where local difficulties of the integrand may occur (e.g., singularities, discontinuities). In this case, I'm not using it to point out discontinuities, but rather to force quad to perform more integration steps near the left boundary.
        quad use heuristic algorithms, using adaptative step of integration to reduce time computing. Where the function is flat, it goes faster. so on big global interval, it can miss the peak.
        This happens because the adaptive quadrature routine implemented in quad, while working as designed, does not notice the small, important part of the function within such a large, finite interval. For best results, consider using integration limits that tightly surround the important part of the integrand.
        Integrands with several important regions can be broken into pieces as necessary.
    '''

## Generalize Black Scholes Merton Closed Formula for European Option

In [None]:
def gbsm(s,strike,t,vol,rf,c,tradingDayYear,call=True):
    ''' Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        Option valuation via BSM closed formula
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        call - Call valuation if set True
    '''
    ttm=t/tradingDayYear
    d1=(np.log(s/strike)+(c+vol**2/2)*ttm)/vol/np.sqrt(ttm)
    d2=d1-vol*np.sqrt(ttm)
    if call:
        return s*np.exp((c-rf)*ttm)*norm.cdf(d1)-strike*np.exp(-rf*ttm)*norm.cdf(d2)
    else:
        return strike*np.exp(-rf*ttm)*norm.cdf(-d2)-s*np.exp((c-rf)*ttm)*norm.cdf(-d1)

## European Option valuation via Binomial Trees (No dividends)

In [None]:
def binomial_tree_gbsm(s,strike,t,vol,rf,c,tradingDayYear,N,call=True):
    '''
    Generalize Black Scholes Merton
    rf = c       -- Black Scholes 1973
    c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
    c = 0        -- Black 1976 futures option model
    c,r = 0      -- Asay 1982 margined futures option model
    c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

    Option valuation via Binomial Trees
    European Style.  Assumed LogNormal Prices
    s - Underlying Price
    strike - Strike Price
    t - days until maturity
    rf - Risk free rate
    vol - Yearly Volatility
    c - Cost of Carry
    tradingDayYear - trading days in a year
    N - Steps of Binomial Tree
    call - Call valuation if set True
    '''
    ttm=t/tradingDayYear
    delta_t=ttm/N
    # price multiplier in the (positive, negtive) case
    u=np.exp(vol*np.sqrt(delta_t)) 
    d=np.exp(-vol*np.sqrt(delta_t))
    # probability of up/down move in price
    pu=(np.exp(c*delta_t)-d)/(u-d)
    pd=1-pu
    S=[] # stock price
    P=[] # probability of each case
    nPaths=[] # number of paths of each case
    for i in range(N+1):
        S.append(s*(u**i)*(d**(N-i)))
        P.append((pu**i)*(pd**(N-i)))
        nPaths.append(math.factorial(N)/math.factorial(i)/math.factorial(N-i))
    # turn list into array
    S=np.array(S)
    P=np.array(P)
    nPaths=np.array(nPaths)
    # function to convert stock price to value of option
    f=lambda x,y: x-y if x-y>0 else 0
    vfunc = np.vectorize(f,otypes=[float])
    if call:
        Pi=vfunc(S,strike)
    else:
        Pi=vfunc(strike,S)
    return np.exp(-rf*ttm)*(Pi*P)@nPaths

## European Option valuation via Monte Simulation

In [None]:
def sim_gbsm(s,strike,t,vol,rf,c,tradingDayYear,N,call=True):
    ''' Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        Option valuation via Monte Simulation
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        N - Steps of Binomial Tree
        call - Call valuation if set True
    '''
    ttm = t/tradingDayYear # time to maturity
    dailyVol = vol/np.sqrt(tradingDayYear) # daily volatility
    dist=norm(c/tradingDayYear-dailyVol**2/2,dailyVol)
    Price=[]
    for i in range(N):
      rts=dist.rvs(t)
      total_rt=np.exp(sum(rts))
      Price.append(s*total_rt)
    f=lambda x,y: x-y if x-y>0 else 0
    vfunc = np.vectorize(f,otypes=[float])
    if call:
      Pi=vfunc(Price,strike)
    else:
      Pi=vfunc(strike,Price)
    return np.exp(-rf*ttm)*Pi.mean()

In [None]:
s=165
CurrentDate=datetime.datetime.strptime('03/03/2023','%m/%d/%Y')
ExpirationDate=datetime.datetime.strptime('03/17/2023','%m/%d/%Y')
dt=ExpirationDate-CurrentDate
t=dt.days
tradingDayYear=365
rf=0.0425
q=0.0053
c=rf-q
ttm=t/tradingDayYear
N=100 # number of points in range of volatility
strike_call=strike_put=160

gbsm(s,strike_call,t,vol,rf,c,tradingDayYear,call=True)

## Implied Volatility for Options

In [None]:
def findImpliedVol(data):
    ''' Find implied volatility
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        N - Steps of Binomial Tree
        call - Call valuation if set True

        Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency
    '''
    s=151.03
    strike=data['Strike']
    CurrentDate=datetime.datetime.strptime('03/03/2023','%m/%d/%Y')
    ExpirationDate=data['Expiration']
    ExpirationDate=datetime.datetime.strptime(ExpirationDate,'%m/%d/%Y')
    dt=ExpirationDate-CurrentDate
    t=dt.days
    tradingDayYear=365
    P=data['Last Price']
    call=True if data['Type']=='Call' else False
    rf=0.0425
    q=0.0053
    c=rf-q
    ttm=t/tradingDayYear
    def f(vol):
        return gbsm(s,strike,t,vol,rf,c,tradingDayYear,call)-P  
    iVol = optimize.root_scalar(f, bracket=[1e-6, 10], method='brentq')
    return iVol.root

In [2]:
df=pd.read_csv('AAPL_Options.csv')
currentPrice=151.03
iVol=df.apply(findImpliedVol,axis=1)
df['Implied Volatility']=iVol


df.sort_values('Strike',inplace=True)
ax=df.query("Type == 'Call'").plot(x='Strike',y='Implied Volatility',kind='line',xlabel='Strike',ylabel='Implied Volatility',label='Call')
df.query("Type == 'Put'").plot(x='Strike',y='Implied Volatility',kind='line',xlabel='Strike',ylabel='Implied Volatility',ax=ax,label='Put')

NameError: name 'pd' is not defined

## Implied Volatility ( If the type is not option, return np.nan)

In [None]:
def findImpliedVol_plus(data):
    '''Find implied volatility
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        call - Call valuation if set True

        Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency
    '''
    if data['Type']!='Option':
        return np.nan
    s=151.03
    strike=data['Strike']
    CurrentDate=datetime.datetime.strptime('03/03/2023','%m/%d/%Y')
    ExpirationDate=data['ExpirationDate']
    ExpirationDate=datetime.datetime.strptime(ExpirationDate,'%m/%d/%Y')
    dt=ExpirationDate-CurrentDate
    t=dt.days
    tradingDayYear=365
    P=data['CurrentPrice']
    call=True if data['OptionType']=='Call' else False
    rf=0.0425
    q=0.0053
    c=rf-q
    ttm=t/tradingDayYear
    def f(vol):
        return gbsm(s,strike,t,vol,rf,c,tradingDayYear,call)-P  
    iVol = optimize.root_scalar(f, bracket=[1e-6, 10], method='brentq')
    return iVol.root

In [None]:
df=pd.read_csv('problem3.csv')
iVol=df.apply(findImpliedVol_plus,axis=1)
df['Implied Volatility']=iVol

## Generalize Black Scholes Merton Closed Formula for Portfolio Dataframe

In [None]:
def gbsm_plus(data,prices,days=0):
    ''' Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        Option valuation via BSM closed formula
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        call - Call valuation if set True
    '''
    if data['Type']!='Option':
        return prices*data['Holding']

    strike=data['Strike']
    CurrentDate=datetime.datetime.strptime('03/03/2023','%m/%d/%Y')+datetime.timedelta(days=days)
    ExpirationDate=data['ExpirationDate']
    ExpirationDate=datetime.datetime.strptime(ExpirationDate,'%m/%d/%Y')
    dt=ExpirationDate-CurrentDate
    t=dt.days
    tradingDayYear=365
    P=data['CurrentPrice']
    call=True if data['OptionType']=='Call' else False
    rf=0.0425
    q=0.0053
    c=rf-q
    ttm=t/tradingDayYear
    vol = data['Implied Volatility']

    positionValue=[]
    for s in prices:
        d1=(np.log(s/strike)+(c+vol**2/2)*ttm)/vol/np.sqrt(ttm)
        d2=d1-vol*np.sqrt(ttm)
        if call:
            positionValue.append(s*np.exp((c-rf)*ttm)*norm.cdf(d1)-strike*np.exp(-rf*ttm)*norm.cdf(d2))
        else:
            positionValue.append(strike*np.exp(-rf*ttm)*norm.cdf(-d2)-s*np.exp((c-rf)*ttm)*norm.cdf(-d1))
    # list can not times -1, convert to array
    positionValue = np.array(positionValue)
    return positionValue * data['Holding']

In [None]:
for strategy in strategies:
    data=df.query("Portfolio == @strategy")
    positionValue=data.apply(gbsm_plus,prices=prices,axis=1)
    payoff=data.apply(calPayoff,prices=prices,axis=1)

## Payoff Calculator

In [None]:
def calPayoff(data,prices):
    '''
    Calculate the payoff at expiration date according to different price
    '''
    if data['Type']!='Option':
        return prices*data['Holding']
    call=True if data['OptionType']=='Call' else False
    strike=data['Strike']
    payoff=[]
    for s in prices:
        if call:
            payoff.append(max(s-strike,0))
        else:
            payoff.append(max(strike-s,0))
    # list can not times -1, convert to array
    payoff = np.array(payoff)
    return payoff * data['Holding']

In [None]:
for strategy in strategies:
    data=df.query("Portfolio == @strategy")
    positionValue=data.apply(gbsm_plus,prices=prices,axis=1)
    payoff=data.apply(calPayoff,prices=prices,axis=1)

## Fit the AR1 Model and simulate the return 10 days ahead 

In [None]:
# Fit the AR1 Model and simulate the return 10 days ahead 
priceData=pd.read_csv('DailyPrices.csv',index_col='Date')
logReturn=return_calculate(priceData.AAPL,option="CONTINUOUS",rm_means=True)
# fit AR1 model
mod = sm.tsa.arima.ARIMA(logReturn.values, order=(1, 0, 0))
start_params = [0.5,0.1]
res = mod.fit_constrained({'const':0},start_params=start_params)
# current Price
currentPrice=151.03
# simulate price ten days later
simulatedPrice=[]
N=100000 # number of simulations

simulatedReturns=[]
for _ in range(N):
    simulatedReturn=res.simulate(nsimulations=10)
    simulatedReturns.append(simulatedReturn)
    tenDaysReturn=sum(simulatedReturn)
    simulatedPrice.append(currentPrice*np.exp(tenDaysReturn))
simulatedPrice=np.array(simulatedPrice)
simulatedReturns=np.array(simulatedReturns).flatten()

# apply those returns to the current AAPL price (above). Calculate Mean, VaR and ES
strategies=df.Portfolio.unique()
columns=['Mean(Portfolio Value)','Mean(Change)','VaR','ES']
ans=pd.DataFrame(index=strategies,columns=columns)
for idx,strategy in enumerate(strategies):
    data=df.query("Portfolio == @strategy")
    # Simulated Position Value
    positionValue=data.apply(gbsm_plus,prices=simulatedPrice,days=10,axis=1)
    # Initial Position Value
    iniValue=data.apply(gbsm_plus,prices=np.array([currentPrice]),axis=1)
    # Simulated Portfolio Value
    totalPortfolioValue=positionValue.sum()
    # Initial Portfolio Value
    iniValue=iniValue.sum()
    # Changed Portfolio Value
    valueChange=totalPortfolioValue-iniValue
    valueChange = np.array(valueChange)
    # calculate VaR and Expected Shortfall
    VaR_p = RiskMetrics.VaR_historical(valueChange,alpha=0.05)
    ES = RiskMetrics.ES_historical(valueChange,alpha=0.05)

    # insert Mean, VaR, ES to dateframe
    mean_var_es=[totalPortfolioValue.mean(),valueChange.mean(),VaR_p,ES]
    ans.loc[strategy]=mean_var_es

# Week 7

## Greeks using closed formula

In [None]:
def greeks_closed_form(s,strike,ttm,vol,rf,c,call=True):
    '''Closed from for greeks calculation from Generalize Black Scholes Merton
        Generalize Black Scholes Merton:
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        Option valuation via BSM closed formula
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        ttm - time to maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        call - Call valuation if set True
    '''
    d1=(np.log(s/strike)+(c+vol**2/2)*ttm)/vol/np.sqrt(ttm)
    d2=d1-vol*np.sqrt(ttm)
    optionType=['Call'] if call else ['Put']
    ans=pd.DataFrame(index=optionType,columns=['Detla','Gamma','Vega','Theta','Rho','Carry Rho'])
    if call:
        ans['Detla'] = np.exp((c-rf)*ttm)*norm.cdf(d1,loc=0,scale=1)
        ans['Theta'] = -s*np.exp((c-rf)*ttm)*norm.pdf(d1,loc=0,scale=1)*vol/(2*np.sqrt(ttm))-(c-rf)*s*np.exp((c-rf)*ttm)*norm.cdf(d1,loc=0,scale=1)-rf*strike*np.exp(-rf*ttm)*norm.cdf(d2,loc=0,scale=1)
        # ans['Rho'] = ttm*strike*np.exp(-rf*ttm)*norm.cdf(d2,loc=0,scale=1) - s*ttm*np.exp((c-rf)*ttm)*norm.cdf(d1,loc=0,scale=1)
        ans['Rho'] = ttm*strike*np.exp(-rf*ttm)*norm.cdf(d2,loc=0,scale=1)

        ans['Carry Rho'] = ttm*s*np.exp((c-rf)*ttm)*norm.cdf(d1,loc=0,scale=1)
    else:
        ans['Detla'] = np.exp((c-rf)*ttm)*(norm.cdf(d1,loc=0,scale=1)-1)
        ans['Theta'] = -s*np.exp((c-rf)*ttm)*norm.pdf(d1,loc=0,scale=1)*vol/(2*np.sqrt(ttm))+(c-rf)*s*np.exp((c-rf)*ttm)*norm.cdf(-d1,loc=0,scale=1)+rf*strike*np.exp(-rf*ttm)*norm.cdf(-d2,loc=0,scale=1)
        ans['Rho'] = -ttm*strike*np.exp(-rf*ttm)*norm.cdf(-d2,loc=0,scale=1)
        # ans['Rho'] = -ttm*strike*np.exp(-rf*ttm)*norm.cdf(-d2,loc=0,scale=1)+ttm*s*norm.cdf(-d1,loc=0,scale=1)*exp((b-rf)*ttm)
        ans['Carry Rho'] = -ttm*s*np.exp((c-rf)*ttm)*norm.cdf(-d1,loc=0,scale=1)
    ans['Gamma'] = norm.pdf(d1,loc=0,scale=1)*np.exp((c-rf)*ttm)/(s*vol*np.sqrt(ttm))
    ans['Vega'] = s*np.exp((c-rf)*ttm)*norm.pdf(d1,loc=0,scale=1)*np.sqrt(ttm)

    return ans

## Greeks for European option via Finite Difference (Generalize Black Scholes Merton)

In [None]:
def greeks_gbsm_finite_diff(s,strike,ttm,vol,rf,c,call=True):
    '''Greeks for European option via Generalize Black Scholes Merton and finite difference'''
    optionType=['Call'] if call else ['Put']
    s_chg = 0.01 # amount of price change 
    vol_chg = 0.01 # amount of volatility change
    t_chg = 0.01 # amount of ttm change
    rf_chg = 0.01 # amount of rf change
    c_chg = 0.01 # amount of carry of cost change

    ans=pd.DataFrame(index=optionType,columns=['Detla','Gamma','Vega','Theta','Rho','Carry Rho'])

    price_price_m_s_chg=gbsm(s-s_chg,strike,ttm,vol,rf,c,call) # price-s_chg
    price_price_p_s_chg=gbsm(s+s_chg,strike,ttm,vol,rf,c,call) # price+s_chg
    price=gbsm(s,strike,ttm,vol,rf,c,call)

    ans['Detla']=(price_price_p_s_chg-price_price_m_s_chg)/(2*s_chg)
    
    ans['Gamma']=(price_price_p_s_chg+price_price_m_s_chg-2*price)/s_chg**2

    ans['Vega']=(gbsm(s,strike,ttm,vol+vol_chg,rf,c,call)-gbsm(s,strike,ttm,vol-vol_chg,rf,c,call))/(2*vol_chg)

    ans['Theta']=-(gbsm(s,strike,ttm+t_chg,vol,rf,c,call)-gbsm(s,strike,ttm-t_chg,vol,rf,c,call))/(2*t_chg)

    ans['Rho']=(gbsm(s,strike,ttm,vol,rf+rf_chg,c+rf_chg,call)-gbsm(s,strike,ttm,vol,rf-rf_chg,c-rf_chg,call))/(2*rf_chg) # c should also be changed since c=rf-q 

    ans['Carry Rho']=(gbsm(s,strike,ttm,vol,rf,c+c_chg,call)-gbsm(s,strike,ttm,vol,rf,c-c_chg,call))/(2*c_chg)

    return ans

In [None]:
# Initialization
CurrentDate=datetime.datetime.strptime('03/13/2022','%m/%d/%Y')
ExpirationDate=datetime.datetime.strptime('04/15/2022','%m/%d/%Y')
Dividend_date=datetime.datetime.strptime('04/11/2022','%m/%d/%Y')

t=(ExpirationDate-CurrentDate).days
tradingDayYear=365
divTimes=(Dividend_date-CurrentDate).days

s=165
strike=165
rf=0.0425
q=0.0053
c=rf-q
ttm=t/tradingDayYear
vol=0.2

N=200
divTimes=[divTimes/t*N]
divAmts=[0.88]

# European
# Closed Form of Greeks 
call_greeks_closed_form=greeks_closed_form(s,strike,ttm,vol,rf,c,call=True)
put_greeks_closed_form=greeks_closed_form(s,strike,ttm,vol,rf,c,call=False)
# Finite Difference derivative calculation
call_greeks_gbsm_finite_diff=greeks_gbsm_finite_diff(s,strike,ttm,vol,rf,c,call=True)
put_greeks_gbsm_finite_diff=greeks_gbsm_finite_diff(s,strike,ttm,vol,rf,c,call=False)

## American Option Valuation via Binomial Tree (No dividends)

In [None]:
def binomial_tree_gbsm_american(s,strike,ttm,vol,rf,c,N=200,call=True):
    ''' Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        Option valuation via Binomial Trees
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        N - Steps of Binomial Tree
        call - Call valuation if set True
    '''
    delta_t=ttm/N
    # price multiplier in the (positive, negtive) case
    u=np.exp(vol*np.sqrt(delta_t)) 
    d=np.exp(-vol*np.sqrt(delta_t))
    # probability of up/down move in price
    pu=(np.exp(c*delta_t)-d)/(u-d)
    pd=1-pu
    # discount factor
    df=np.exp(-rf*delta_t)
    # call or put
    optionType=1 if call else -1

    # American option
    nNodeFunction = lambda x: (x+1)*(x+2)//2 # Calculate the number of all the nodes
    # j: layer(step) of nodes (python start by 0)
    #i: the i-th node in the j-th step (python start by 0)
    idxFunction = lambda i,j: nNodeFunction(j-1)+i
    nNodes=nNodeFunction(N) # Number of nodes
    
    optionValues = np.empty(nNodes,dtype=float) # An array of all the nodes

    for j in range(N,-1,-1): 
        for i in range(j,-1,-1):
            idx = idxFunction(i,j) 
            price = s*u**i*d**(j-i) # i represt how many times price go up
            optionValues[idx]=max(0,optionType*(price-strike))
            if j < N:
                optionValues[idx]=max(optionValues[idx],df*(pu*optionValues[idxFunction(i+1,j+1)] + pd*optionValues[idxFunction(i,j+1)]))
    return optionValues[0]

## American Option Valuation via Binomial Tree (with Dividends)

In [None]:
def binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N=200,call=True,divAmts=None,divTimes=None):
    '''Generalize Black Scholes Merton
        rf = c       -- Black Scholes 1973
        c = rf - q   -- Merton 1973 stock model where q is the continous dividend yield
        c = 0        -- Black 1976 futures option model
        c,r = 0      -- Asay 1982 margined futures option model
        c = rf - rff -- Garman and Kohlhagen 1983 currency option model where rff is the risk free rate of the foreign currency

        Option valuation via Binomial Trees
        European Style.  Assumed LogNormal Prices
        s - Underlying Price
        strike - Strike Price
        t - days until maturity
        rf - Risk free rate
        vol - Yearly Volatility
        c - Cost of Carry
        tradingDayYear - trading days in a year
        N - Steps of Binomial Tree
        call - Call valuation if set True

        divAmts - Array of dividend amounts
        divTimes - Array of dividend times
    '''
    #if there are no dividends or the first dividend is outside out grid, return the standard bt_american value
    divAmts=np.array(divAmts)
    divTimes=np.array(divTimes).astype(np.int32)
    if  divAmts.size == 0 or divTimes.size == 0 or divTimes[0]>N:
        return binomial_tree_gbsm_american(s,strike,ttm,vol,rf,c,N,call)
        
    delta_t=ttm/N
    # price multiplier in the (positive, negtive) case
    u=np.exp(vol*np.sqrt(delta_t)) 
    d=np.exp(-vol*np.sqrt(delta_t))
    # probability of up/down move in price
    pu=(np.exp(c*delta_t)-d)/(u-d)
    pd=1-pu
    # discount factor
    df=np.exp(-rf*delta_t)
    # call or put
    optionType=1 if call else -1

    # American option
    nNodeFunction = lambda x: (x+1)*(x+2)//2 # Calculate the number of all the nodes
    # j: layer(step) of nodes (python start by 0)
    #i: the i-th node in the j-th step (python start by 0)
    idxFunction = lambda i,j: nNodeFunction(j-1)+i
    nNodes=nNodeFunction(divTimes[0]) # Number of nodes

    optionValues = np.empty(nNodes,dtype=float) # An array of all the nodes

    for j in range(divTimes[0],-1,-1): 
        for i in range(j,-1,-1):
            idx = idxFunction(i,j) 
            price = s*u**i*d**(j-i) # i represt how many times price go up
            if j < divTimes[0]:
                # times before the dividend working backward induction
                optionValues[idx]=max(0,optionType*(price-strike))
                optionValues[idx]=max(optionValues[idx],df*(pu*optionValues[idxFunction(i+1,j+1)] + pd*optionValues[idxFunction(i,j+1)]))
            else:
                # time of the dividend
               valNoExercise = binomial_tree_gbsm_american_div(price-divAmts[0],strike,ttm-divTimes[0]*delta_t,vol,rf,c,N-divTimes[0],call,divAmts[1:],divTimes[1:]-divTimes[0])
               valExercise =  max(0,optionType*(price-strike))
               optionValues[idx] = max(valNoExercise,valExercise)
    return optionValues[0]

In [None]:
# Valuation 

# Continuously Compounding Coupon
# without dividend
call_coupon_no_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,True,[],[])
put_coupon_no_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,False,[],[])
# With dividend
call_coupon_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,True,divAmts,divTimes)
put_coupon_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,False,divAmts,divTimes)

# No Coupon
# without dividend
call_no_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,rf,N,True,[],[])
put_no_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,rf,N,False,[],[])
# With dividend
call_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,rf,N,True,divAmts,divTimes)
put_dividend = binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,rf,N,False,divAmts,divTimes)

## Greeks for American/European Option via Binomial Tree and Finite Difference

In [None]:
def greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,c,N=200,call=True,divAmts=None,divTimes=None):
    '''Greeks for American option via binomial tree and finite difference'''
    optionType=['Call'] if call else ['Put']
    s_chg = 0.2 # amount of price change 
    vol_chg = 0.01 # amount of volatility change
    t_chg = 0.01 # amount of ttm change
    rf_chg = 0.01 # amount of rf change
    c_chg = 0.01 # amount of carry of cost change
    divAmts_chg=0.01 # amount of dividend amount change

    ans=pd.DataFrame(index=optionType,columns=['Detla','Gamma','Vega','Theta','Rho','Carry Rho','Sensitivity to Dividend Amount'])

    price_price_m_s_chg=binomial_tree_gbsm_american_div(s-s_chg,strike,ttm,vol,rf,c,N,call,divAmts,divTimes) # price-s_chg
    price_price_p_s_chg=binomial_tree_gbsm_american_div(s+s_chg,strike,ttm,vol,rf,c,N,call,divAmts,divTimes) # price+s_chg
    price=binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,call,divAmts,divTimes) 

    ans['Detla']=(price_price_p_s_chg-price_price_m_s_chg)/(2*s_chg)
    
    ans['Gamma']=(price_price_p_s_chg+price_price_m_s_chg-2*price)/s_chg**2

    ans['Vega']=(binomial_tree_gbsm_american_div(s,strike,ttm,vol+vol_chg,rf,c,N,call,divAmts,divTimes)-binomial_tree_gbsm_american_div(s,strike,ttm,vol-vol_chg,rf,c,N,call,divAmts,divTimes))/(2*vol_chg)

    ans['Theta']=-(binomial_tree_gbsm_american_div(s,strike,ttm+t_chg,vol,rf,c,N,call,divAmts,divTimes) - binomial_tree_gbsm_american_div(s,strike,ttm-t_chg,vol,rf,c,N,call,divAmts,divTimes) )/(2*t_chg)

    ans['Rho']=(binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf+rf_chg,c+rf_chg,N,call,divAmts,divTimes) - binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf-rf_chg,c-rf_chg,N,call,divAmts,divTimes) )/(2*rf_chg) # c should also be changed since c=rf-q

    ans['Carry Rho']=(binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c+c_chg,N,call,divAmts,divTimes)  - binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c-c_chg,N,call,divAmts,divTimes) )/(2*c_chg)

    if divAmts and divTimes:
        divAmts = np.array(divAmts)
        divTimes = np.array(divTimes)
        ans['Sensitivity to Dividend Amount'] = (binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,call,divAmts+divAmts_chg,divTimes)  - binomial_tree_gbsm_american_div(s,strike,ttm,vol,rf,c,N,call,divAmts-divAmts_chg,divTimes) )/(2*divAmts_chg)


    return ans

In [None]:
# Greeks derived by binomial tree and finite difference

# Continuously Compounding Coupon
# without dividend (Continuously Compounding Coupon of 0.53%)
call_greeks_coupon_no_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,c,N,True,[],[])
put_greeks_coupon_no_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,c,N,False,[],[])
# With dividend (Continuously Compounding Coupon of 0.53% + Discrete dividends)
call_greeks_coupon_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,c,N,True,divAmts,divTimes)
put_greeks_coupon_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,c,N,False,divAmts,divTimes)

# No coupon
# without dividend (No coupon)
call_greeks_no_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,rf,N,True,[],[])
put_greeks_no_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,rf,N,False,[],[])
# With dividend (No coupon + Discrete dividends)
call_greeks_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,rf,N,True,divAmts,divTimes)
put_greeks_dividend=greeks_binomial_tree_finite_diff(s,strike,ttm,vol,rf,rf,N,False,divAmts,divTimes)

## Option Delta (could be applid to DataFrame)

In [None]:
def option_delta(data):
    '''Use binomial tree and finite difference to find the delta'''

    if data['Type']!='Option': # if the security is not option return 1
        return 1

    s_chg=0.2

    tradingDayYear=365

    CurrentDate=datetime.datetime.strptime('03/03/2023','%m/%d/%Y')
    ExpirationDate=data['ExpirationDate']
    Dividend_date=datetime.datetime.strptime('03/15/2023','%m/%d/%Y')
    ExpirationDate=datetime.datetime.strptime(ExpirationDate,'%m/%d/%Y')
    t=(ExpirationDate-CurrentDate).days
    divTimes=(Dividend_date-CurrentDate).days

    s=151.03
    strike=data['Strike']
    call=True if data['OptionType']=='Call' else False
    rf=0.0425
    c=rf
    ttm=t/tradingDayYear
    P=data['CurrentPrice']
    vol=data['Implied Volatility']

    N=200
    divTimes=[divTimes/t*N]
    divAmts=[1]

    price_price_m_s_chg=binomial_tree_gbsm_american_div(s-s_chg,strike,ttm,vol,rf,c,N,call,divAmts,divTimes) # price-s_chg
    price_price_p_s_chg=binomial_tree_gbsm_american_div(s+s_chg,strike,ttm,vol,rf,c,N,call,divAmts,divTimes) # price+s_chg

    ans=(price_price_p_s_chg-price_price_m_s_chg)/(2*s_chg)
    return ans

## Delta Normal for Option Strategy

In [None]:
df=pd.read_csv('problem2.csv')
# current stock price
s=151.03
# Calculate the implied volatility of each option
iVol=df.apply(findImpliedVol_plus,axis=1)
df['Implied Volatility']=iVol
# Calculate the delta of each option
deltas=df.apply(option_delta,axis=1)
df['Delta']=deltas
# Asset Value
df['AssetValue']=df.apply(lambda x: abs(x['Holding'])*x['CurrentPrice'],axis=1)
# Portfolio Value
PortfolioValue=df.groupby('Portfolio').sum()['AssetValue']
df['PortfolioValue']=df.apply(lambda x: PortfolioValue[x['Portfolio']],axis=1)
# Weighted Delta
df['WeightedDelta']=df.apply(lambda x: x['Holding']*s*x['Delta']/x['PortfolioValue'],axis=1)
# df['WeightedDelta']=df.apply(lambda x: x['Holding']*s*x['Delta'],axis=1)

# Calculate the return of AAPL
priceData=pd.read_csv('DailyPrices.csv',index_col='Date')
logReturn=return_calculate(priceData.AAPL,option="CONTINUOUS",rm_means=True)
# Calculate the Volatility of AAPL
AAPLvol=logReturn.var()
# Calculate the Volatility of Portfolio
PortfolioStd=np.sqrt((df.groupby('Portfolio').sum()['WeightedDelta'])**2*AAPLvol*10)
df['PortfolioStd']=df.apply(lambda x: PortfolioStd[x['Portfolio']],axis=1)


ans=pd.DataFrame()
alpha=0.05
# Calculate the VaR of Portfolio (alpha=0.05)
VaR=-norm.ppf(alpha,loc=0,scale=PortfolioStd)
# Calculate the ES of Portfolio (alpha=0.05)
ES=[]
for std,var in zip(PortfolioStd,VaR):
    ES.append(-norm(loc=0,scale=std).expect(lb=-np.inf,ub=var)/alpha)
ES=np.array(ES)

ans['VaR']=df.groupby('Portfolio')['PortfolioValue'].first()*VaR
ans['ES']=df.groupby('Portfolio')['PortfolioValue'].first()*ES
ans['Mean']=0
ans

## Super Efficient Portfolio

In [None]:
def super_efficient_portfolio(expected_rts,cov,rf=0.0425):
    '''Given a target return, use assets to find the optimal portfolio with lowest risk'''
    fun=lambda wts: -(wts@expected_rts-rf)/np.sqrt(wts@cov@wts)
    x0 = np.full(expected_rts.shape[0],1/expected_rts.shape[0])
    cons = [{'type':'ineq', 'fun':lambda x:x},
        {'type':'eq', 'fun':lambda x:sum(x)-1}]
    bounds = [(0, 1) for _ in range(expected_rts.shape[0])]
    res = optimize.minimize(fun, x0, method='SLSQP',bounds=bounds,constraints=cons)
    return res

In [None]:
from scipy.stats import norm
# delta of stock is 1 * holdings
delta_stock=1 * 1
# delta of call option is delta(call) * holdings
delta_call=call_greeks_closed_form['Detla'][0] * (-1)
# Portfolio Delta 
delta_portfolio = delta_stock + delta_call

# Delta Normal VaR is just the Portfolio Delta * quantile * current Underlying Price
alpha=0.05
VaR=abs(norm.ppf(alpha,loc=0,scale=vol))*delta_portfolio*s
VaR