# 02 Backtesting Environment
---
This notebook executes the first notebook and then provides methods for testing a set on input weights on a selected timeframe

---

In [1]:
%run 01_data_loading.ipynb
import importlib
import config
from datetime import datetime, timedelta
import scipy.optimize as opt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from sklearn.linear_model import LinearRegression
from sklearn.multioutput import MultiOutputRegressor
importlib.reload(config)

<module 'config' from '/Users/valentinennser/Desktop/taa_bac/config.py'>

In [2]:
def backtest_portfolio(weights, start_date, end_date):

    """
    This function calculates the cumulative returns of
    the portfolio according to the weights provided.

    Order of weights:
    SPY, FI, REIT, GOLD

    example:
    backtest_portfolio([0.2,0.2,0.3,0.3], '2010-01-01', '2019-12-31')
    """

    #check weights dimensions
    if len(weights) != 4:
        raise ValueError("Weights must be a list of 4 elements")
    elif sum(weights) != 1:
        raise ValueError("Weights must sum to 1")
    
    #create a new dataframe with all prices
    SPY_prices = get_data(config.SPY,start_date,end_date)
    FI_prices = get_data(config.FIXED_INCOME,start_date,end_date)
    REIT_prices = get_data(config.REIT,start_date,end_date)
    GOLD_prices = get_data(config.GOLD,start_date,end_date)

    data_all = pd.concat([SPY_prices, FI_prices, REIT_prices, GOLD_prices], axis=1)
    
    #create a new dataframe with all returns
    data_all_returns = data_all.ffill().pct_change().dropna()
    
    #weigh them according to the weights
    for i in range(len(weights)):
        data_all_returns.iloc[:,i] = data_all_returns.iloc[:,i]*weights[i]
    
    #calculate the portfolio returns
    data_all_returns['portfolio_returns'] = data_all_returns.sum(axis=1)

    #calculate the portfolio cumulative returns
    data_all_returns['cumulative_returns'] = (1 + data_all_returns['portfolio_returns']).cumprod()

    return data_all_returns['cumulative_returns'].iloc[-1] - 1


In [3]:
def portfolio_vol(weights, covmat):
    """
    helper function to calculate portfolio volatility
    """
    return (weights.T @ covmat @ weights)**0.5

def portfolio_return(weights,returns):
    """
    helper function to calculate portfolio return
    """
    return weights.T @ returns

def annualize_retss(r, periods_per_year):
    """
    helper function to annualize returns
    """
    compounded_growth = (1 + r).prod()
    n_periods = r.shape[0]
    return compounded_growth**(periods_per_year / n_periods) - 1

def max_profit(df_prices, window_size=30, min_weight=0.1):
    weights_to_return = pd.DataFrame()

    for i in range(len(df_prices) - window_size + 1):
        df_prices_window = df_prices.iloc[i:i+window_size]
        df_returns = df_prices_window.pct_change(fill_method=None).dropna()

        expected_returns = annualize_retss(df_returns, 252/window_size)

        n = expected_returns.shape[0]

        # Find the index of the asset with the highest expected return
        max_return_index = np.argmax(expected_returns)

        # Set weights
        weights = np.full(n, min_weight)
        weights[max_return_index] = 1.0 - (n-1) * min_weight

        new_weights = pd.DataFrame(weights).T
        weights_to_return = pd.concat([weights_to_return, new_weights], axis=0)

    weights_to_return.index = df_prices.iloc[window_size-1:].index
    weights_to_return.columns = df_prices.columns
    return weights_to_return

def gmv(df_prices, rf_rate, window_size = 30):
    """
    Returns the weights of the Global Minimum Volatility portfolio given a covariance matrix
    """
    # I love this function, what we are doing here is we are feeding the MSR optimizer the idea that all the returns are the same and the way it reacts
    # is it starts to increase the SR by lowering volatility and does so until it reaches the point of lowest possible volatility

    for i in range(len(df_prices) - window_size + 1):
        df_prices_window = df_prices.iloc[i:i+window_size]
        df_returns = df_prices_window.pct_change(fill_method=None).dropna()

        cov_matrix = df_returns.cov() * 252/window_size
        n = cov_matrix.shape[0]

        weights = msr(rf_rate, np.repeat(1, n), cov_matrix)
       
        new_weights = pd.DataFrame(weights.x).T
        weights_to_return = pd.concat([weights_to_return, new_weights], axis=0)

    weights_to_return.index = df_prices.iloc[window_size-1:].index
    weights_to_return.columns = df_prices.columns
        
    return weights_to_return


