In [9]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
import sys


# Plotting settings.
plt.rcParams["axes.grid"] = True
plt.rcParams["axes.spines.right"] = False
plt.rcParams["axes.spines.top"] = False

# Pandas settings.
pd.set_option("display.float_format", lambda x: "{:.4f}".format(x))

# Constants for risk metrics, return metrics, and annualization.
RETURN_COLS = ["Annualized Return", "Annualized Volatility", "Annualized Sharpe Ratio"]
RISK_COLS = [
    "Skewness",
    "Excess Kurtosis",
    "VaR (0.05)",
    "CVaR (0.05)",
    "Max Drawdown",
    "Bottom",
    "Peak",
    "Recovery",
    "Duration (days)",
]
ADJ = 12


def calc_return_metrics(data, as_df=False, adj=12):
    """
    Calculate return metrics for a given dataset. Specifically:
    - Annualized Return
    - Annualized Volatility
    - Annualized Sharpe Ratio
    - Annualized Sortino Ratio (not part of the course, but useful to know)

    Args:
        data : Returns time series.
        as_df (bool, optional): Return a df or dict. Defaults to False.
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        DataFrame or dict: Summary of return metrics.
    """
    summary = dict()
    summary["Annualized Return"] = data.mean() * adj
    summary["Annualized Volatility"] = data.std() * np.sqrt(adj)
    summary["Annualized Sharpe Ratio"] = (
        summary["Annualized Return"] / summary["Annualized Volatility"]
    )
    summary["Annualized Sortino Ratio"] = (
        summary["Annualized Return"] * np.sqrt(adj) / (data[data < 0].std())
    )

    # Here, we use what is known as a "ternary operator", usually denoted as "condition ? if_true : if_false",
    # in other programming languages. This is equivalent to having an explicit if-else statement, but is more
    # concise and can be written on a single line.
    return pd.DataFrame(summary, index=data.columns) if as_df else summary

# Utility Functions from Tobias

In [12]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# ------------------------------
# FINM 25000 - UTILITY FUNCTIONS
# ------------------------------
# Disclaimer & Usage Notes:
# ------------------------------------------------------------------------------
# While a decent amount of effort has gone into ensuring correctness,
# there is no guarantee that these functions are 100% correct. You are
# free to use these functions, without attribution, in your own work.
#
# The general structure in terms of naming conventions is that any function
# beginning with "calc_", is a function that calculates something, and will
# return a DataFrame or a dict. Any function beginning with "plot_", is a
# function that plots something, and will return a matplotlib axes object.
#
# Any function that begins with "print_", is a function that prints something
# and returns nothing.
# ------------------------------------------------------------------------------


import statsmodels.api as sm
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.patheffects as PathEffects

# --------------------
# REGRESSION FUNCTIONS
# --------------------


def calc_univariate_regression(y, X, intercept=True, adj=12):
    """
    Calculate a univariate regression of y on X. Note that both X and y
    need to be one-dimensional.

    Args:
        y : target variable
        X : independent variable
        intercept (bool, optional): Fit the regression with an intercept or not. Defaults to True.
        adj (int, optional): What to adjust the returns by. Defaults to 12.

    Returns:
        DataFrame: Summary of regression results
    """
    X_down = X[y < 0]
    y_down = y[y < 0]
    if intercept:
        X = sm.add_constant(X)
        X_down = sm.add_constant(X_down)

    model = sm.OLS(y, X, missing="drop")
    results = model.fit()

    inter = results.params[0] if intercept else 0
    beta = results.params[1] if intercept else results.params[0]

    summary = dict()

    summary["Alpha"] = inter * adj
    summary["Beta"] = beta
    down_mod = sm.OLS(y_down, X_down, missing="drop").fit()
    summary["Downside Beta"] = down_mod.params[1] if intercept else down_mod.params[0]

    summary["R-Squared"] = results.rsquared
    summary["Treynor Ratio"] = (y.mean() / beta) * adj
    summary["Information Ratio"] = (inter / results.resid.std()) * np.sqrt(adj)
    summary["Tracking Error"] = (
        inter / summary["Information Ratio"]
        if intercept
        else results.resid.std() * np.sqrt(adj)
    )

    return pd.DataFrame(summary, index=[y.name])


def calc_multivariate_regression(y, X, intercept=True, adj=12):
    """
    Calculate a multivariate regression of y on X. Adds useful metrics such
    as the Information Ratio and Tracking Error. Note that we can't calculate
    Treynor Ratio or Downside Beta here.

    Args:
        y : target variable
        X : independent variables
        intercept (bool, optional): Defaults to True.
        adj (int, optional): Annualization factor. Defaults to 12.

    Returns:
        DataFrame: Summary of regression results
    """
    if intercept:
        X = sm.add_constant(X)

    model = sm.OLS(y, X, missing="drop")
    results = model.fit()
    summary = dict()

    inter = results.params[0] if intercept else 0
    betas = results.params[1:] if intercept else results.params

    summary["Alpha"] = inter * adj
    summary["R-Squared"] = results.rsquared

    X_cols = X.columns[1:] if intercept else X.columns

    for i, col in enumerate(X_cols):
        summary[f"{col} Beta"] = betas[i]

    summary["Information Ratio"] = (inter / results.resid.std()) * np.sqrt(adj)
    summary["Tracking Error"] = (
        inter / summary["Information Ratio"]
        if intercept
        else results.resid.std() * np.sqrt(adj)
    )
    return pd.DataFrame(summary, index=[y.name])


