In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from arch import arch_model
from arch.univariate import GARCH, EWMAVariance
from sklearn import linear_model
import scipy
import scipy.stats as stats
from statsmodels.regression.rolling import RollingOLS
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")
pd.set_option("display.precision", 4)

## HW1

In [2]:
# summary assets' mean return, voaltility(stdev) and sharpe ratio
from math import tan


def summary_stat(df, annual_factor):
    '''summary assets' mean return, voaltility(stdev) and sharpe ratio'''
    result = pd.DataFrame()
    result["mean"] = df.mean() * annual_factor
    result["volatility"] = df.std() * np.sqrt(annual_factor)
    result["Sharpe Ratio"] = result["mean"]/result["volatility"]
    return result


def display_portfolio(weight, df, name, annual_factor):
    '''given weight and assets and portfolio name, 
    display portfolio's performance'''
    portfolio = (weight * df).sum(axis=1)
    portfolio_sum = summary_stat(portfolio.to_frame(name), annual_factor)
    return portfolio_sum

# calculate tangency portfolio omega

def cal_tangency_w(df, annual_factor):
    '''give assets return and annualize factor, return tangency weight'''
    N = df.shape[1]

    Sigma = df.cov() * annual_factor  # covariance matrix, annualized
    mean = df.mean() * annual_factor
    Sigma_inv = np.linalg.inv(Sigma)
    omega_t = (1/(np.ones(N) @ Sigma_inv @ mean)) * Sigma_inv @ mean
    return pd.Series(omega_t, index=df.columns)


def cal_GMV_w(df, annual_factor):
    '''give assets return and annulize factor return global minimum variance weight'''
    N = df.shape[1]

    Sigma = df.cov() * annual_factor  # covariance matrix, annualized
    Sigma_inv = np.linalg.inv(Sigma)
    omega_v = (1/(np.ones(N) @ Sigma_inv @ np.ones(N))) * \
        Sigma_inv @ np.ones(N)
    return pd.Series(omega_v, index=df.columns)


def cal_delta(mu_p, df, annual_factor=12):
    '''give target mu_p, assets return and annualize factor, return delta and weight. 
    Remember that m_p is the monthly(not annualized return. '''
    mu_p_ann = mu_p * annual_factor
    mu = df.mean() * annual_factor
    tangency_w = cal_tangency_w(df, annual_factor)
    GMV_w = cal_GMV_w(df, annual_factor)
    delta = (mu_p_ann - mu.T @ GMV_w)/(mu.T @ tangency_w - mu.T @ GMV_w)
    weight_star = delta * tangency_w + (1-delta) * GMV_w
    return delta, weight_star


def cal_risk_parity_w(df):
    std = df.std()
    risk_parity_w = 1/std/(1/std).sum()
    return risk_parity_w

In [None]:
def cal_excess_tangency(df_ex, annual_factor):
    N = df_ex.shape[1]

    Sigma = df_ex.cov() * annual_factor
    Sigma_inv = np.linalg.inv(Sigma)
    mean_ex = df_ex.mean() * annual_factor

    w_t = 1/(np.ones(N) @ Sigma_inv @ mean_ex) * Sigma_inv @ mean_ex
    return pd.Series(w_t, index=df_ex.columns)


def cal_excess_weight(mu_p_telda, df_ex, annual_factor):
    N = df_ex.shape[1]

    mean_ex = df_ex.mean() * annual_factor
    Sigma = df_ex.cov() * annual_factor
    Sigma_inv = np.linalg.inv(Sigma)

    w_t = 1/(np.ones(N) @ Sigma_inv @ mean_ex) * Sigma_inv @ mean_ex
    w_t = pd.Series(w_t, index=df_ex.columns)

    delta_telda = (np.ones(N) @ Sigma_inv @ mean_ex.T)/(mean_ex @
                                                        Sigma_inv @ mean_ex.T) * mu_p_telda * annual_factor

    w_star = delta_telda * w_t
    w_star = pd.Series(w_star, index=df_ex.columns)
    return w_star, w_t, delta_telda

In [None]:
# regression
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
from sklearn.linear_model import LinearRegression

# basic sm OLS model
model = sm.OLS(y, X).fit()
model.params
model.rsquared
model.resid