def msr(df_prices,rf_rate=0.01, window_size = 30, min_weight = 0.1):
    """
    Returns the weights of the portfolio that gives you the maximum sharpe ratio
    given the riskfree rate and expected returns and a covariance matrix

    example: msr(expected_returns, covariance_matrix, rf_rate)
    """
    weights_to_return = pd.DataFrame()
    
    for i in range(len(df_prices) - window_size + 1):
        df_prices_window = df_prices.iloc[i:i+window_size]
        df_returns = df_prices_window.pct_change(fill_method=None).dropna()


        expected_returns  = annualize_rets(df_returns, 252/window_size)
        cov_matrix = df_returns.cov() * 252/window_size

        n = expected_returns.shape[0] 
        init_guess = np.repeat(1/n, n) 
        bounds = ((min_weight, 1.0),) * n 

        weights_sum_to_1 = {'type': 'eq',
                        'fun': lambda weights: np.sum(weights) - 1}
        #maximize function does not exist, thus what we will be doing is minimizing the negative sharpe ratio
        def neg_sharpe(weights, riskfree_rate, er, cov):
            """
            Returns the negative of the sharpe ratio
            of the given portfolio
            """
            r = portfolio_return(weights, er)
            vol = portfolio_vol(weights, cov)
            return -(r - riskfree_rate)/vol

        weights = opt.minimize(neg_sharpe, init_guess,
                        args=(rf_rate, expected_returns, cov_matrix), method='SLSQP',
                        options={'disp': False},
                        constraints=(weights_sum_to_1,),
                        bounds=bounds)
        
        new_weights = pd.DataFrame(weights.x).T
        weights_to_return = pd.concat([weights_to_return, new_weights], axis=0)

    weights_to_return.index = df_prices.iloc[window_size-1:].index
    
    weights_to_return.columns = df_prices.columns
        
    return weights_to_return



def annualize_rets(r, periods_per_year):
    """
    Annualizes a set of returns given the periods per year
    """
    compounded_growth = (1+r).prod() 
    n_periods = r.shape[0] 
    return compounded_growth**(periods_per_year/n_periods)-1


In [4]:
def calculate_trailing_returns(df, window_size):
    """
    This function calculates the trailing returns of a given dataframe
    """
    returns = df.pct_change()
    trailing_returns = returns.rolling(window=window_size).sum()  # Calculate trailing returns
    return trailing_returns

In [5]:
def calculate_trailing_risk_parity_weights(prices, window_size=30):
    """
    This function calculates the risk parity weights using a rolling window
    """
    # Calculate daily returns
    returns = prices.pct_change().dropna()

    #remove column "cash" by colname
    returns = returns.drop(columns=['cash'])

    # Calculate Risk Parity Weights using a rolling window
    risk_parity_weights = returns.rolling(window=window_size).apply(lambda x: 1 / (x.std()), raw=True)
    risk_parity_weights = risk_parity_weights.div(risk_parity_weights.sum(axis=1), axis=0)

    risk_parity_weights["cash"] = 0
    
    return risk_parity_weights