def calc_iterative_regression(y, X, intercept=True, one_to_many=False, adj=12):
    """
    Iterative regression for checking one X column against many different y columns,
    or vice versa. "one_to_many=True" means that we are checking one X column against many
    y columns, and "one_to_many=False" means that we are checking many X columns against a
    single y column.

    To enforce dynamic behavior in terms of regressors and regressands, we
    check that BOTH X and y are DataFrames

    Args:
        y : Target variable(s)
        X : Independent variable(s)
        intercept (bool, optional): Defaults to True.
        one_to_many (bool, optional): Which way to run the regression. Defaults to False.
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        DataFrame : Summary of regression results.
    """

    if not isinstance(X, pd.DataFrame) or not isinstance(y, pd.DataFrame):
        raise TypeError("X and y must both be DataFrames.")

    if one_to_many:
        if isinstance(X, pd.Series) or X.shape[1] > 1:
            summary = pd.concat(
                [
                    calc_multivariate_regression(y[col], X, intercept, adj)
                    for col in y.columns
                ],
                axis=0,
            )
        else:
            summary = pd.concat(
                [
                    calc_univariate_regression(y[col], X, intercept, adj)
                    for col in y.columns
                ],
                axis=0,
            )
        summary.index = y.columns
        return summary
    else:
        summary = pd.concat(
            [
                calc_univariate_regression(y, X[col], intercept, adj)
                for col in X.columns
            ],
            axis=0,
        )
        summary.index = X.columns
        return summary


# -----------------------
# RISK & RETURN FUNCTIONS
# -----------------------


def calc_return_metrics(data, as_df=False, adj=12):
    """
    Calculate return metrics for a DataFrame of assets.

    Args:
        data (pd.DataFrame): DataFrame of asset returns.
        as_df (bool, optional): Return a DF or a dict. Defaults to False (return a dict).
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        Union[dict, DataFrame]: Dict or DataFrame of return metrics.
    """
    summary = dict()
    summary["Annualized Return"] = data.mean() * adj
    summary["Annualized Volatility"] = data.std() * np.sqrt(adj)
    summary["Annualized Sharpe Ratio"] = (
        summary["Annualized Return"] / summary["Annualized Volatility"]
    )
    summary["Annualized Sortino Ratio"] = summary["Annualized Return"] / (
        data[data < 0].std() * np.sqrt(adj)
    )
    return pd.DataFrame(summary, index=data.columns) if as_df else summary


def calc_risk_metrics(data, as_df=False, adj=12):
    """
    Calculate risk metrics for a DataFrame of assets.

    Args:
        data (pd.DataFrame): DataFrame of asset returns.
        as_df (bool, optional): Return a DF or a dict. Defaults to False.
        adj (int, optional): Annualizatin. Defaults to 12.

    Returns:
        Union[dict, DataFrame]: Dict or DataFrame of risk metrics.
    """
    summary = dict()
    summary["Skewness"] = data.skew()
    summary["Excess Kurtosis"] = data.kurtosis()
    summary["VaR (0.05)"] = data.quantile(0.05, axis=0)
    summary["CVaR (0.05)"] = data[data <= data.quantile(0.05, axis=0)].mean()
    summary["Min"] = data.min()
    summary["Max"] = data.max()

    wealth_index = 1000 * (1 + data).cumprod()
    previous_peaks = wealth_index.cummax()
    drawdowns = (wealth_index - previous_peaks) / previous_peaks

    summary["Max Drawdown"] = drawdowns.min()

    summary["Bottom"] = drawdowns.idxmin()
    summary["Peak"] = previous_peaks.idxmax()

    recovery_date = []
    for col in wealth_index.columns:
        prev_max = previous_peaks[col][: drawdowns[col].idxmin()].max()
        recovery_wealth = pd.DataFrame([wealth_index[col][drawdowns[col].idxmin() :]]).T
        recovery_date.append(
            recovery_wealth[recovery_wealth[col] >= prev_max].index.min()
        )
    summary["Recovery"] = ["-" if pd.isnull(i) else i for i in recovery_date]

    summary["Duration (days)"] = [
        (i - j).days if i != "-" else "-"
        for i, j in zip(summary["Recovery"], summary["Bottom"])
    ]

    return pd.DataFrame(summary, index=data.columns) if as_df else summary


def calc_performance_metrics(data, adj=12):
    """
    Aggregating function for calculating performance metrics. Returns both
    risk and performance metrics.

    Args:
        data (pd.DataFrame): DataFrame of asset returns.
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        DataFrame: DataFrame of performance metrics.
    """
    summary = {**calc_return_metrics(data, adj), **calc_risk_metrics(data, adj)}
    summary["Calmar Ratio"] = summary["Annualized Return"] / abs(
        summary["Max Drawdown"]
    )
    return pd.DataFrame(summary, index=data.columns)


# -----------------------
# CORRELATION HELPERS
# -----------------------


