# Import packages

In [None]:
import pandas as pd
import numpy as np
from scipy.stats import kurtosis, skew
import seaborn as sns
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
from sklearn.linear_model import LinearRegression
import warnings
warnings.filterwarnings("ignore")


%matplotlib inline

import matplotlib.pyplot as plt

# HW 1 Helper Functions

In [None]:
def summary_stats(df, annual_fac):
    ss_df = (df.mean() * annual_fac).to_frame('Mean')
    ss_df['Vol'] = df.std() * np.sqrt(annual_fac)
    ss_df['Sharpe'] = ss_df['Mean'] / ss_df['Vol']
    
    return round(ss_df, 4)

In [1]:
def performance_summary(return_data):
    """ 
        Returns the Performance Stats for given set of returns
        Inputs: 
            return_data - DataFrame with Date index and Monthly Returns for different assets/strategies.
        Output:
            summary_stats - DataFrame with annualized mean return, vol, sharpe ratio. Skewness, Excess Kurtosis, Var (0.5) and
                            CVaR (0.5) and drawdown based on monthly returns. 
    """
    summary_stats = return_data.mean().to_frame('Annualized Return').apply(lambda x: x*12)
    summary_stats['Annualized Volatility'] = return_data.std().apply(lambda x: x*np.sqrt(12))
    summary_stats['Annualized Sharpe Ratio'] = summary_stats['Annualized Return']/summary_stats['Annualized Volatility']
    
    summary_stats['Skewness'] = return_data.skew()
    summary_stats['Excess Kurtosis'] = return_data.kurtosis()
    summary_stats['VaR (0.05)'] = return_data.quantile(.05, axis = 0)
    summary_stats['CVaR (0.05)'] = return_data[return_data <= return_data.quantile(.05, axis = 0)].mean()
    summary_stats['Min'] = return_data.min()
    summary_stats['Max'] = return_data.max()
    
    wealth_index = 1000*(1+return_data).cumprod()
    previous_peaks = wealth_index.cummax()
    drawdowns = (wealth_index - previous_peaks)/previous_peaks

    summary_stats['Max Drawdown'] = drawdowns.min()
    summary_stats['Peak'] = [previous_peaks[col][:drawdowns[col].idxmin()].idxmax() for col in previous_peaks.columns]
    summary_stats['Bottom'] = drawdowns.idxmin()
    
    recovery_date = []
    for col in wealth_index.columns:
        prev_max = previous_peaks[col][:drawdowns[col].idxmin()].max()
        recovery_wealth = pd.DataFrame([wealth_index[col][drawdowns[col].idxmin():]]).T
        recovery_date.append(recovery_wealth[recovery_wealth[col] >= prev_max].index.min())
    summary_stats['Recovery'] = recovery_date
    
    return summary_stats

In [2]:
def mvo_performance_stats(asset_returns,cov_matrix,port_weights, port_type,period):
    """ 
        Returns the Annualized Performance Stats for given asset returns, portfolio weights and covariance matrix
        Inputs: 
            asset_return - Excess return over the risk free rate for each asset (n x 1) Vector
            cov_matrix = nxn covariance matrix for the assets
            port_weights = weights of the assets in the portfolio (1 x n) Vector
            port_type = Type of Portfolio | Eg - Tangency or Mean-Variance Portfolio
            period = Monthly frequency
    """
    
    ret = np.dot(port_weights,asset_returns)
    vol = np.sqrt(port_weights @ cov_matrix @ port_weights.T)*np.sqrt(period)
    sharpe = ret/vol

    stats = pd.DataFrame([[ret,vol,sharpe]],columns= ["Annualized Return","Annualized Volatility","Annualized Sharpe Ratio"], index = [port_type])
    return stats

In [None]:
def compute_tangency(df_tilde, diagonalize_Sigma=False):

    """Compute tangency portfolio given a set of excess returns.
    Also, for convenience, this returns the associated vector of average
    returns and the variance-covariance matrix.

    Parameters
    ----------
    diagonalize_Sigma: bool
        When `True`, set the off diagonal elements of the variance-covariance
        matrix to zero.
    """
    Sigma = df_tilde.cov()

    # N is the number of assets
    N = Sigma.shape[0]

    Sigma_adj = Sigma.copy()
    if diagonalize_Sigma:

        Sigma_adj.loc[:,:] = np.diag(np.diag(Sigma_adj))

    mu_tilde = df_tilde.mean()
    Sigma_inv = np.linalg.inv(Sigma_adj)
    weights = Sigma_inv @ mu_tilde / (np.ones(N) @ Sigma_inv @ mu_tilde)

    # For convenience, I'll wrap the solution back into a pandas.Series object.
    omega_tangency = pd.Series(weights, index=mu_tilde.index)
    return omega_tangency, mu_tilde, Sigma_adj

omega_tangency, mu_tilde, Sigma = compute_tangency(df_ex)
omega_tangency.to_frame('Tangency Weights')

