## Field Lab: Quantitative Investment Startegy 
#### Tiago Senra Casanova Vasco (39569): Analyzing the Dynamic Relationship Between Consumer Spending Patterns and Stock Market Sector Performance

## Analyzing the Dynamic Relationship Between Consumer Spending Patterns and Stock Market Sector Performance

In [None]:
import wrds
import pandas as pd
import plotly.express as px
import statsmodels.api as sm
import numpy as np
import plotly.graph_objects as go
import os
import requests
from zipfile import ZipFile
import io
import matplotlib.pyplot as plt
import matplotlib.cm as cm

In [None]:
# Download the files using URLs

urls = ['https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/12_Industry_Portfolios_daily_CSV.zip',
        'https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Research_Data_Factors_daily_CSV.zip',
        'https://www.census.gov/econ_getzippedfile/?programCode=MRTS',
        'https://www.census.gov/retail/marts/www/MARTSreleasedates.xls'
]

local_filenames = ['12_Industry_Portfolios_Daily.zip',
                   'F-F_Research_Data_Factors_daily_CSV.zip',
                   'MRTS-mf.zip',
                   'MARTSreleasedates.xls']

current_directory = os.getcwd()

try:
    for url, local_filename in zip(urls, local_filenames):
        response = requests.get(url)
        if response.status_code == 200:
            local_file_path = os.path.join(current_directory, local_filename)
            with open(local_file_path, 'wb') as file:
                file.write(response.content)
            print(f"File downloaded as {local_file_path}")
        else:
            print(f"Failed to download file from {url}. Status code: {response.status_code}")

except Exception as e:
    print(f"An error occurred: {str(e)}")


In [None]:
# Getting Sector Daily Returns

zip_file_path = '12_Industry_Portfolios_Daily.zip'


with ZipFile(zip_file_path, 'r') as archive:
    with archive.open('12_Industry_Portfolios_Daily.csv') as csv_file:
        df_Industry_Portfolios = pd.read_csv(io.TextIOWrapper(csv_file),skiprows=9)
        

index_to_cut = df_Industry_Portfolios[df_Industry_Portfolios['Unnamed: 0'] == '  Average Equal Weighted Returns -- Daily'].index[0]
df_Industry_Portfolios = df_Industry_Portfolios.iloc[:index_to_cut]
df_Industry_Portfolios.rename(columns={df_Industry_Portfolios.columns[0]: 'Date'}, inplace=True)
df_Industry_Portfolios['Date'] = pd.to_datetime(df_Industry_Portfolios['Date'], format='%Y%m%d').dt.strftime('%Y-%m-%d')
df_Industry_Portfolios.columns = df_Industry_Portfolios.columns.str.replace(' ', '')
df_Industry_Portfolios = df_Industry_Portfolios[df_Industry_Portfolios['Date'] <= '2023-10-31'].copy()

In [None]:
# Getting the 452-General Merchandise Stores Signal from MRTS

zip_file_path = 'MRTS-mf.zip'

with ZipFile(zip_file_path, 'r') as archive:
    with archive.open('MRTS-mf.csv') as csv_file:
        df_ConsumerData = pd.read_csv(io.TextIOWrapper(csv_file),skiprows=496)

with ZipFile(zip_file_path, 'r') as archive:
    with archive.open('MRTS-mf.csv') as csv_file:
        df_Categories = pd.read_csv(io.TextIOWrapper(csv_file),skiprows = 1, nrows=65)
        
with ZipFile(zip_file_path, 'r') as archive:
    with archive.open('MRTS-mf.csv') as csv_file:
        df_Time_Periods = pd.read_csv(io.TextIOWrapper(csv_file),skiprows=93, nrows=383)

df_ConsumerData = df_ConsumerData[df_ConsumerData['dt_idx']==5]
df_ConsumerData = df_ConsumerData[df_ConsumerData['et_idx']==0]
df_ConsumerData = df_ConsumerData[df_ConsumerData['is_adj']==1]

mapping_dict_Categories = df_Categories.set_index('cat_idx')['cat_desc'].to_dict()
mapping_dict_Time_Periods = df_Time_Periods.set_index('per_idx')['per_name'].to_dict()

df_ConsumerData['cat_desc'] = df_ConsumerData['cat_idx'].map(mapping_dict_Categories)
df_ConsumerData['per_name'] = df_ConsumerData['per_idx'].map(mapping_dict_Time_Periods)

df_ConsumerData = df_ConsumerData[['per_name','cat_desc','val']]
df_ConsumerData = df_ConsumerData.copy()

df_ConsumerData[['Month', 'Year']] = df_ConsumerData['per_name'].str.split('-', n=1, expand=True)

df_ConsumerData.drop(columns=['per_name'], inplace=True)

month_mapping = {
    'Jan': 'Mar',
    'Feb': 'Apr',
    'Mar': 'May',
    'Apr': 'Jun',
    'May': 'Jul',
    'Jun': 'Aug',
    'Jul': 'Sep',
    'Aug': 'Oct',
    'Sep': 'Nov',
    'Oct': 'Dec',
    'Nov': 'Jan',  
    'Dec': 'Feb'   
}

df_ConsumerData['Month'] = df_ConsumerData['Month'].map(month_mapping)

df_ConsumerData['Year'] = df_ConsumerData['Year'].astype(int)
mask = (df_ConsumerData['Month'].isin(['Jan', 'Feb']))
df_ConsumerData['Year'] = df_ConsumerData['Year'].where(~mask, df_ConsumerData['Year'] + 1)


In [None]:
# Getting the MRTS release dates

excel_file_path = 'MARTSreleasedates.xls'

df_matrix = pd.read_excel(excel_file_path, header=None, skiprows=4)

df_matrix.rename(columns={df_matrix.columns[0]: 'Year'}, inplace=True)


month_names = [
    'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
    'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'
]


df_matrix.columns = ['Year'] + month_names

df_matrix = df_matrix.iloc[:-11]