In [None]:
def get_training_and_test_data(start_date, length, portfolio = config.MAX_SHARPE, window_size = 30, min_weight = 0.1,rf_rate = 0.02, freq = "M"):

    """
    Creates a training and test set from the available data, given a start date 
    and a length. The output sets contain the set of 11 macroeconomic input variables
    for each day as well as the optimal 30 day forward calculated risk parity portfolio
    weights. 

    example: get_training_and_test_data("2012-01-01",365)
    """
    print("Getting data...")
    #calculating the end_date
    temp_train_start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
    #-30 days cause of 30 day lag (NAs) when calculating 30day rolling returns later
    train_start_date_obj =  temp_train_start_date_obj - timedelta(days=30)
    train_start_date_str = train_start_date_obj.strftime("%Y-%m-%d")
    train_end_date_obj =  train_start_date_obj + timedelta(days=length+30)
    train_end_date_str = train_end_date_obj.strftime("%Y-%m-%d")

    test_start_date_obj = train_end_date_obj + timedelta(days=2)
    test_start_date_str = train_end_date_obj.strftime("%Y-%m-%d")
    test_end_date_obj = test_start_date_obj + timedelta(days = length-1)
    test_end_date_str = test_end_date_obj.strftime("%Y-%m-%d")

    #pulling macro indicators in the select timeframe for train & test
    macro_indicators_train = get_indicators(train_start_date_str, train_end_date_str).dropna()
    macro_indicators_test = get_indicators(test_start_date_str, test_end_date_str).dropna()

    #pulling index prices in the select timeframe
    df_prices_train = get_indices(train_start_date_str, train_end_date_str)
    df_prices_test = get_indices(test_start_date_str, test_end_date_str)

    #getting optimal portfolio weights
    if(portfolio == config.RISK_PARITY):
        train_weights = calculate_trailing_risk_parity_weights(df_prices_train)
        test_weights = calculate_trailing_risk_parity_weights(df_prices_test)

    elif(portfolio == config.MAX_SHARPE):
        train_weights = msr(df_prices_train,rf_rate,int(window_size),min_weight)
        test_weights = msr(df_prices_test,rf_rate,int(window_size),min_weight)

    elif(portfolio == config.EQUAL):
        train_data = np.full((len(df_prices_train), len(df_prices_train.columns)), 0.2)
        test_data = np.full((len(df_prices_test), len(df_prices_test.columns)), 0.2)
        train_weights = pd.DataFrame(train_data,columns = df_prices_train.columns,index = df_prices_train.index)
        test_weights = pd.DataFrame(test_data,columns = df_prices_test.columns,index = df_prices_test.index)

    elif(portfolio == config.MAX_PROFIT):
        train_weights = max_profit(df_prices_train,window_size,min_weight)
        test_weights = max_profit(df_prices_test,window_size,min_weight)

    elif(portfolio == config.GMV):
        train_weights = gmv(df_prices_train,rf_rate,window_size)
        test_weights = gmv(df_prices_test,rf_rate,window_size)

    else:
        raise ValueError("Portfolio must be either risk parity, max sharpe or equal")

    macro_indicators_train = pd.concat([macro_indicators_train,train_weights], axis = 1).dropna()
    macro_indicators_test = pd.concat([macro_indicators_test,test_weights], axis = 1).dropna()

    macro_indicators_train = macro_indicators_train.resample(freq).last("D")
    macro_indicators_test = macro_indicators_test.resample(freq).last("D")
    
    return macro_indicators_train,macro_indicators_test


def linear_regression(train_set, test_set):
    print("Running linear regression...")
    macroeconomic_indicators = [
    "CONSSENT_Consumer_Confidence", "PIDSDPS_SAVINGS_DISP_INC", "USYC2Y10_10y_2y",
    "CPI_YOY_US_CPI", "CSI_BARC_HY_SPREAD", "GDP_CURY_US_GDP_GROWTH",
    "S&P500_PE_PE_Ratio_S&P", "LB1_Lumber", "CL1_OIL_WTI", "GLDHG_COPPER_GOLD_RATIO",
    "USURTOT_US_UNEMPLOYMENT"
    ]
    weight_predictions = ["LUATTRUU_Fixed_Income", "RMZ_REITs", "SPX_Equities", "XAU_Gold", "cash"]

    # Drop rows with NaNs or Infs
    train_set_clean = train_set.replace([np.inf, -np.inf], np.nan).dropna(subset=macroeconomic_indicators + weight_predictions)
    test_set_clean = test_set.replace([np.inf, -np.inf], np.nan).dropna(subset=macroeconomic_indicators)

    macroeconomic_indicators_to_train = train_set_clean[macroeconomic_indicators]
    outcome_weights_to_train = train_set_clean[weight_predictions]

    # Instantiate and fit the model
    model = MultiOutputRegressor(LinearRegression())
    model.fit(macroeconomic_indicators_to_train, outcome_weights_to_train)

    # Predict (on training data or separate test data)
    weights_train_set_raw = model.predict(macroeconomic_indicators_to_train)  
    weights_test_set_raw = model.predict(test_set_clean[macroeconomic_indicators])

    weights_train_set = pd.DataFrame(weights_train_set_raw, columns=weight_predictions, index=train_set_clean.index)
    weights_test_set = pd.DataFrame(weights_test_set_raw, columns=weight_predictions, index=test_set_clean.index)

    return weights_train_set, weights_test_set