In [3]:
def tangency_portfolio_rfr(asset_return,cov_matrix, cov_diagnolize = False):
    """ 
        Returns the tangency portfolio weights in a (1 x n) vector
        Inputs: 
            asset_return - return for each asset (n x 1) Vector
            cov_matrix = nxn covariance matrix for the assets
    """
    if cov_diagnolize:
        asset_cov = np.diag(np.diag(cov_matrix))
    else:
        asset_cov = np.array(cov_matrix)
    inverted_cov= np.linalg.inv(asset_cov)
    one_vector = np.ones(len(cov_matrix.index))
    
    den = (one_vector @ inverted_cov) @ (asset_return)
    num =  inverted_cov @ asset_return
    return (1/den) * num

In [4]:
def mv_portfolio_rfr(asset_return,cov_matrix,target_ret,tangency_port):
    """ 
        Returns the Mean-Variance portfolio weights in a (1 x n) vector when a riskless assset is available
        Inputs: 
            asset_return - Excess return over the risk free rate for each asset (n x 1) Vector
            cov_matrix = nxn covariance matrix for the assets
            target_ret = Target Return (Annualized)
            tangency_port = Tangency portfolio when a riskless assset is available
    """
    asset_cov = np.array(cov_matrix)
    inverted_cov= np.linalg.inv(asset_cov)
    one_vector = np.ones(len(cov_matrix.index))
    
    delta_den = (asset_return.T @ inverted_cov) @ (asset_return)
    delta_num = (one_vector @ inverted_cov) @ (asset_return)
    delta_tilde = (delta_num/delta_den) * target_ret
    return (delta_tilde * tangency_port)

In [None]:
def target_mv_portfolio(df_tilde, target_return=0.01, diagonalize_Sigma=False):
    """Compute MV optimal portfolio, given target return and set of excess returns.

    Parameters
    ----------
    diagonalize_Sigma: bool
        When `True`, set the off diagonal elements of the variance-covariance
        matrix to zero.
    """
    omega_tangency, mu_tilde, Sigma = compute_tangency(df_tilde, diagonalize_Sigma=diagonalize_Sigma)
    Sigma_adj = Sigma.copy()

    if diagonalize_Sigma:
        Sigma_adj.loc[:,:] = np.diag(np.diag(Sigma_adj))

    Sigma_inv = np.linalg.inv(Sigma_adj)
    N = Sigma_adj.shape[0]
    delta_tilde = ((np.ones(N) @ Sigma_inv @ mu_tilde)/(mu_tilde @ Sigma_inv @ mu_tilde)) * target_return
    omega_star = delta_tilde * omega_tangency

    return omega_star, mu_tilde, Sigma_adj

omega_star, mu_tilde, Sigma = target_mv_portfolio(df_ex)
omega_star_df = omega_star.to_frame('MV Portfolio Weights')
omega_star_df

In [5]:
def gmv_portfolio(asset_return,cov_matrix):
    """ 
        Returns the Global Minimum Variance portfolio weights in a (1 x n) vector
        Inputs: 
            asset_return - return for each asset (n x 1) Vector
            cov_matrix = nxn covariance matrix for the assets
    """
    asset_cov = np.array(cov_matrix)
    inverted_cov= np.linalg.inv(asset_cov)
    one_vector = np.ones(len(cov_matrix.index))
    
    den = (one_vector @ inverted_cov) @ (one_vector)
    num =  inverted_cov @ one_vector
    return (1/den) * num

In [None]:
def mv_portfolio(asset_return,cov_matrix,target_ret,tangency_port):
    """ 
        Returns the Mean-Variance portfolio weights in a (1 x n) vector when no riskless assset is available
        Inputs: 
            asset_return - total return for each asset (n x 1) Vector
            cov_matrix = nxn covariance matrix for the assets
            target_ret = Target Return (Not-Annualized)
            tangency_port = Tangency portfolio
    """
    omega_tan = tangency_portfolio_rfr(asset_return.mean(),cov_matrix)
    omega_gmv = gmv_portfolio(asset_return,cov_matrix) 
    
    mu_tan = asset_return.mean() @ omega_tan
    mu_gmv = asset_return.mean() @ omega_gmv
    
    delta = (target_ret - mu_gmv)/(mu_tan - mu_gmv)
    mv_weights = delta * omega_tan + (1-delta)*omega_gmv
    return mv_weights