df_matrix.replace('14*', '14', inplace=True)
df_matrix.replace('11*', '11', inplace=True)
df_matrix.replace('1*,18*', '1', inplace=True)
df_matrix = df_matrix.fillna(0)
df_matrix = df_matrix.astype(float)
df_matrix = df_matrix[df_matrix['Year'] >= 1992]

df_matrix_tidy = df_matrix.melt(id_vars=['Year'], var_name='Month', value_name='Days')

df_matrix_tidy['Year'] = df_matrix_tidy['Year'].astype(int)
df_matrix_tidy['Month'] = df_matrix_tidy['Month'].astype(str)
df_matrix_tidy['Days'] = df_matrix_tidy['Days'].astype(int)

df_ConsumerData['Year'] = df_ConsumerData['Year'].astype(int)
df_ConsumerData['Month'] = df_ConsumerData['Month'].astype(str)

df_ConsumerData['val'] = pd.to_numeric(df_ConsumerData['val'], errors='coerce')

merged_df = df_ConsumerData.merge(df_matrix_tidy, on=['Year', 'Month'], how='left')

mask_0 = (merged_df['Month'] == 'Dec') & (merged_df['Year'] == 2018) & (merged_df['Days'] == 14)
mask_0 = (merged_df['Month'] == 'Dec') & (merged_df['Year'] == 2018) & (merged_df['Days'] == 14)

mask_1 = (merged_df['Month'] == 'Jan') & (merged_df['Year'] == 2019) & (merged_df['Days'] == 0)
mask_1 = (merged_df['Month'] == 'Jan') & (merged_df['Year'] == 2019) & (merged_df['Days'] == 0)

mask_2 = (merged_df['Month'] == 'Feb') & (merged_df['Year'] == 2019) & (merged_df['Days'] == 14)
mask_2 = (merged_df['Month'] == 'Feb') & (merged_df['Year'] == 2019) & (merged_df['Days'] == 14)

mask_3 = (merged_df['Month'] == 'Mar') & (merged_df['Year'] == 2019) & (merged_df['Days'] == 11)
mask_3 = (merged_df['Month'] == 'Mar') & (merged_df['Year'] == 2019) & (merged_df['Days'] == 11)

merged_df.loc[mask_0, 'Month'] = 'Feb'
merged_df.loc[mask_0, 'Year'] = 2019
merged_df.loc[mask_0, 'Days'] = 14

merged_df.loc[mask_1, 'Month'] = 'Mar'
merged_df.loc[mask_1, 'Days'] = 11

merged_df.loc[mask_2, 'Month'] = 'Apr'
merged_df.loc[mask_2, 'Days'] = 1

merged_df.loc[mask_3, 'Month'] = 'Apr'
merged_df.loc[mask_3, 'Days'] = 18

merged_df['Date'] = pd.to_datetime(merged_df['Year'].astype(str) + '-' + merged_df['Month'] + '-' + merged_df['Days'].astype(str), format='%Y-%b-%d')
merged_df = merged_df[['Date','cat_desc','val']]

merged_df['signal'] = 0

# Generating the signal using 452 direction
prev_signals = {}

for index, row in merged_df.iterrows():
    cat_desc = row['cat_desc']
    val = row['val']

    if val > 0:
        merged_df.at[index, 'signal'] = 1
        prev_signals[cat_desc] = 1
    elif val < 0:
        merged_df.at[index, 'signal'] = -1
        prev_signals[cat_desc] = -1
    elif val == 0 and cat_desc in prev_signals:
        merged_df.at[index, 'signal'] = prev_signals[cat_desc]

merged_df.sort_values(by = 'Date')
merged_df.reset_index(drop=True, inplace=True)

categories_to_keep = ['452: General Merchandise Stores']
merged_df = merged_df[merged_df['cat_desc'].str.startswith(tuple(categories_to_keep))]

In [None]:
# Creating the returns based on signals
signals_df = merged_df.copy()

signals_df = signals_df.groupby(['Date', 'cat_desc']).agg({'signal': 'mean'}).reset_index()

signals_df = signals_df.pivot(index='Date', columns='cat_desc', values='signal')

signals_df.reset_index(inplace=True)

df_Industry_Portfolios = df_Industry_Portfolios.copy()

signals_df['Date'] = pd.to_datetime(signals_df['Date'])
df_Industry_Portfolios['Date'] = pd.to_datetime(df_Industry_Portfolios['Date'])

combined_df = df_Industry_Portfolios.merge(signals_df, on='Date', how='left')

combined_df.fillna(method='ffill', inplace=True)
combined_df = combined_df[combined_df['Date'] >= '1992-04-14'].copy()
combined_df.reset_index(inplace=True)
del combined_df['index']

data = combined_df

df_base = pd.DataFrame(data)
df_base['Date'] = pd.to_datetime(df_base['Date'])
df_base.set_index('Date', inplace=True)

columns_for_returns = [
    'NoDur', 'Durbl', 'Manuf', 'Enrgy', 'Chems', 'BusEq', 'Telcm', 'Utils', 'Shops',
    'Hlth', 'Money', 'Other'
]

signal_column = '452: General Merchandise Stores'

for return_column in columns_for_returns:
    df_base[return_column] = pd.to_numeric(df_base[return_column], errors='coerce')
    df_base[return_column + '_Port'] = (
        df_base[return_column] * df_base[signal_column] 
    )

In [None]:
# FF3 factors

zip_file_path = 'F-F_Research_Data_Factors_daily_CSV.zip'

with ZipFile(zip_file_path, 'r') as archive:
    with archive.open('F-F_Research_Data_Factors_daily.CSV') as csv_file:
        ff3_df = pd.read_csv(io.TextIOWrapper(csv_file),skiprows=3)


index_to_cut = ff3_df[ff3_df['Unnamed: 0'] == 'Copyright 2023 Kenneth R. French'].index[0]
ff3_df = ff3_df.iloc[:index_to_cut]
ff3_df = ff3_df.rename(columns={'Unnamed: 0': 'Date'})
ff3_df['Date'] = pd.to_datetime(ff3_df['Date'], format='%Y%m%d')
ff3_df = ff3_df[ff3_df['Date'] >= '1992-04-14'].copy()
ff3_df.set_index('Date', inplace=True)
df_base = df_base.merge(ff3_df, on='Date', how='left')