def plot_correlation_matrix(corrs):
    # Correlation helper function.
    return sns.heatmap(
        corrs,
        annot=True,
        cmap="coolwarm",
        vmin=-1,
        vmax=1,
        linewidths=0.7,
        annot_kws={"size": 10},
        fmt=".2f",
        square=True,
        cbar_kws={"shrink": 0.75},
    )


def print_max_min_correlation(corrs):
    # Correlation helper function.
    corr_series = corrs.unstack()
    corr_series = corr_series[corr_series != 1]

    max_corr = corr_series.abs().agg(["idxmax", "max"]).T
    min_corr = corr_series.abs().agg(["idxmin", "min"]).T
    min_corr_raw = corr_series.agg(["idxmin", "min"]).T
    max_corr, max_corr_val = max_corr["idxmax"], max_corr["max"]
    min_corr, min_corr_val = min_corr["idxmin"], min_corr["min"]
    min_corr_raw, min_corr_raw_val = min_corr_raw["idxmin"], min_corr_raw["min"]

    print(
        f"Max Corr (by absolute value): {max_corr[0]} and {max_corr[1]} with a correlation of {max_corr_val:.2f}"
    )
    print(
        f"Min Corr (by absolute value): {min_corr[0]} and {min_corr[1]} with a correlation of {min_corr_val:.2f}"
    )
    print(
        f"Min Corr (raw): {min_corr_raw[0]} and {min_corr_raw[1]} with a correlation of {min_corr_raw_val:.2f}"
    )


# -------------------
# PORTFOLIO FUNCTIONS
# -------------------


def calc_tangency_portfolio(mean_rets, cov_matrix):
    """
    Function to calculate tangency portfolio weights. Comes from the
    formula seen in class (Week 1).

    Args:
        mean_rets: Vector of mean returns.
        cov_matrix: Covariance matrix of returns.

    Returns:
        Vector of tangency portfolio weights.
    """
    inv_cov = np.linalg.inv(cov_matrix)
    ones = np.ones(mean_rets.shape)
    return (inv_cov @ mean_rets) / (ones.T @ inv_cov @ mean_rets)


def calc_gmv_portfolio(cov_matrix):
    """
    Function to calculate the weights of the global minimum variance portfolio.

    Args:
        cov_matrix : Covariance matrix of returns.

    Returns:
        Vector of GMV portfolio weights.
    """
    try:
        cov_inv = np.linalg.inv(cov_matrix)
    except TypeError:
        cov_inv = np.linalg.inv(np.array(cov_matrix))

    one_vector = np.ones(len(cov_matrix.index))
    return cov_inv @ one_vector / (one_vector @ cov_inv @ (one_vector))


def calc_mv_portfolio(mean_rets, cov_matrix, target=None):
    """
    Function to calculate the weights of the mean-variance portfolio. If
    target is not specified, then the function will return the tangency portfolio.
    If target is specified, then we return the MV-efficient portfolio with the target
    return.

    Args:
        mean_rets : Vector of mean returns.
        cov_matrix : Covariance matrix of returns.
        target (optional):  Target mean return. Defaults to None. Note: must be adjusted for
                            annualization the same time-frequency as the mean returns. If the
                            mean returns are monthly, the target must be monthly as well.

    Returns:
        Vector of MV portfolio weights.
    """
    w_tan = calc_tangency_portfolio(mean_rets, cov_matrix)

    if target is None:
        return w_tan

    w_gmv = calc_gmv_portfolio(cov_matrix)
    delta = (target - mean_rets @ w_gmv) / (mean_rets @ w_tan - mean_rets @ w_gmv)
    return delta * w_tan + (1 - delta) * w_gmv


# -----------------------------------------------
# MISC. FUNCTIONS
#
# These are functions that you, as a student, will
# not use, but are used to illustrate some of the
# concepts in the class, via TA code.
# -----------------------------------------------