In [None]:
def MV_dynamic(strat, no_tips = True, target_mean = .01, start_year = 2014, end_year = 2020, diagonal_param = False):

    df_ex_noTIPS = df_ex.drop(columns = ['TIP'])
    
    if strat == 'Diagonalized':
        diagonal = True
    else:
        diagonal = diagonal_param
    
    if no_tips == True:
        df = pd.DataFrame(data = None, columns = [strat + ' Mean',strat + ' Vol',strat + ' Sharpe'])
    else:
        df = pd.DataFrame(data = None, columns = [strat + ' TIPS Mean',strat + ' TIPS Vol',strat + ' TIPS Sharpe',
                                              strat + ' No TIPS Mean',strat + ' No TIPS Vol',strat + ' No TIPS Sharpe'])

    curr_year = start_year

    while curr_year <= end_year:
        OOS_year = curr_year + 1
        curr_df = df_ex.loc[:str(curr_year)]
        curr_df_noTIPS = df_ex_noTIPS.loc[:str(curr_year)]
        OOS_df = df_ex.loc[str(OOS_year)]
        OOS_df_noTIPS = df_ex_noTIPS.loc[str(OOS_year)]

        omega_t,  mu_t, _ = compute_tangency(curr_df, diagonalize_Sigma = diagonal)

        _, mu_t_OOS, Sigma_t_OOS = compute_tangency(OOS_df, diagonalize_Sigma = False)

        omega_noT, mu_noT, _ = compute_tangency(curr_df_noTIPS, diagonalize_Sigma = diagonal)

        _, mu_noT_OOS, Sigma_noT_OOS = compute_tangency(OOS_df_noTIPS, diagonalize_Sigma = False)
        
        if strat == 'Equal Weights':
            equal_weights = np.ones(len(omega_t))
            equal_weights[equal_weights_tot == 1] = (1/len(omega_t))
            omega_t = equal_weights * (target_mean / (mu_t @ equal_weights))
            
            if no_tips == False:
                equal_weights = np.ones(len(omega_noT))
                equal_weights[equal_weights == 1] = (1/len(omega_noT))
                omega_noT = equal_weights * (target_mean / (mu_noT @ equal_weights))
            
        if strat == 'Risk Parity':
            omega_t = 1/curr_df.std()
            omega_noT = 1/curr_df_noTIPS.std()
        
        factor_t = 1 / ((omega_t @ mu_t / target_mean))

        omega_t = factor_t * omega_t
                        
        factor_noT = 1 / ((omega_noT @ mu_noT / target_mean))

        omega_noT = factor_noT * omega_noT
        
        if no_tips == True:
            df.loc[OOS_year] = list(portfolio_stats(omega_t, mu_t_OOS, Sigma_t_OOS, 12)['Portfolio Stats'].values)
        else:
            df.loc[OOS_year] = (list(portfolio_stats(omega_t, mu_t_OOS, Sigma_t_OOS, 12)['Portfolio Stats'].values)
            + list(portfolio_stats(omega_noT, mu_noT_OOS, Sigma_noT_OOS, 12)['Portfolio Stats'].values))

        curr_year += 1

    if no_tips == False:
        df[r'$\Delta Sharpe$'] = df[df.columns[2]] - df[df.columns[5]]
    
    return df

# HW2 Helper Functions

In [7]:
def regression_based_performance(factor,fund_ret,rf,constant = True):
    """ 
        Returns the Regression based performance Stats for given set of returns and factors
        Inputs:
            factor - Dataframe containing monthly returns of the regressors
            fund_ret - Dataframe containing monthly excess returns of the regressand fund
            rf - Monthly risk free rate of return
        Output:
            summary_stats - (Beta of regression, treynor ratio, information ratio, alpha). 
    """
    if constant:
        X = sm.tools.add_constant(factor)
    else:
        X = factor
    y=fund_ret
    model = sm.OLS(y,X,missing='drop').fit()
    
    if constant:
        beta = model.params[1:]
        alpha = round(float(model.params['const']),6)
        
    else:
        beta = model.params
    treynor_ratio = ((fund_ret.values-rf.values).mean()*12)/beta[0]
    tracking_error = (model.resid.std()*np.sqrt(12))
    if constant:        
        information_ratio = model.params[0]*12/tracking_error
    r_squared = model.rsquared
    if constant:
        return (beta,treynor_ratio,information_ratio,alpha,r_squared,tracking_error)
    else:
        return (beta,treynor_ratio,r_squared,tracking_error)

In [8]:
def rolling_regression_param(factor,fund_ret,roll_window = 60):
    """ 
        Returns the Rolling Regression parameters for given set of returns and factors
        Inputs:
            factor - Dataframe containing monthly returns of the regressors
            fund_ret - Dataframe containing monthly excess returns of the regressand fund
            roll_window = rolling window for regression
        Output:
            params - Dataframe with time-t as the index and constant and Betas as columns
    """
    X = sm.add_constant(factor)
    y= fund_ret
    rols = RollingOLS(y, X, window=roll_window)
    rres = rols.fit()
    params = rres.params.copy()
    params.index = np.arange(1, params.shape[0] + 1)
    return params

In [None]:
def display_correlation(df,list_maxmin=True):
    
    corrmat = df.corr()
    #ignore self-correlation
    corrmat[corrmat==1] = None
    sns.heatmap(corrmat)

    if list_maxmin:
        corr_rank = corrmat.unstack().sort_values().dropna()
        pair_max = corr_rank.index[-1]
        pair_min = corr_rank.index[0]

        print(f'MIN Correlation pair is {pair_min}')
        print(f'MAX Correlation pair is {pair_max}')

# HW3 Helper Fcns