In [None]:
# Performance of the sectors

portfolio_names = df_base.columns[13:25]
portfolio_returns = df_base.iloc[:,13:25]
portfolio_returns = portfolio_returns/100.0

performance_stats = pd.DataFrame(index=portfolio_names)

for portfolio_name, returns in portfolio_returns.items():
    alpha = 0.01 
    var = np.percentile(returns, alpha*100)

    cumulative_returns = (1 + returns).cumprod()
    cumulative_max = cumulative_returns.cummax() 
    drawdown = cumulative_returns / cumulative_max - 1
    max_drawdown = drawdown.min()
    
    max_cum_return = cumulative_max.max() - 1

    drawdown = pd.to_numeric(drawdown, errors='coerce')
    worst_periods = drawdown.nsmallest(1)
    worst_period_dates = returns.index[drawdown.index.isin(worst_periods.index)]
    
    average_returns = returns.mean()*252
    average_vola = (returns.std() * np.sqrt(252))
    
    cum_return = (1 + returns).prod() - 1

    skewness = returns.skew()
    kurt = returns.kurtosis()

    sharpe_ratio = average_returns/ average_vola

    positive_days = returns[returns > 0] 
    percentage_positive_days = (len(positive_days) / len(returns)) * 100

    annual_returns = returns.resample('Y').apply(lambda x: (1 + x).prod() - 1)

    max_annual_return = annual_returns.max()
    min_annual_return = annual_returns.min()
    best_year = annual_returns.idxmax()
    worst_year = annual_returns.idxmin()

    performance_stats.at[portfolio_name, 'Annualized Return'] = "{:.2f}%".format(average_returns*100)
    performance_stats.at[portfolio_name, 'Annualized Volatility'] = "{:.2f}%".format(average_vola*100)
    performance_stats.at[portfolio_name, 'Sharpe Ratio'] = "{:.3f}".format(sharpe_ratio)
    performance_stats.at[portfolio_name, 'Max Annual Return'] = "{:.2f}%".format(max_annual_return * 100)
    performance_stats.at[portfolio_name, 'Best Year'] = best_year.strftime('%Y')
    performance_stats.at[portfolio_name, 'Min Annual Return'] = "{:.2f}%".format(min_annual_return * 100)
    performance_stats.at[portfolio_name, 'Worst Year'] = worst_year.strftime('%Y')
    performance_stats.at[portfolio_name, 'VaR'] = "{:.2f}%".format(var * 100)
    performance_stats.at[portfolio_name, 'Skew'] = "{:.3f}".format(skewness)
    performance_stats.at[portfolio_name, 'Kurtosis'] = "{:.3f}".format(kurt)
    performance_stats.at[portfolio_name, 'Cumulative Return'] = "{:.2f}%".format(cum_return*100)
    performance_stats.at[portfolio_name, 'Max Cum. Return'] = "{:.2f}%".format(max_cum_return*100)
    performance_stats.at[portfolio_name, 'Max Drawdown'] = "{:.2f}%".format(max_drawdown * 100) 
    performance_stats.at[portfolio_name, 'Worst Perf. Period'] = worst_period_dates.to_list()
    performance_stats.at[portfolio_name, 'Positive Days'] = "{:.2f}%".format(percentage_positive_days)

performance_stats


In [None]:
# Plotting cumulative retusn of the sectors
columns_for_cumprod = [return_column + '_Port' for return_column in columns_for_returns]

df_base_cum = pd.DataFrame(df_base)
df_base_cum[columns_for_cumprod] = (1 + df_base_cum[columns_for_cumprod]/100.0).cumprod() - 1

sorted_columns = df_base_cum[columns_for_cumprod].iloc[-1].sort_values(ascending=False).index
df_base_sorted = df_base_cum[columns_for_cumprod][sorted_columns]

fig = px.line(df_base_sorted, y=df_base_sorted.columns, title='Cumulative Returns')