In [None]:
def draw_weights(weights):
    """
    Plots the weights of the portfolio over time

    draw_weights(weights)
    """
    #plot weights and show legend
    plt.plot(weights)
    #plot legend outside
    plt.legend(weights.columns,loc='center left', bbox_to_anchor=(1, 0.5))
    

def draw_returns(weights, portfolio):
    """
    Plots the returns of the portfolio over time

    draw_returns(returns)
    """

    #extract date range from weights and get asset prices for that timeframe, -1 row for dropping na
    date_range = pd.date_range(start=weights.index.min(), end=weights.index.max(), freq='D')
    start_date = date_range[0] - timedelta(days=1)
    df_prices = get_indices(start_date.strftime("%Y-%m-%d"), date_range[-1].strftime("%Y-%m-%d")).pct_change().dropna()

    #calculate and plot returns
    portfolio_return = (weights * df_prices).sum(axis=1)
    portfolio_return.cumsum().plot(label = "NN Weighted Portfolio")

    if portfolio == config.SPY:
        #calculate and plot returns
        sp500_return = df_prices[config.SPY]
        sp500_return.cumsum().plot(label = "S&P500")
        plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    elif portfolio == config.EQUAL:
        #calculate and plot returns
        equal_return = df_prices.mean(axis=1)
        equal_return.cumsum().plot(label = "Equally Weighted Portfolio")
        plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))

def draw_strategies(weights_1, portfolio_name_1, weights_2, portfolio_name_2, title, comparison = False):
    """
    Plots the performance of two portfolios over time

    draw_strategies(weights_1, weights_2)
    """
    date_range = pd.date_range(start=weights_1.index.min(), end=weights_1.index.max(), freq='D')
    start_date = date_range[0] - timedelta(days=1)
    df_prices = get_indices(start_date.strftime("%Y-%m-%d"), date_range[-1].strftime("%Y-%m-%d")).pct_change().dropna()

    #calculate and plot returns
    portfolio_1_return = (weights_1 * df_prices).sum(axis=1)
    portfolio_1_return.cumsum().plot(label = portfolio_name_1)

    portfolio_2_return = (weights_2 * df_prices).sum(axis=1)
    portfolio_2_return.cumsum().plot(label = portfolio_name_2)

    portfolio_1_cum = portfolio_1_return.cumsum()
    portfolio_2_cum = portfolio_2_return.cumsum()

    plt.text(portfolio_1_cum.index[-1], portfolio_1_cum.iloc[-1], 
             f'{100*portfolio_1_cum.iloc[-1]:.2f}:{"%"}', 
             va='center', ha='left', fontsize=9)

    plt.text(portfolio_2_cum.index[-1], portfolio_2_cum.iloc[-1], 
             f'{100*portfolio_2_cum.iloc[-1]:.2f}:{"%"}', 
             va='center', ha='left', fontsize=9)
    
    # Plot S&P 500 for comparison
    if comparison:
        equal_return = df_prices.mean(axis=1)
        equal_return.cumsum().plot(label = "Equal Weights Portfolio", color='grey')
        sp_return_cum = equal_return.cumsum()
        plt.text(equal_return.index[-1], sp_return_cum.iloc[-1], 
             f'{100*sp_return_cum.iloc[-1]:.2f}:{"%"}', 
             va='center', ha='left', fontsize=9)


    plt.legend(loc='best', bbox_to_anchor=(1, 0.5))
    plt.title(title)



def calc_returns(weights, returns):
    """
    Calculates the returns of the portfolio given the weights and the returns

    calc_returns(weights, returns)
    """

    portfolio_df = weights.T @ returns
    portfolio_df['portfolio_returns'] = portfolio_df.sum(axis=1)
    return portfolio_df