def plot_capm_regression(assets, market, adj=12):
    """
    Plot CAPM regressions on a scatter plot.

    Args:
        assets : DataFrame of asset returns.
        market : DataFrame of market returns.
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        fig, ax: Figure and axes objects for the plot.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    betas = []
    means = []

    market_const = sm.add_constant(market)
    for asset in assets.columns:
        regr = sm.OLS(assets[asset], market_const).fit()
        ax.scatter(regr.params[1], assets[asset].mean() * adj, label=asset, zorder=2)
        ax.annotate(asset, (regr.params[1], assets[asset].mean() * adj), zorder=3)
        betas.append(regr.params[1])
        means.append(assets[asset].mean() * adj)

    ax.plot([0, 1.4], [0, market.mean()[0] * adj * 1.4], c="black", zorder=1, alpha=0.5)
    beta_mean_regr = sm.OLS(means, sm.add_constant(betas)).fit()

    ax.plot(
        np.arange(0, 1.6, 0.1),
        beta_mean_regr.params[0] + beta_mean_regr.params[1] * np.arange(0, 1.6, 0.1),
        zorder=1,
        alpha=0.7,
    )

    ax.set_yticks(np.arange(0, 0.15, 0.02))
    ax.set_xticks(np.arange(0, 1.6, 0.2))
    ax.set_title("Mean Return vs. Beta")
    ax.set_xlabel("Beta")
    ax.set_ylabel("Mean Return")
    fig.tight_layout()
    return fig, ax


def plot_mv_frontier(rets, delta=2, plot_tan=True, adj=12):
    """
    Plot MV frontier, and the tangency and GMV portfolios.

    Args:
        rets : Returns DataFrame
        delta (int, optional): Delta range (from -delta to +delta). Defaults to 2. Use to make
                                the plot look nicer, and keep the MV frontier within a reasonable range.
        plot_tan (bool, optional): Set to False if the tangency gives an extreme value. Defaults to True.
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        fig, ax: Figure and axes objects for the plot.
    """
    omega_tan = pd.Series(
        calc_tangency_portfolio(rets.mean(), rets.cov()), index=rets.columns
    )

    omega_gmv = pd.Series(calc_gmv_portfolio(rets.cov()), index=rets.columns)
    omega = pd.concat([omega_tan, omega_gmv], axis=1)
    omega.columns = ["tangency", "gmv"]

    delta_grid = np.linspace(-delta, delta, 150)
    mv_frame = pd.DataFrame(columns=["mean", "vol"], index=delta_grid)
    for i, delta in enumerate(delta_grid):
        omega_mv = delta * omega_tan + (1 - delta) * omega_gmv
        rets_p = rets @ omega_mv
        mv_frame["mean"].iloc[i] = rets_p.mean() * adj
        mv_frame["vol"].iloc[i] = rets_p.std() * np.sqrt(adj)

    rets_special = pd.DataFrame(index=rets.index)
    rets_special["tan"] = rets @ omega_tan.values
    rets_special["gmv"] = rets @ omega_gmv.values

    mv_assets = pd.concat([rets.mean() * adj, rets.std() * np.sqrt(adj)], axis=1)
    mv_special = pd.concat(
        [rets_special.mean() * adj, rets_special.std() * np.sqrt(adj)], axis=1
    )
    mv_assets.columns = ["mean", "vol"]
    mv_special.columns = ["mean", "vol"]

    fig, ax = plt.subplots(figsize=(8, 8))

    ax.plot(
        mv_frame["vol"],
        mv_frame["mean"],
        c="k",
        linewidth=3,
        label="MV Frontier",
        zorder=1,
    )
    if plot_tan:
        ax.scatter(
            mv_special["vol"][0],
            mv_special["mean"][0],
            c="g",
            linewidth=3,
            label="Tangency Portfolio",
        )
        text = ax.text(
            x=mv_special["vol"][0] + 0.0005,
            y=mv_special["mean"][0] + 0.0005,
            s="Tangency",
            fontsize=11,
            c="w",
        )
        text.set_path_effects([PathEffects.withStroke(linewidth=2, foreground="black")])

    ax.scatter(
        mv_special["vol"][1],
        mv_special["mean"][1],
        c="r",
        linewidth=3,
        label="GMV Portfolio",
    )
    text = ax.text(
        x=mv_special["vol"][1] + 0.0005,
        y=mv_special["mean"][1] + 0.0005,
        s="GMV",
        fontsize=11,
        c="w",
    )
    text.set_path_effects([PathEffects.withStroke(linewidth=2, foreground="black")])
    ax.scatter(
        mv_assets["vol"],
        mv_assets["mean"],
        c="b",
        linewidth=3,
        label="Individual Assets",
    )

    for i in range(mv_assets.shape[0]):
        text = ax.text(
            x=mv_assets["vol"][i] + 0.0005,
            y=mv_assets["mean"][i] + 0.0005,
            s=mv_assets.index[i],
            fontsize=11,
            c="w",
        )
        text.set_path_effects([PathEffects.withStroke(linewidth=2, foreground="black")])
    ax.set_xlabel("Volatility (Annualized)")
    ax.set_ylabel("Mean (Annualized)")
    fig.tight_layout()
    return fig, ax


def plot_pairplot(rets):
    """
    Plot a pairplot of returns. Add a vertical line at 0 -- this is useful
    for visualizing the skewness of the returns.

    Args:
        rets (_type_): _description_

    Returns:
        axes : Axes object for the plot.
    """

    axes = sns.pairplot(rets, diag_kind="kde", plot_kws={"alpha": 0.5})
    for _, ax in enumerate(axes.diag_axes):
        ax.axvline(0, c="k", lw=1, alpha=0.7)

    return axes


# Mean Variance Portfolio Functions

In [10]:
def tan_portfolio(mean_rets, cov_matrix):
    """
    Function to calculate tangency portfolio weights. Comes from the
    formula seen in class (Week 1).

    Args:
        mean_rets: Vector of mean returns.
        cov_matrix: Covariance matrix of returns.

    Returns:
        Vector of tangency portfolio weights.
    """
    inv_cov = np.linalg.inv(cov_matrix)
    ones = np.ones(mean_rets.shape)
    return (inv_cov @ mean_rets) / (ones.T @ inv_cov @ mean_rets)


def gmv_portfolio(cov_matrix):
    """
    Function to calculate the weights of the global minimum variance portfolio.

    Args:
        cov_matrix : Covariance matrix of returns.

    Returns:
        Vector of GMV portfolio weights.
    """
    try:
        cov_inv = np.linalg.inv(cov_matrix)
    except TypeError:
        cov_inv = np.linalg.inv(np.array(cov_matrix))

    one_vector = np.ones(len(cov_matrix.index))
    return cov_inv @ one_vector / (one_vector @ cov_inv @ (one_vector))


def mv_portfolio(mean_rets, cov_matrix, target=None):
    """
    Function to calculate the weights of the mean-variance portfolio. If
    target is not specified, then the function will return the tangency portfolio.
    If target is specified, then we return the MV-efficient portfolio with the target
    return.

    Args:
        mean_rets : Vector of mean returns.
        cov_matrix : Covariance matrix of returns.
        target (optional):  Target mean return. Defaults to None. Note: must be adjusted for
                            annualization the same time-frequency as the mean returns. If the
                            mean returns are monthly, the target must be monthly as well.

    Returns:
        Vector of MV portfolio weights.
    """
    w_tan = tan_portfolio(mean_rets, cov_matrix)

    if target is None:
        return w_tan

    w_gmv = gmv_portfolio(cov_matrix)
    delta = (target - mean_rets @ w_gmv) / (mean_rets @ w_tan - mean_rets @ w_gmv)
    return delta * w_tan + (1 - delta) * w_gmv

# OLS Formulas/Functions

In [11]:
def performanceMetrics(returns,annualization=1):
    metrics = pd.DataFrame(index=returns.columns)
    metrics['Mean'] = returns.mean() * annualization
    metrics['Vol'] = returns.std() * np.sqrt(annualization)
    metrics['Sharpe'] = (returns.mean() / returns.std()) * np.sqrt(annualization)

    metrics['Min'] = returns.min()
    metrics['Max'] = returns.max()
    return metrics

def tailMetrics(returns, quantile=.05, relative=False, mdd=True):
    
    #Maximum Drawdown
    def maximumDrawdown(returns):
        cum_returns = (1 + returns).cumprod()
        rolling_max = cum_returns.cummax()
        drawdown = (cum_returns - rolling_max) / rolling_max

        max_drawdown = drawdown.min()
        end_date = drawdown.idxmin()
        summary = pd.DataFrame({'Max Drawdown': max_drawdown, 'Bottom': end_date})

        for col in drawdown:
            summary.loc[col,'Peak'] = (rolling_max.loc[:end_date[col],col]).idxmax()
            recovery = (drawdown.loc[end_date[col]:,col])
            try:
                summary.loc[col,'Recover'] = pd.to_datetime(recovery[recovery >= 0].index[0])
            except:
                summary.loc[col,'Recover'] = pd.to_datetime(None)

            summary['Peak'] = pd.to_datetime(summary['Peak'])
            try:
                summary['Duration (to Recover)'] = (summary['Recover'] - summary['Peak'])
            except:
                summary['Duration (to Recover)'] = None

            summary = summary[['Max Drawdown','Peak','Bottom','Recover','Duration (to Recover)']]

        return summary  
    
    metrics = pd.DataFrame(index=returns.columns)
    metrics['Skewness'] = returns.skew()
    metrics['Kurtosis'] = returns.kurtosis()

    VaR = returns.quantile(quantile)
    CVaR = (returns[returns < returns.quantile(quantile)]).mean()

    if relative:
        VaR = (VaR - returns.mean())/returns.std()
        CVaR = (CVaR - returns.mean())/returns.std()

    metrics[f'VaR ({quantile})'] = VaR
    metrics[f'CVaR ({quantile})'] = CVaR

    if mdd:
        mdd_stats = maximumDrawdown(returns)
        metrics = metrics.join(mdd_stats)

        if relative:
            metrics['Max Drawdown'] = (metrics['Max Drawdown'] - returns.mean())/returns.std()

    return metrics


def get_ols_metrics(regressors, targets, annualization=1, ignorenan=True):
    # ensure regressors and targets are pandas dataframes, as expected
    if not isinstance(regressors, pd.DataFrame):
        regressors = regressors.to_frame()
    if not isinstance(targets, pd.DataFrame):
        targets = targets.to_frame()

    # align the targets and regressors on the same dates
    df_aligned = targets.join(regressors, how='inner', lsuffix='y ')
    Y = df_aligned[targets.columns]
    Xset = df_aligned[regressors.columns]

    reg = pd.DataFrame(index=targets.columns)
    for col in Y.columns:
        y = Y[col]
        
        if ignorenan:
            # ensure we use only non-NaN dates
            alldata = Xset.join(y,lsuffix='X')
            mask = alldata.notnull().all(axis=1)
            y = y[mask]
            X = Xset[mask]
        else:
            X = Xset

        model = LinearRegression().fit(X, y)
        reg.loc[col, 'alpha'] = model.intercept_ * annualization
        reg.loc[col, regressors.columns] = model.coef_
        reg.loc[col, 'r-squared'] = model.score(X, y)

        # sklearn does not return the residuals, so we need to build them
        yfit = model.predict(X)
        residuals = y - yfit

        # Treynor Ratio is only defined for univariate regression
        if Xset.shape[1] == 1:
            reg.loc[col,'Treynor Ratio'] = (y.mean() / model.coef_) * annualization

        
        # if intercept =0, numerical roundoff will nonetheless show nonzero Info Ratio
        num_roundoff = 1e-12
        if np.abs(model.intercept_) < num_roundoff:
            reg.loc[col, 'Info Ratio'] = None
        else:
            reg.loc[col, 'Info Ratio'] = (model.intercept_ / residuals.std()) * np.sqrt(annualization)

    return reg

def tangency_portfolio(data):
    mu = data.mean()
    sigma = np.linalg.inv(data.cov())
    one_vector = np.ones(len(data.columns))
    return sigma @ mu / (one_vector @ sigma @ mu)

def tangency_portfolio_allocation(data, target_return = 0.01):
    mu = data.mean()
    sigma = np.linalg.inv(data.cov())
    one_vector = np.ones(len(data.columns))
    tan_wts = tangency_portfolio(data)
    return ((one_vector @ sigma @ mu) / (mu @ sigma @ mu)) * target_return 

## 1 Return Analysis (35pts)

In [13]:
factors = pd.read_excel('midterm_A_2023_data.xlsx', 0).set_index('Date')
commodities = pd.read_excel('midterm_A_2023_data.xlsx', 1).set_index('Date')
forecasting = pd.read_excel('midterm_A_2023_data.xlsx', 2).set_index('Date')

1. (5pts) For each of the 14 assets, report the following annualized excess return statistics:
• mean
• volatility
• Sharpe ratio

In [117]:
metrics = performanceMetrics(commodities, annualization=12)
metrics[['Mean', 'Vol', 'Sharpe']]

Unnamed: 0,Mean,Vol,Sharpe
NG1,0.144,0.5381,0.2676
KC1,0.045,0.3209,0.1401
CC1,0.0848,0.3246,0.2612
LB1,0.1309,0.4008,0.3267
CT1,0.0572,0.3071,0.1864
SB1,0.0913,0.3467,0.2635
LC1,0.0296,0.1876,0.1576
W1,0.0761,0.3116,0.2441
S1,0.0713,0.2651,0.2688
C1,0.0861,0.2986,0.2882


2. (5pts) For each of the 14 assets, report the following statistics (no annualization needed).
• VaR (0.01). That is to say, the 1st quantile of returns.
• CVaR (0.01). That is to say, the average of the returns less than the 1st quantile.
• maximum drawdown1 Though we usually calculate maximum drawdown on total returns,
keep things simple and just continue to use the excess returns we’re already using in all
the other problems.

In [118]:
tailMetrics(commodities, quantile=0.01)[['VaR (0.01)', 'CVaR (0.01)', 'Max Drawdown']]

Unnamed: 0,VaR (0.01),CVaR (0.01),Max Drawdown
NG1,-0.359,-0.3875,-0.9015
KC1,-0.1781,-0.2153,-0.7044
CC1,-0.1981,-0.2314,-0.5115
LB1,-0.2393,-0.2952,-0.7117
CT1,-0.2069,-0.2632,-0.764
SB1,-0.2015,-0.2632,-0.7109
LC1,-0.1272,-0.1662,-0.4947
W1,-0.1922,-0.2208,-0.6695
S1,-0.2141,-0.2539,-0.5484
C1,-0.1961,-0.2287,-0.6339


3. (5pts) Conceptual question: Suppose you could only invest in one of these assets, not an
entire portfolio. Which statistics would you recommend that an investor consider in choosing
the single asset.

ANSWER: The statistic that I would recommend the outside investor consider is the Sharpe Ratio (that being greater is better). This is because the greater the Sharpe Ratio, the greater the expected mean (maintaining risk constant).

4. (5pts) Conceptual question: Suppose you already have a large portfolio and are considering
which of these assets to hold as an additional investment, along with your other assets. Would
your answer be the same? Conceptually, which other statistics might interest you?

ANSWER: My answer would no longer the same. Although Sharpe Ratio may play a factor it is NOT the most important. The most important factors would then be the correlations/covariances between the investments already within the portfolio and mean returns.

5. (10pts) Run a Linear Factor Decomposition of sugar (SB1) on coffee, (C1). That is, use the two
return series rSB1 and rC1. Report the following (annualized) statistics:
• market alpha
• market beta
• market Information ratio

In [119]:
one_five_ols = get_ols_metrics(commodities['SB1'], commodities['C1'], annualization=12)
one_five_ols.rename(columns={"SB1": "beta"})


Unnamed: 0,alpha,beta,r-squared,Treynor Ratio,Info Ratio
C1,0.0774,0.0944,0.012,0.9111,0.2609


6. (5pts) Based on the statistics above, what is the hedge ratio between sugar (SB1) and coffee
(C1)?

ANSWER: The hedge ratio would equal beta, thus 9.44%

## 2 Mean-Variance Optimization (35pts)

1. (10pts) Calculate the weights of the tangency portfolio formed from the 14 assets.

In [29]:
tangency_portfolio_weights = tangency_portfolio(commodities)
pd.DataFrame(tangency_portfolio_weights, index = commodities.columns, columns = ['Weights for the Tangency Portfolio']).sort_values('Weights for the Tangency Portfolio', ascending = False).style.format('{:,.2%}')

Unnamed: 0,Weights for the Tangency Portfolio
GC1,60.31%
LC1,12.89%
LB1,8.66%
C1,8.48%
HG1,7.46%
CC1,7.45%
PA1,7.33%
SB1,6.36%
NG1,5.74%
S1,2.73%


2. (5pts) What are the weights of the optimal portfolio, w∗, with a targeted mean excess return of
0.0075 per month?

In [32]:
# Allocation to tangency portfolio
portfolio_weights = tangency_portfolio_weights * tangency_portfolio_allocation(commodities, target_return=0.0075)
weights = pd.DataFrame(portfolio_weights, index = commodities.columns, columns = ['Weights for the Target Portfolio']).sort_values('Weights for the Target Portfolio', ascending = False).style.format('{:,.2%}')
weights

Unnamed: 0,Weights for the Target Portfolio
GC1,62.02%
LC1,13.26%
LB1,8.91%
C1,8.72%
HG1,7.67%
CC1,7.66%
PA1,7.54%
SB1,6.54%
NG1,5.91%
S1,2.81%


3. (5pts) Does the tangency portfolio weight securities in order of their individual Sharpe ratios?
Why or why not?

No it does not, because the Sharpe Ratio loses much of its significance when it comes to building a portfolio. The most significant factor/influence on weight distribution would be covariance with other assets. This makes sense as we want stocks that perform well with others (via the covariances and mean returns) not just themselves.

4. (5pts) Report the mean, volatility, and Sharpe ratio for the optimized portfolio, w∗, (calculated
in the previous question.) Annualize the statistics.

In [120]:
performanceMetrics(pd.DataFrame(commodities@portfolio_weights, columns=['Optimal Portfolio']), annualization=12)[['Mean', 'Vol', 'Sharpe']]\
.style.format('{:,.2%}')

Unnamed: 0,Mean,Vol,Sharpe
Optimal Portfolio,9.00%,11.96%,75.24%


5. (5pts) Conceptual question: How does Harvard make their portfolio allocation more realistic
than a basic mean-variance optimization would imply?

ANSWER: Within the Harvard TIPS case study, it is explained how parameters are put upon the weights maintain realistic allocation.

6. (5pts) Conceptual question: State one reason that Mean-Variance optimization is not robust,
(i.e. the solution is fragile with respect to the inputs.)

ANSWER: When faced with numerous assets, covariance matrix estimation becomes inaccurate. Fragility increases when inverting the covariance matrix (sigma inversed), especially when high correlations are present, exacerbating the condition number and instability.

## 3 Pricing (30pts)

1. (10pts) Estimate the time-series test of the pricing model.
Report, for each asset,
• annualized alpha
• beta
• r-squared.

In [123]:
ols_metrics = get_ols_metrics(factors, commodities, annualization=12)
ols_metrics

Unnamed: 0,alpha,MKT,UMD,r-squared,Info Ratio
NG1,0.112,0.3541,0.3812,0.0173,0.2099
KC1,0.0232,0.3151,-0.0275,0.0259,0.0733
CC1,0.0708,0.2073,-0.0358,0.012,0.2194
LB1,0.0645,0.9421,-0.0048,0.1368,0.1732
CT1,0.0249,0.5042,-0.1786,0.099,0.0855
SB1,0.0931,0.058,-0.3192,0.0327,0.2731
LC1,0.0154,0.1831,0.0661,0.02,0.083
W1,0.0545,0.2989,0.0224,0.0213,0.1769
S1,0.0425,0.3995,0.0273,0.0529,0.1649
C1,0.0609,0.3404,0.062,0.0282,0.2068


2. (5pts) If the pricing model worked perfectly, what would these statistics be?

ANSWER: The alphas would all be zero, but does not say anything about beta and r-squared

3. (5pts) Which asset does the pricing model fit best?

In [124]:
ols_metrics['r-squared'].idxmax()

'HG1'

ANSWER: The asset that fits the pricing model best HG1 with an r-squared of 21.5%

4. (5pts) Which factor has a higher risk premium in the estimated model above?

In [125]:
factors_metric = factors.describe().loc[['mean','std']].transpose()
factors_metric['Annual_Mean'] = factors_metric['mean'] * 12
factors_metric['Annual_Mean']

MKT   0.0706
UMD   0.0184
Name: Annual_Mean, dtype: float64

ANSWER: MKT has a higher risk premium as the MKT's annual excess mean returns are 7.06% whereas UMD's annual excess mean returns are 1.84%

5. (5pts) Conceptual question: Instead of the 2-factor model above, suppose the CAPM is true
and fits perfectly in our sample.
For n assets, what do we know about their...
• time-series r-squared metrics?
• Treynor Ratios?
• Information Ratios?

ANSWER: 
We would know nothing about their time-series r-squared metrics, 
the Treynor Ratios would be equal to the expected market premium,
and Information Ratios would be zero (as alphas would be).

## 4 Forecasting (20pts)

1. (7pts) Consider the lagged regression, where the regressor, (X,) is a period behind the target,
(rGLD ).
rGLD
t = αGLD,X + (βGLD,X )′ Xt−1 + εGLD,X
t (2)
Estimate (2) and report the R2, as well as the OLS estimates for α and β. (No need to annualize
these stats.)
Do this using both interest rate signals together. So we are estimating just one model:
• X as the two regressors: Tbill rate and Tbill change.

In [127]:
signals = forecasting[['Tbill rate', 'Tbill change']]
signals_lag = signals.shift(1).dropna()
gld = forecasting[['GLD']]
gld = gld.loc[signals_lag.index]

# Expanding.
forecast_df = gld.expanding().mean().shift(1).dropna()
forecast_df.columns = ['Mean']

# Tbill rate/Tbill change multi regression.
multi_regr = sm.OLS(gld, sm.add_constant(signals_lag)).fit()
forecast_df['Multi'] = multi_regr.params[0] + multi_regr.params[1] * signals['Tbill rate'] + multi_regr.params[2] * signals['Tbill change']

display(multi_regr.params)
print(f'R^2: {multi_regr.rsquared:,.2%}')

const          0.0010
Tbill rate     0.0003
Tbill change   0.0005
dtype: float64

R^2: 0.01%


2. (5pts) Use the forecasted GLD returns, ˆrGLD
t+1 , to build trading weights:
wt = 0.2 + 80 ˆrGLD
t+1
Calculate the return on this strategy:
rx
t+1 = wtrGLD
t+1
Report the first and last 5 values.

In [129]:
forecast_df.dropna(inplace=True)
forecast_df['Actuals'] = gld['GLD']
forecast_wt = 0.2 + forecast_df * 80
forecast_model = forecast_wt[['Multi']]
forecast_model = forecast_model.multiply(forecast_df['Actuals'], axis=0)
forecast_model['Actuals'] = forecast_df['Actuals']

print(f"Non-annualized Expected Returns: {round(forecast_model['Multi'].mean(), 6)}")

# forecast_wt
print("First five elements:")
forecast_subsample = forecast_model[['Multi']].head(5)
display(forecast_subsample)
print("Last five elements:")
forecast_subsample = forecast_model[['Multi']].tail(5)
display(forecast_subsample)


# forecast_df

Non-annualized Expected Returns: 0.000306
First five elements:


Unnamed: 0_level_0,Multi
Date,Unnamed: 1_level_1
2009-05-03,-0.0088
2009-05-10,0.0099
2009-05-17,0.0049
2009-05-24,0.0081
2009-05-31,0.0061


Last five elements:


Unnamed: 0_level_0,Multi
Date,Unnamed: 1_level_1
2022-11-06,0.0078
2022-11-13,0.0187
2022-11-20,-0.0039
2022-11-27,0.001
2022-12-04,-0.0011


3. (3pts) For both rx and rGLD, report
• mean
• volatility
Which strategy seems better?

In [130]:
forecast_metrics = forecast_model.describe().loc[['mean','std']].transpose()
forecast_metrics['Annualized_Mean'], forecast_metrics['Annualized_Std'] = forecast_metrics['mean'] * 12, forecast_metrics['std'] * (12**0.5)
forecast_metrics['Annualized_Sharpe_Ratio'] = forecast_metrics['Annualized_Mean']/forecast_metrics['Annualized_Std']
forecast_metrics
# ANNUALIZED

Unnamed: 0,mean,std,Annualized_Mean,Annualized_Std,Annualized_Sharpe_Ratio
Multi,0.0003,0.006,0.0037,0.0209,0.1757
Actuals,0.0011,0.0211,0.0127,0.0729,0.1746


The forecast using tbill rate and tbill changes is seemingly the better strategy as it has a greater annualized sharpe ratio of 0.1757 (> 0.1746).

4. (5pts) Suppose we were going to forecast GLD using just one of our two signals. Which of the
signals would likely lead to a result where the long-term forecast compounds the effect over long
horizons, as we saw for forecasting SPY using dividend-price ratios? Explain.

In [131]:
# Univariate regression for both bills and changes
tr_regr = sm.OLS(gld, sm.add_constant(signals_lag['Tbill rate'])).fit()
forecast_df['Tbill rate'] = tr_regr.params[0] + tr_regr.params[1] * signals['Tbill rate']
tc_regr = sm.OLS(gld, sm.add_constant(signals_lag['Tbill change'])).fit()
forecast_df['Tbill change'] = tc_regr.params[0] + tc_regr.params[1] * signals['Tbill change']

print(f'TBill Rate Regression: Alpha: {tr_regr.params[0]:.4f}, Beta: {tr_regr.params[1]:.4f}, R^2: {tr_regr.rsquared:.4f}')
print(f'TBill Change Regression: Alpha: {tc_regr.params[0]:.4f}, Beta: {tc_regr.params[1]:.4f}, R^2: {tc_regr.rsquared:.4f}')
signal_metrics = forecasting.describe().loc[['mean','std']].transpose()
signal_metrics

TBill Rate Regression: Alpha: 0.0010, Beta: 0.0003, R^2: 0.0001
TBill Change Regression: Alpha: 0.0011, Beta: 0.0013, R^2: 0.0000


Unnamed: 0,mean,std
GLD,0.0011,0.0211
Tbill rate,0.5593,0.8536
Tbill change,0.0057,0.0552


The signal Tbill  change would likely lead to a result where the long-term forecast compounds over greater time periods. This can be attributed to the standard deviation or general rate of change of GLD, Tbill rate, and Tbill change. As seen above, Tbill rate is far too volatile to accurately predict GLD; Tbill change is closer and better suited.