fig.update_layout(
    xaxis_title='Date',
    yaxis_title='Portfolio Cumulative Return',
    xaxis=dict(title='Date', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    yaxis=dict(title='Cumulative Return', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    legend=dict(
        orientation="h",
        x=0.5,
        y=-0.15,
        xanchor='center',
        yanchor='top',
        bgcolor='rgba(0,0,0,0)'),
)

fig.add_shape(
    type="rect",
    x0=0,
    x1=1,
    y0=0,
    y1=1,
    xref="paper",
    yref="paper",
    line=dict(color="black", width=1.5),
    layer="below"
)

fig.update_layout(
    plot_bgcolor='white',
    paper_bgcolor='white',
    legend_title='',
)

fig.show()

In [None]:
# CAPM and FF3 for the sectors, full period

market_df = ff3_df.merge(df_base.iloc[:, 13:25], left_index=True, right_index=True, how='left')

portfolio_results = []

for portfolio_return in market_df.columns[4:]:
    try:
        #CAPM
        market_return = market_df['Mkt-RF']
        excess_return = market_df[portfolio_return]

        results_capm = sm.OLS(excess_return, sm.add_constant(market_return)).fit()

        alpha_capm = results_capm.params[0]
        t_statistic_alpha_capm = results_capm.tvalues[0]

        tracking_error_capm = excess_return.std()
        ir_capm = alpha_capm / tracking_error_capm
        
        beta_mkt_capm = results_capm.params[1]
        t_statistic_beta_mkt_capm = results_capm.tvalues[1]
        
        #FF3
        market_return = market_df[['Mkt-RF', 'SMB', 'HML']]
        excess_return = market_df[portfolio_return]

        results_ff3 = sm.OLS(excess_return, sm.add_constant(market_return)).fit()

        alpha_ff3 = results_ff3.params[0]
        t_statistic_alpha_ff3 = results_ff3.tvalues[0]
        
        tracking_error_ff3 = excess_return.std()
        ir_ff3 = alpha_ff3 / tracking_error_ff3
        
        beta_mkt_ff3 = results_ff3.params[1]
        beta_smb_ff3 = results_ff3.params[2]
        beta_hml_ff3 = results_ff3.params[3]

        
        t_statistic_beta_mkt_ff3 = results_ff3.tvalues[1]
        t_statistic_beta_smb_ff3 = results_ff3.tvalues[2]
        t_statistic_beta_hml_ff3 = results_ff3.tvalues[3]

        result_dict = {'Portfolio': portfolio_return,
                       'IR CAPM': ir_capm,
                       'Alpha ': alpha_capm,
                       'T-Stat ': t_statistic_alpha_capm,
                       
                       'Beta Mkt-Rf': beta_mkt_capm,
                       'T-Stat Beta Mkt-Rf': t_statistic_beta_mkt_capm,
                       
                       'IR FF3': ir_ff3,
                       'Alpha': alpha_ff3,
                       'T-Stat': t_statistic_alpha_ff3,
        
                       'Beta MKT': beta_mkt_ff3,
                       'T-Stat Beta MKT': t_statistic_beta_mkt_ff3,
                       'Beta SMB': beta_smb_ff3,
                       'T-Stat Beta SMB': t_statistic_beta_smb_ff3,
                       'Beta HML': beta_hml_ff3,
                       'T-Stat Beta HML': t_statistic_beta_hml_ff3,
                      }

        portfolio_results.append(result_dict)
        
    except Exception as e:
        print(f"Error occurred: {e}")
results_df = pd.DataFrame(portfolio_results)
results_df


## Period Analysis 

In [None]:
# Defining the date ranges for the two periods and their performance
periods = [
    ("Period 1", pd.to_datetime("1992-04-14"), pd.to_datetime("2007-12-31")),
    ("Period 2", pd.to_datetime("2008-01-01"), pd.to_datetime("2023-12-31"))
]

period_stats = {period_name: pd.DataFrame(index=portfolio_names) for period_name, _, _ in periods}

for portfolio_name, returns in portfolio_returns.items():
    for period_name, period_start, period_end in periods:
        returns_period = returns.loc[(returns.index >= period_start) & (returns.index <= period_end)]
        
        alpha = 0.01
        var_period = np.percentile(returns_period, alpha * 100)

        cumulative_returns_period = (1 + returns_period).cumprod()
        cumulative_max_period = cumulative_returns_period.cummax()
        drawdown_period = cumulative_returns_period / cumulative_max_period - 1
        max_drawdown_period = drawdown_period.min()

        max_cum_return_period = cumulative_max_period.max() - 1
        
        drawdown_period = pd.to_numeric(drawdown_period, errors='coerce')

        worst_periods_period = drawdown_period.nsmallest(1).index
        worst_period_dates_period = returns_period.index[drawdown_period.index.isin(worst_periods_period)]

        average_returns_period = returns_period.mean()*252
        average_vola_period = (returns_period.std() * np.sqrt(252))

        cum_return_period = (1 + returns_period).prod() - 1

        skewness_period = returns_period.skew()
        kurtosis_period = returns_period.kurtosis()

        sharpe_ratio_period = average_returns_period / average_vola_period

        positive_days_period = returns_period[returns_period > 0] 
        percentage_positive_days_period = (len(positive_days_period) / len(returns_period)) * 100

        annual_returns = returns_period.resample('Y').apply(lambda x: (1 + x).prod() - 1)

        max_annual_return = annual_returns.max()
        min_annual_return = annual_returns.min()
        best_year = annual_returns.idxmax()
        worst_year = annual_returns.idxmin()

        period_stats[period_name].at[portfolio_name, 'Annualized Return'] = "{:.2f}%".format(average_returns_period * 100)
        period_stats[period_name].at[portfolio_name, 'Annualized Volatility'] = "{:.2f}%".format(average_vola_period * 100)
        period_stats[period_name].at[portfolio_name, 'Sharpe Ratio'] = "{:.3f}".format(sharpe_ratio_period)
        period_stats[period_name].at[portfolio_name, 'Max Annual Return'] = "{:.2f}%".format(max_annual_return * 100)
        period_stats[period_name].at[portfolio_name, 'Best Year'] = best_year.strftime('%Y')
        period_stats[period_name].at[portfolio_name, 'Min Annual Return'] = "{:.2f}%".format(min_annual_return * 100)
        period_stats[period_name].at[portfolio_name, 'Worst Year'] = worst_year.strftime('%Y')
        period_stats[period_name].at[portfolio_name, 'VaR'] = "{:.2f}%".format(var_period * 100)
        period_stats[period_name].at[portfolio_name, 'Skew'] = "{:.3f}".format(skewness_period)
        period_stats[period_name].at[portfolio_name, 'Kurtosis'] = "{:.3f}".format(kurtosis_period)
        period_stats[period_name].at[portfolio_name, 'Cumulative Return'] = "{:.2f}%".format(cum_return_period * 100)
        period_stats[period_name].at[portfolio_name, 'Max Cum. Return'] = "{:.2f}%".format(max_cum_return_period * 100)
        period_stats[period_name].at[portfolio_name, 'Max Drawdown'] = "{:.2f}%".format(max_drawdown_period * 100)
        period_stats[period_name].at[portfolio_name, 'Worst Perf. Period'] = worst_period_dates_period.to_list()
        period_stats[period_name].at[portfolio_name, 'Positive Days'] = "{:.2f}%".format(percentage_positive_days_period)

In [None]:
# CAPM and FF3 in the periods
def calculate_metrics_capm(portfolio_returns, market_returns):
    X = sm.add_constant(market_returns)
    model = sm.OLS(portfolio_returns, X).fit()
    
    alpha_capm = model.params[0]
    beta_mkt_capm = model.params[1]
    t_statistic_alpha_capm = model.tvalues[0]
    t_statistic_beta_mkt_capm = model.tvalues[1]
    
    tracking_error_capm = portfolio_returns.std()
    ir_capm = alpha_capm / tracking_error_capm
        
    return pd.Series({
        'IR CAPM': ir_capm,
        'Alpha CAPM': alpha_capm,
        'T-Stat CAPM': t_statistic_alpha_capm,               
        'Beta Mkt-Rf': beta_mkt_capm,
        'T-Stat Beta Mkt-Rf': t_statistic_beta_mkt_capm, 
        })

def calculate_metrics_ff3(portfolio_returns, market_returns):
    X = sm.add_constant(market_returns)
    model = sm.OLS(portfolio_returns, X).fit()
    
    alpha_ff5 = model.params[0]
    t_statistic_alpha_ff5 = model.tvalues[0]
    
    tracking_error_ff5 = portfolio_returns.std()
    ir_ff5 = alpha_ff5 / tracking_error_ff5
    
    beta_mkt_ff5 = model.params[1]
    beta_smb_ff5 = model.params[2]
    beta_hml_ff5 = model.params[3]

    
    t_statistic_beta_mkt_ff5 = model.tvalues[1]
    t_statistic_beta_smb_ff5 = model.tvalues[2]
    t_statistic_beta_hml_ff5 = model.tvalues[3]

        
    return pd.Series({
        'IR FF3': ir_ff5,
        'Alpha FF3': alpha_ff5,
        'T-Stat FF3': t_statistic_alpha_ff5,               
        'Beta MKT': beta_mkt_ff5,
        'T-Stat Beta MKT': t_statistic_beta_mkt_ff5,
        'Beta SMB': beta_smb_ff5,
        'T-Stat Beta SMB': t_statistic_beta_smb_ff5,
        'Beta HML': beta_hml_ff5,
        'T-Stat Beta HML': t_statistic_beta_hml_ff5,

        })

portfolio_columns = portfolio_returns.columns

results_period1 = pd.DataFrame(index=portfolio_columns)
results_period2 = pd.DataFrame(index=portfolio_columns)

for portfolio_column in portfolio_columns:
    for period, start_date, end_date in periods:
        
        portfolio_returns_period = portfolio_returns.loc[start_date:end_date, portfolio_column].reset_index(drop=True)
        
        market_returns_period_capm = ff3_df.loc[start_date:end_date, 'Mkt-RF'].reset_index(drop=True)
        market_returns_period_ff5 = ff3_df.loc[start_date:end_date, ['Mkt-RF', 'SMB', 'HML']].reset_index(drop=True)
        
        metrics_capm = calculate_metrics_capm(portfolio_returns_period, market_returns_period_capm)
        metrics_ff3 = calculate_metrics_ff3(portfolio_returns_period, market_returns_period_ff5)
        
        if period == "Period 1":
            results_df = results_period1
        elif period == "Period 2":
            results_df = results_period2
        else:
            raise ValueError(f"Unknown period: {period}")
        
        
        results_df.loc[portfolio_column, 'CAPM_IR'] = metrics_capm['IR CAPM']
        results_df.loc[portfolio_column, 'Alpha '] = metrics_capm['Alpha CAPM']
        results_df.loc[portfolio_column, 'T-Stat '] = metrics_capm['T-Stat CAPM']
        results_df.loc[portfolio_column, 'Beta_Mkt-Rf'] = metrics_capm['Beta Mkt-Rf']
        results_df.loc[portfolio_column, 'T-Stat Beta_Mkt-Rf'] = metrics_capm['T-Stat Beta Mkt-Rf']

        results_df.loc[portfolio_column, 'FF3_IR'] = metrics_ff3['IR FF3']
        results_df.loc[portfolio_column, 'Alpha'] = metrics_ff3['Alpha FF3']
        results_df.loc[portfolio_column, 'T-Stat'] = metrics_ff3['T-Stat FF3']
        results_df.loc[portfolio_column, 'Beta_MKT'] = metrics_ff3['Beta MKT']
        results_df.loc[portfolio_column, 'T-Stat Beta_MKT'] = metrics_ff3['T-Stat Beta MKT']
        results_df.loc[portfolio_column, 'Beta_SMB'] = metrics_ff3['Beta SMB']
        results_df.loc[portfolio_column, 'T-Stat Beta_SMB'] = metrics_ff3['T-Stat Beta SMB']
        results_df.loc[portfolio_column, 'Beta_HML'] = metrics_ff3['Beta HML']
        results_df.loc[portfolio_column, 'T-Stat Beta_HML'] = metrics_ff3['T-Stat Beta HML']


In [None]:
period_stats["Period 1"]

In [None]:
results_period1

In [None]:
# Plotting period cumulative returns
cumulative_returns = {str(period_name): {} for period_name, _, _ in periods}

for portfolio_name, returns in portfolio_returns.items():
    for period_name, period_start, period_end in periods:
        returns_period = returns.loc[(returns.index >= period_start) & (returns.index <= period_end)]

        cumulative_returns_period = (1 + returns_period).cumprod() - 1
        key = str(period_name)  
        cumulative_returns[key][portfolio_name] = cumulative_returns_period

period_name = periods[0][0]
key = str(period_name)

sorted_portfolio_names = sorted(
    cumulative_returns[key].keys(),
    key=lambda portfolio_name: cumulative_returns[key][portfolio_name].iloc[-1],
    reverse=True
)

traces = []
for portfolio_name in sorted_portfolio_names:
    cumulative_return = cumulative_returns[key][portfolio_name]
    trace = go.Scatter(x=cumulative_return.index, y=cumulative_return, mode='lines', name=portfolio_name)
    traces.append(trace)

layout = go.Layout(
    title=f'{period_name} Cumulative Returns',
    xaxis=dict(title='Date', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    yaxis=dict(title='Cumulative Return', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    legend=dict(
        orientation="h",
        x=0.5,
        y=-0.15,
        xanchor='center',
        yanchor='top',
        bgcolor='rgba(0,0,0,0)'),
)

fig = go.Figure(data=traces, layout=layout)

fig.add_shape(
    type="rect",
    x0=0,
    x1=1,
    y0=0,
    y1=1,
    xref="paper",
    yref="paper",
    line=dict(color="black", width=1.5),
    layer="below"
)

fig.update_layout(
    plot_bgcolor='white',
    paper_bgcolor='white',
    legend_title='',
)
fig.show()

In [None]:
period_stats["Period 2"]

In [None]:
results_period2

In [None]:
period_name = periods[1][0]
key = str(period_name)

sorted_portfolio_names = sorted(
    cumulative_returns[key].keys(),
    key=lambda portfolio_name: cumulative_returns[key][portfolio_name].iloc[-1],
    reverse=True
)

traces = []
for portfolio_name in sorted_portfolio_names:
    cumulative_return = cumulative_returns[key][portfolio_name]
    trace = go.Scatter(x=cumulative_return.index, y=cumulative_return, mode='lines', name=portfolio_name)
    traces.append(trace)

layout = go.Layout(
    title=f'{period_name} Cumulative Returns',
    xaxis=dict(title='Date', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    yaxis=dict(title='Cumulative Return', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    legend=dict(
        orientation="h",
        x=0.5,
        y=-0.15,
        xanchor='center',
        yanchor='top',
        bgcolor='rgba(0,0,0,0)'),
)

fig = go.Figure(data=traces, layout=layout)

fig.add_shape(
    type="rect",
    x0=0,
    x1=1,
    y0=0,
    y1=1,
    xref="paper",
    yref="paper",
    line=dict(color="black", width=1.5),
    layer="below"
)

fig.update_layout(
    plot_bgcolor='white',
    paper_bgcolor='white',
    legend_title='',
)

fig.show()

## Long-short Portfolio

In [None]:
# Creating the Long-Only and Long-Short Portfolios

portfolio_returns_longShort = pd.DataFrame(index=portfolio_returns.index, columns=['LongShortPortfolio'])
portfolio_returns_longShort['LongShortPortfolio'] = (1/12) * portfolio_returns.sum(axis=1)

raw_return= pd.DataFrame(index=portfolio_returns.index)
raw_return = df_base.iloc[:,:12]/100
raw_return = raw_return[raw_return.index >= '1992-04-14'].copy()
portfolio_returns_longOnly = pd.DataFrame(index=raw_return.index, columns=['LongOnlyPortfolio'])

portfolio_returns_longOnly['LongOnlyPortfolio'] = (1/12) * raw_return.sum(axis=1)

market = pd.DataFrame(index=raw_return.index, columns=['Market'])
market = market[market.index >= '1992-04-14'].copy()
market['Market'] = ff3_df['Mkt-RF']/100

all_portfolios = pd.concat([portfolio_returns_longShort, portfolio_returns_longOnly, market], axis=1)
all_portfolios_cum = (1+all_portfolios).cumprod()-1


# Plotting
fig = px.line(all_portfolios_cum, y=all_portfolios_cum.columns, title='Cumulative Returns')

fig.update_layout(
    xaxis_title='Date',
    yaxis_title='Portfolio Cumulative Return',
    xaxis=dict(showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    yaxis=dict(showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    plot_bgcolor='white',
    paper_bgcolor='white',
    legend_title='',
    legend=dict(
        orientation="h",
        x=0.5,
        y=-0.15,
        xanchor='center',
        yanchor='top',
        bgcolor='rgba(0,0,0,0)'),
)

fig.add_shape(
    type="rect",
    x0=0,
    x1=1,
    y0=0,
    y1=1,
    xref="paper",
    yref="paper",
    line=dict(color="black", width=1.5),
    layer="below"
)

fig.update_layout(width=1000, height=600)
fig.show()


In [None]:
# Performance Stats for each Portfolio
strat_names = all_portfolios.columns
strat_returns = all_portfolios.iloc[:,:]

strat_stats = pd.DataFrame(index=strat_names)

for strat_name, returns in strat_returns.items():
    alpha = 0.01
    var = np.percentile(returns, alpha*100)

    cumulative_returns = (1 + returns).cumprod()
    cumulative_max = cumulative_returns.cummax() 
    drawdown = cumulative_returns / cumulative_max - 1
    max_drawdown = drawdown.min()
    
    max_cum_return = cumulative_max.max() - 1

    drawdown = pd.to_numeric(drawdown, errors='coerce')
    worst_periods = drawdown.nsmallest(1)
    worst_period_dates = returns.index[drawdown.index.isin(worst_periods.index)]
    
    average_returns = returns.mean()*252
    average_vola = (returns.std() * np.sqrt(252))
    
    cum_return = (1 + returns).prod() - 1

    skewness = returns.skew()
    kurt = returns.kurtosis()

    sharpe_ratio = average_returns/ average_vola

    positive_days = returns[returns > 0]
    percentage_positive_days = (len(positive_days) / len(returns)) * 100
    
    annual_returns = returns.resample('Y').apply(lambda x: (1 + x).prod() - 1)

    max_annual_return = annual_returns.max()
    min_annual_return = annual_returns.min()
    best_year = annual_returns.idxmax()
    worst_year = annual_returns.idxmin()

    strat_stats.at[strat_name, 'Annualized Return'] = "{:.2f}%".format(average_returns*100)
    strat_stats.at[strat_name, 'Annualized Volatility'] = "{:.2f}%".format(average_vola*100)
    strat_stats.at[strat_name, 'Sharpe Ratio'] = "{:.3f}".format(sharpe_ratio)
    strat_stats.at[strat_name, 'Max Annual Return'] = "{:.2f}%".format(max_annual_return * 100)
    strat_stats.at[strat_name, 'Best Year'] = best_year.strftime('%Y')
    strat_stats.at[strat_name, 'Min Annual Return'] = "{:.2f}%".format(min_annual_return * 100)
    strat_stats.at[strat_name, 'Worst Year'] = worst_year.strftime('%Y')
    strat_stats.at[strat_name, 'VaR'] = "{:.2f}%".format(var * 100)
    strat_stats.at[strat_name, 'Skew'] = "{:.3f}".format(skewness)
    strat_stats.at[strat_name, 'Kurtosis'] = "{:.3f}".format(kurt)
    strat_stats.at[strat_name, 'Cumulative Return'] = "{:.2f}%".format(cum_return*100)
    strat_stats.at[strat_name, 'Max Cum. Return'] = "{:.2f}%".format(max_cum_return*100)
    strat_stats.at[strat_name, 'Max Drawdown'] = "{:.2f}%".format(max_drawdown * 100) 
    strat_stats.at[strat_name, 'Worst Perf. Period'] = worst_period_dates.to_list()
    strat_stats.at[strat_name, 'Positive Days'] = "{:.2f}%".format(percentage_positive_days)

strat_stats

In [None]:
# CAPM and FF3 for the 2 portfolios
market_df = ff3_df.merge(all_portfolios, left_index=True, right_index=True, how='left')

portfolio_results = []

for portfolio_return in market_df.columns[4:6]:
    try:
        #CAPM
        market_return = market_df['Mkt-RF']
        excess_return = market_df[portfolio_return]

        results_capm = sm.OLS(excess_return, sm.add_constant(market_return)).fit()

        alpha_capm = results_capm.params[0]
        t_statistic_alpha_capm = results_capm.tvalues[0]

        tracking_error_capm = excess_return.std()
        ir_capm = alpha_capm / tracking_error_capm
        
        beta_mkt_capm = results_capm.params[1]
        t_statistic_beta_mkt_capm = results_capm.tvalues[1]
        
        #FF3
        market_return = market_df[['Mkt-RF', 'SMB', 'HML']]
        excess_return = market_df[portfolio_return]

        results_ff5 = sm.OLS(excess_return, sm.add_constant(market_return)).fit()

        alpha_ff5 = results_ff5.params[0]
        t_statistic_alpha_ff5 = results_ff5.tvalues[0]
        
        tracking_error_ff5 = excess_return.std()
        ir_ff5 = alpha_ff5 / tracking_error_ff5
        
        beta_mkt_ff5 = results_ff5.params[1]
        beta_smb_ff5 = results_ff5.params[2]
        beta_hml_ff5 = results_ff5.params[3]

        
        t_statistic_beta_mkt_ff5 = results_ff5.tvalues[1]
        t_statistic_beta_smb_ff5 = results_ff5.tvalues[2]
        t_statistic_beta_hml_ff5 = results_ff5.tvalues[3]

        result_dict = {'Portfolio': portfolio_return,
                       'IR CAPM': ir_capm,
                       'Alpha ': alpha_capm,
                       'T-Stat ': t_statistic_alpha_capm,
                       
                       'Beta Mkt-Rf': beta_mkt_capm,
                       'T-Stat Beta Mkt-Rf': t_statistic_beta_mkt_capm,
                       
                       'IR FF3': ir_ff5,
                       'Alpha': alpha_ff5,
                       'T-Stat': t_statistic_alpha_ff5,
        
                       'Beta MKT': beta_mkt_ff5,
                       'T-Stat Beta MKT': t_statistic_beta_mkt_ff5,
                       'Beta SMB': beta_smb_ff5,
                       'T-Stat Beta SMB': t_statistic_beta_smb_ff5,
                       'Beta HML': beta_hml_ff5,
                       'T-Stat Beta HML': t_statistic_beta_hml_ff5,

                      }

        portfolio_results.append(result_dict)
        
    except Exception as e:
        print(f"Error occurred: {e}")

results_df = pd.DataFrame(portfolio_results)
results_df

## Portfolio Period Analysis

In [None]:
# Creating the period for the portfolios and the performance

strat_stats = {period_name: pd.DataFrame(index=strat_names) for period_name, _, _ in periods}

for strat_name, returns in strat_returns.items():
    for period_name, period_start, period_end in periods:
        returns_period = returns.loc[(returns.index >= period_start) & (returns.index <= period_end)]
        alpha = 0.01  
        var_period = np.percentile(returns_period, alpha * 100)

        cumulative_returns_period = (1 + returns_period).cumprod()
        cumulative_max_period = cumulative_returns_period.cummax()
        drawdown_period = cumulative_returns_period / cumulative_max_period - 1
        max_drawdown_period = drawdown_period.min()

        max_cum_return_period = cumulative_max_period.max() - 1
        
        drawdown_period = pd.to_numeric(drawdown_period, errors='coerce')

        worst_periods_period = drawdown_period.nsmallest(1).index
        worst_period_dates_period = returns_period.index[drawdown_period.index.isin(worst_periods_period)]

        average_returns_period = returns_period.mean()*252
        average_vola_period = (returns_period.std() * np.sqrt(252))

        cum_return_period = (1 + returns_period).prod() - 1

        skewness_period = returns_period.skew()
        kurtosis_period = returns_period.kurtosis()

        sharpe_ratio_period = average_returns_period / average_vola_period

        positive_days_period = returns_period[returns_period > 0]
        percentage_positive_days_period = (len(positive_days_period) / len(returns_period)) * 100

        annual_returns = returns_period.resample('Y').apply(lambda x: (1 + x).prod() - 1)

        max_annual_return = annual_returns.max()
        min_annual_return = annual_returns.min()
        best_year = annual_returns.idxmax()
        worst_year = annual_returns.idxmin()

        strat_stats[period_name].at[strat_name, 'Annualized Return'] = "{:.2f}%".format(average_returns_period * 100)
        strat_stats[period_name].at[strat_name, 'Annualized Volatility'] = "{:.2f}%".format(average_vola_period * 100)
        strat_stats[period_name].at[strat_name, 'Sharpe Ratio'] = "{:.3f}".format(sharpe_ratio_period)
        strat_stats[period_name].at[strat_name, 'Max Annual Return'] = "{:.2f}%".format(max_annual_return * 100)
        strat_stats[period_name].at[strat_name, 'Best Year'] = best_year.strftime('%Y')
        strat_stats[period_name].at[strat_name, 'Min Annual Return'] = "{:.2f}%".format(min_annual_return * 100)
        strat_stats[period_name].at[strat_name, 'Worst Year'] = worst_year.strftime('%Y')
        strat_stats[period_name].at[strat_name, 'VaR'] = "{:.2f}%".format(var_period * 100)
        strat_stats[period_name].at[strat_name, 'Skew'] = "{:.3f}".format(skewness_period)
        strat_stats[period_name].at[strat_name, 'Kurtosis'] = "{:.3f}".format(kurtosis_period)
        strat_stats[period_name].at[strat_name, 'Cumulative Return'] = "{:.2f}%".format(cum_return_period * 100)
        strat_stats[period_name].at[strat_name, 'Max Cum. Return'] = "{:.2f}%".format(max_cum_return_period * 100)
        strat_stats[period_name].at[strat_name, 'Max Drawdown'] = "{:.2f}%".format(max_drawdown_period * 100)
        strat_stats[period_name].at[strat_name, 'Worst Perf. Period'] = worst_period_dates_period.to_list()
        strat_stats[period_name].at[strat_name, 'Positive Days'] = "{:.2f}%".format(percentage_positive_days_period)

In [None]:
# CAPM and FF3
strategy_columns = ['LongShortPortfolio', 'LongOnlyPortfolio']

results_period1 = pd.DataFrame(index=strategy_columns)
results_period2 = pd.DataFrame(index=strategy_columns)

for strategy_column in strategy_columns:
    for period, start_date, end_date in periods:
        
        strategy_returns_period = strat_returns.loc[start_date:end_date, strategy_column].reset_index(drop=True)
        
        market_returns_period_capm = ff3_df.loc[start_date:end_date, 'Mkt-RF'].reset_index(drop=True)
        market_returns_period_ff3 = ff3_df.loc[start_date:end_date, ['Mkt-RF', 'SMB', 'HML']].reset_index(drop=True)
        
        metrics_capm = calculate_metrics_capm(strategy_returns_period, market_returns_period_capm)
        metrics_ff3 = calculate_metrics_ff3(strategy_returns_period, market_returns_period_ff3)
            
        if period == "Period 1":
            results_df = results_period1
        elif period == "Period 2":
            results_df = results_period2
        else:
            raise ValueError(f"Unknown period: {period}")
        
        results_df.loc[strategy_column, 'CAPM_IR'] = metrics_capm['IR CAPM']
        results_df.loc[strategy_column, 'Alpha '] = metrics_capm['Alpha CAPM']
        results_df.loc[strategy_column, 'T-Stat '] = metrics_capm['T-Stat CAPM']
        results_df.loc[strategy_column, 'Beta_Mkt-Rf'] = metrics_capm['Beta Mkt-Rf']        
        results_df.loc[strategy_column, 'T-Stat Beta_Mkt-Rf'] = metrics_capm['T-Stat Beta Mkt-Rf']

        results_df.loc[strategy_column, 'FF3_IR'] = metrics_ff3['IR FF3']
        results_df.loc[strategy_column, 'Alpha'] = metrics_ff3['Alpha FF3']
        results_df.loc[strategy_column, 'T-Stat'] = metrics_ff3['T-Stat FF3']
        results_df.loc[strategy_column, 'Beta_MKT'] = metrics_ff3['Beta MKT']
        results_df.loc[strategy_column, 'T-Stat Beta_MKT'] = metrics_ff3['T-Stat Beta MKT']
        results_df.loc[strategy_column, 'Beta_SMB'] = metrics_ff3['Beta SMB']
        results_df.loc[strategy_column, 'T-Stat Beta_SMB'] = metrics_ff3['T-Stat Beta SMB']
        results_df.loc[strategy_column, 'Beta_HML'] = metrics_ff3['Beta HML']
        results_df.loc[strategy_column, 'T-Stat Beta_HML'] = metrics_ff3['T-Stat Beta HML']



In [None]:
strat_stats["Period 1"]

In [None]:
results_period1

In [None]:
# Plotting
cumulative_returns = {str(period_name): {} for period_name, _, _ in periods}

for strat_name, returns in strat_returns.items():
    for period_name, period_start, period_end in periods:
        returns_period = returns.loc[(returns.index >= period_start) & (returns.index <= period_end)]

        cumulative_returns_period = (1 + returns_period).cumprod() - 1
        key = str(period_name)
        cumulative_returns[key][strat_name] = cumulative_returns_period

period_name = periods[0][0]
key = str(period_name)

traces = []
for portfolio_name, cumulative_return in cumulative_returns[key].items():
    trace = go.Scatter(x=cumulative_return.index, y=cumulative_return, mode='lines', name=portfolio_name)
    traces.append(trace)

layout = go.Layout(
    title=f'{period_name} Cumulative Returns',
    xaxis=dict(title='Date', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    yaxis=dict(title='Cumulative Return', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    legend=dict(
        orientation="h",
        x=0.5,
        y=-0.15,
        xanchor='center',
        yanchor='top',
        bgcolor='rgba(0,0,0,0)'),
)

fig = go.Figure(data=traces, layout=layout)

fig.add_shape(
    type="rect",
    x0=0,
    x1=1,
    y0=0,
    y1=1,
    xref="paper",
    yref="paper",
    line=dict(color="black", width=1.5),
    layer="below"
)

fig.update_layout(
    plot_bgcolor='white',
    paper_bgcolor='white',
    legend_title='',
)

fig.show()


In [None]:
strat_stats["Period 2"]

In [None]:
results_period2

In [None]:
period_name = periods[1][0]
key = str(period_name) 

traces = []
for portfolio_name, cumulative_return in cumulative_returns[key].items():
    trace = go.Scatter(x=cumulative_return.index, y=cumulative_return, mode='lines', name=portfolio_name)
    traces.append(trace)

layout = go.Layout(
    title=f'{period_name} Cumulative Returns',
    xaxis=dict(title='Date', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    yaxis=dict(title='Cumulative Return', showline=True, showgrid=True, gridwidth=0.2, gridcolor='lightgray', showticklabels=True, linecolor='black'),
    legend=dict(
        orientation="h",
        x=0.5,
        y=-0.15,
        xanchor='center',
        yanchor='top',
        bgcolor='rgba(0,0,0,0)'),
)

fig = go.Figure(data=traces, layout=layout)

fig.add_shape(
    type="rect",
    x0=0,
    x1=1,
    y0=0,
    y1=1,
    xref="paper",
    yref="paper",
    line=dict(color="black", width=1.5),
    layer="below"
)

fig.update_layout(
    plot_bgcolor='white',
    paper_bgcolor='white',
    legend_title='',
)

fig.show()