# add constant
X = sm.add_constant(X)
static_model = sm.OLS(y, X).fit()
sm.OLS(lhs, rhs, missing='drop').fit()
static_model.summary()
# confidence interval


# rolling
model = RollingOLS(y, X, window=60)
rolling_betas = model.fit().params.copy()
rolling_betas = model.fit().params.copy()
rolling_betas


model = LinearRegression().fit(X, y)
model.score(X, y)
yfit = model.predict(X)
model.intercept_
model.coef_

## HW2

In [None]:
def tail_risk_report(data, q):
    df = data.copy()
    df.index = data.index.date
    report = pd.DataFrame(columns = df.columns)
    
    report.loc['Skewness'] = df.skew()
    report.loc['Excess Kurtosis'] = df.kurtosis()
    report.loc['VaR'] = df.quantile(q)
    report.loc['Expected Shortfall'] = df[df < df.quantile(q)].mean()
    
    cum_ret = (1 + df).cumprod()
    rolling_max = cum_ret.cummax()
    drawdown = (cum_ret - rolling_max) / rolling_max
    report.loc['Max Drawdown'] = drawdown.min()
    report.loc['MDD Start'] = None
    report.loc['MDD End'] = drawdown.idxmin()
    report.loc['Recovery Date'] = None
    
    for col in df.columns:
        report.loc['MDD Start', col] = (rolling_max.loc[:report.loc['MDD End', col]])[col].idxmax()
        recovery_df = (drawdown.loc[report.loc['MDD End', col]:])[col]
        # modify the threshold for recovery from 0 to 0.001
        try:
            report.loc['Recovery Date', col] = recovery_df[recovery_df >= 0].index[0]
            report.loc['Recovery period (days)'] = (report.loc['Recovery Date'] - report.loc['MDD Start']).dt.days

        except:
            report.loc['Recovery Date', col] = None
            report.loc['Recovery period (days)'] = None

    return round(report,4)

In [None]:
# one target, different regressors.
def get_ols_result(regressor, targets, annualization=12, ignorenan=True):
    result = pd.DataFrame(
        columns=["Market Beta", "Treynor Ratio", "Info Ratio"], index=targets.columns)
    for ind in targets.columns:
        data = pd.merge(
            regressor, targets[[ind]], how="inner", left_index=True, right_index=True)
        X = data[regressor.columns]
        y = data[ind]
        model = LinearRegression().fit(X, y)
        yfit = model.predict(X)
        residuals = y - yfit

        # result.loc[ind, "Alpha"] = model.intercept_ * annualization
        result.loc[ind, "Market Beta"] = model.coef_[0]
        result.loc[ind, "Treynor Ratio"] = (
            y.mean()/model.coef_[0]) * annualization
        result.loc[ind, "Info Ratio"] = (
            model.intercept_ / residuals.std()) * np.sqrt(annualization)

    return result


# different target, one regressor
def get_ols_metrics(regressors, targets, annualization=1, ignorenan=True):
    # ensure regressors and targets are pandas dataframes, as expected
    if not isinstance(regressors, pd.DataFrame):
        regressors = regressors.to_frame()
    if not isinstance(targets, pd.DataFrame):
        targets = targets.to_frame()

    # align the targets and regressors on the same dates
    df_aligned = targets.join(regressors, how='inner', lsuffix='y ')
    Y = df_aligned[targets.columns]
    Xset = df_aligned[regressors.columns]

    reg = pd.DataFrame(index=targets.columns)
    for col in Y.columns:
        y = Y[col]

        if ignorenan:
            # ensure we use only non-NaN dates
            alldata = Xset.join(y)
            mask = alldata.notnull().all(axis=1)
            y = y[mask]
            X = Xset[mask]
        else:
            X = Xset

        model = LinearRegression().fit(X, y)
        reg.loc[col, 'alpha'] = model.intercept_ * annualization
        reg.loc[col, regressors.columns] = model.coef_
        reg.loc[col, 'r-squared'] = model.score(X, y)

        # sklearn does not return the residuals, so we need to build them
        yfit = model.predict(X)
        residuals = y - yfit

        # Treynor Ratio is only defined for univariate regression
        if Xset.shape[1] == 1:
            reg.loc[col, 'Treynor Ratio'] = (
                y.mean() / model.coef_) * annualization

        # if intercept =0, numerical roundoff will nonetheless show nonzero Info Ratio
        num_roundoff = 1e-12
        if np.abs(model.intercept_) < num_roundoff:
            reg.loc[col, 'Info Ratio'] = None
        else:
            reg.loc[col, 'Info Ratio'] = (
                model.intercept_ / residuals.std()) * np.sqrt(annualization)

    return reg


## HW3

In [None]:
def summary_stat(df, annual_factor, q=0.05):
    '''summary assets' mean return, voaltility(stdev) and sharpe ratio'''
    result = pd.DataFrame()
    result["mean"] = df.mean() * annual_factor
    result["volatility"] = df.std() * np.sqrt(annual_factor)
    result["Sharpe Ratio"] = result["mean"]/result["volatility"]

    result["VaR"] = df.quantile(q)
    return result


def get_capm_matrics(targets, regressors, add_constant=True, annualize_factor=12):

    result = pd.DataFrame(index=targets.columns)
    resid_matrix = pd.DataFrame(columns=targets.columns)
    t_p_value = pd.DataFrame(index=targets.columns)

    if add_constant:
        X = sm.add_constant(regressors)
    else:
        X = regressors.copy()
    for column in targets.columns:
        y = targets[[column]]
        model = sm.OLS(y, X, missing='drop').fit()
        if add_constant:
            result.loc[column, "alpha"] = model.params['const'] * \
                annualize_factor
        result.loc[column, regressors.columns] = model.params[regressors.columns]
        if add_constant:
            result.loc[column, 'Info Ratio'] = np.sqrt(
                annualize_factor) * model.params['const'] / model.resid.std()

        if regressors.shape[1] == 1:
            result.loc[column, "Treynor Ratio"] = (
                y.mean().values[0]/model.params[regressors.columns].values[0]) * annualize_factor

        resid_matrix[column] = model.resid
        t_p_value.loc[column, "t-value"] = model.tvalues['const']
        t_p_value.loc[column, "p-value"] = model.pvalues['const']

    return result, resid_matrix, t_p_value


def cal_excess_tangency(df_ex, annual_factor):
    N = df_ex.shape[1]

    Sigma = df_ex.cov() * annual_factor
    Sigma_inv = np.linalg.inv(Sigma)
    mean_ex = df_ex.mean() * annual_factor

    w_t = 1/(np.ones(N) @ Sigma_inv @ mean_ex) * Sigma_inv @ mean_ex
    return pd.Series(w_t, index=df_ex.columns)


T = portfolio_ex.dropna().shape[0]
# annualize
SR = factors['Mkt-RF'].mean() / factors['Mkt-RF'].std() * np.sqrt(12)
Sigma = resid_matrix.cov()
Sigma_inv = pd.DataFrame(np.linalg.inv(
    Sigma), index=Sigma.index, columns=Sigma.columns)
alpha = capm_res['alpha']
H = T * (1 + SR**2)**(-1) * alpha @ Sigma_inv @ alpha
print('H = {:.2f}'.format(H))
pvalue = 1 - stats.chi2.cdf(H, df=25)
print('p-value = {:.4f}'.format(pvalue))


## HW5

In [None]:
def get_capm_matrics(targets, regressors, add_constant=True, annualize_factor=12):

    result = pd.DataFrame(index=targets.columns)
    resid_matrix = pd.DataFrame(columns=targets.columns)
    t_p_value = pd.DataFrame(index=targets.columns)

    if add_constant:
        X = sm.add_constant(regressors)
    else:
        X = regressors.copy()
    for column in targets.columns:
        y = targets[[column]]
        model = sm.OLS(y, X, missing='drop').fit()
        if add_constant:
            result.loc[column, "alpha"] = model.params['const'] * \
                annualize_factor
        result.loc[column, regressors.columns] = model.params[regressors.columns]

        result.loc[column, "R-squared"] = model.rsquared

        resid_matrix[column] = model.resid
        if add_constant:
            t_p_value.loc[column, "t-value"] = model.tvalues['const']
            t_p_value.loc[column, "p-value"] = model.pvalues['const']

    return result, resid_matrix, t_p_value