In [None]:
import os
import gc
import numpy as np
import pandas as pd

from glob import glob
from tqdm import tqdm
import statsmodels.api as sm

import matplotlib.pyplot as plt
import matplotlib as mpl
from scipy.stats import skew, kurtosis
# Use Times New Roman as the font family for every figure drawn below.
mpl.rcParams.update({'font.family': 'Times New Roman'})

# 3-by-3 CAP VW portfolios

In [None]:
# Mean return and HAC t-statistic of every size x signal portfolio (ViT sort),
# printed as a LaTeX table.
# Forward slashes keep the path valid on both Windows and POSIX systems.
sorted_port_3by3 = pd.read_csv('sorted_portfolio/ViT_3by3.csv')

return_matrix = np.zeros((3, 3))
t_stat_matrix = np.zeros((3, 3)).astype(str)  # string cells, later filled with "(t-stat)" labels
std_matrix = np.zeros((3, 3))
shrape_matrix = np.zeros((3, 3))  # Sharpe ratios; scaled from monthly to annual below

for size in [1, 2, 3]:
    for signal in [1, 2, 3]:
        in_portfolio = (
            (sorted_port_3by3['size_decile_3'] == size)
            & (sorted_port_3by3['signal_decile_3'] == signal)
        )
        # Chained calls return a fresh frame, so no explicit copy / inplace mutation is needed.
        temp = (
            sorted_port_3by3[in_portfolio]
            .set_index('date')
            .drop(columns=['size_decile_3', 'signal_decile_3'])
        )

        # Average only the return column: DataFrame.mean() over all columns
        # raises on any non-numeric column in pandas >= 2.0.
        return_matrix[size-1, signal-1] = temp['Cap_VW_return'].mean()

        returns = list(temp['Cap_VW_return'])

        # Regress on a constant: the intercept is the mean return and its
        # t-value tests mean == 0 under HAC standard errors with 6 lags.
        X = np.ones_like(returns)  # Intercept only
        model = sm.OLS(returns, X)
        results = model.fit(cov_type='HAC', cov_kwds={'maxlags': 6})

        t_stat_matrix[size-1, signal-1] = '(' + np.round(results.tvalues[0], 3).astype(str) + ')'
        std_matrix[size-1, signal-1] = np.std(returns)

        # Annualize the Sharpe ratio with sqrt(12).
        # NOTE(review): np.std defaults to ddof=0 and no risk-free rate is
        # subtracted here, unlike cal_metric further down — confirm this is intended.
        shrape_matrix[size-1, signal-1] = np.mean(returns) / np.std(returns) * np.sqrt(12)

# Interleave one row of mean returns (scaled by 100) with one row of
# t-statistics for each size group.
latex_row = []
for i in range(3):
    latex_row.append(np.round(return_matrix*100,3)[i])
    latex_row.append(t_stat_matrix[i])

latex_table = pd.DataFrame(latex_row)

print(latex_table.astype(str).to_latex(index=False))

In [None]:
# High-minus-low spread (signal group 3 minus signal group 1) inside each
# size group, ViT sort. One loop replaces three copy-pasted sections.
# Forward slashes keep the path valid on both Windows and POSIX systems.
sorted_port_3by3 = pd.read_csv('sorted_portfolio/ViT_3by3.csv')

for size in [1, 2, 3]:
    in_size = sorted_port_3by3['size_decile_3'] == size
    high = sorted_port_3by3[in_size & (sorted_port_3by3['signal_decile_3'] == 3)].set_index('date')
    low = sorted_port_3by3[in_size & (sorted_port_3by3['signal_decile_3'] == 1)].set_index('date')

    # Series subtraction aligns the two legs on their shared date index.
    returns = list(high['Cap_VW_return'] - low['Cap_VW_return'])

    # Regress on a constant (to test average return) with HAC errors, 6 lags.
    X = np.ones_like(returns)  # Intercept only
    model = sm.OLS(returns, X)
    results = model.fit(cov_type='HAC', cov_kwds={'maxlags': 6})

    # First line: mean spread scaled by 100; second line: its t-statistic.
    print(np.round(np.mean(returns)*100, 3).astype(str))
    print('(' + np.round(results.tvalues[0], 3).astype(str) + ')')

In [None]:
# Mean return and HAC t-statistic of every size x signal portfolio (CNN20 sort),
# printed as a LaTeX table.
# Forward slashes keep the path valid on both Windows and POSIX systems.
sorted_port_3by3 = pd.read_csv('sorted_portfolio/CNN20_3by3.csv')

return_matrix = np.zeros((3, 3))
t_stat_matrix = np.zeros((3, 3)).astype(str)  # string cells, later filled with "(t-stat)" labels
std_matrix = np.zeros((3, 3))
shrape_matrix = np.zeros((3, 3))  # Sharpe ratios; scaled from monthly to annual below

for size in [1, 2, 3]:
    for signal in [1, 2, 3]:
        in_portfolio = (
            (sorted_port_3by3['size_decile_3'] == size)
            & (sorted_port_3by3['signal_decile_3'] == signal)
        )
        # Chained calls return a fresh frame, so no explicit copy / inplace mutation is needed.
        temp = (
            sorted_port_3by3[in_portfolio]
            .set_index('date')
            .drop(columns=['size_decile_3', 'signal_decile_3'])
        )

        # Average only the return column: DataFrame.mean() over all columns
        # raises on any non-numeric column in pandas >= 2.0.
        return_matrix[size-1, signal-1] = temp['Cap_VW_return'].mean()

        returns = list(temp['Cap_VW_return'])

        # Regress on a constant: the intercept is the mean return and its
        # t-value tests mean == 0 under HAC standard errors with 6 lags.
        X = np.ones_like(returns)  # Intercept only
        model = sm.OLS(returns, X)
        results = model.fit(cov_type='HAC', cov_kwds={'maxlags': 6})

        t_stat_matrix[size-1, signal-1] = '(' + np.round(results.tvalues[0], 3).astype(str) + ')'
        std_matrix[size-1, signal-1] = np.std(returns)

        # Annualize the Sharpe ratio with sqrt(12).
        # NOTE(review): np.std defaults to ddof=0 and no risk-free rate is
        # subtracted here, unlike cal_metric further down — confirm this is intended.
        shrape_matrix[size-1, signal-1] = np.mean(returns) / np.std(returns) * np.sqrt(12)

# Interleave one row of mean returns (scaled by 100) with one row of
# t-statistics for each size group.
latex_row = []
for i in range(3):
    latex_row.append(np.round(return_matrix*100,3)[i])
    latex_row.append(t_stat_matrix[i])

latex_table = pd.DataFrame(latex_row)

print(latex_table.astype(str).to_latex(index=False))

In [None]:
# High-minus-low spread (signal group 3 minus signal group 1) inside each
# size group, CNN20 sort. One loop replaces three copy-pasted sections.
# Forward slashes keep the path valid on both Windows and POSIX systems.
sorted_port_3by3 = pd.read_csv('sorted_portfolio/CNN20_3by3.csv')

for size in [1, 2, 3]:
    in_size = sorted_port_3by3['size_decile_3'] == size
    high = sorted_port_3by3[in_size & (sorted_port_3by3['signal_decile_3'] == 3)].set_index('date')
    low = sorted_port_3by3[in_size & (sorted_port_3by3['signal_decile_3'] == 1)].set_index('date')

    # Series subtraction aligns the two legs on their shared date index.
    returns = list(high['Cap_VW_return'] - low['Cap_VW_return'])

    # Regress on a constant (to test average return) with HAC errors, 6 lags.
    X = np.ones_like(returns)  # Intercept only
    model = sm.OLS(returns, X)
    results = model.fit(cov_type='HAC', cov_kwds={'maxlags': 6})

    # Output is prefixed with "& " so it can be pasted as extra LaTeX table cells.
    print('& '+ np.round(np.mean(returns)*100, 3).astype(str))
    print('& '+ '(' + np.round(results.tvalues[0], 3).astype(str) + ')')

# 3-by-3 Cap_VW portfolios for factor generation

In [None]:
# Load both double-sorted portfolio files used by the factor construction.
# Forward slashes keep the paths valid on both Windows and POSIX systems.
vit_3by3 = pd.read_csv('sorted_portfolio/ViT_3by3.csv')
cnn_3by3 = pd.read_csv('sorted_portfolio/CNN20_3by3.csv')

In [None]:
# Load the three market return series and place them side by side.
# The original non-raw literals contained the invalid escape "\m", which
# triggers a SyntaxWarning on Python 3.12+; forward slashes avoid the escape
# and also work on non-Windows systems.
cap_vw_idx = pd.read_csv('sorted_portfolio/market.csv', index_col=0)
vw_idx = pd.read_csv('sorted_portfolio/market_VW.csv', index_col=0)
ew_idx = pd.read_csv('sorted_portfolio/market_EW.csv', index_col=0)
idx = pd.concat([cap_vw_idx, vw_idx, ew_idx], axis=1)

In [None]:
# Name the three market series and switch the row labels to timestamps.
idx.columns = ['CAP_VW','VW','EW']
idx.index.name = None
idx.index = pd.to_datetime(idx.index)

# Sample window shared by the cells that follow.
start_date, end_date = pd.to_datetime('2001-01-01'), pd.to_datetime('2024-12-31')
idx = idx.loc[start_date:end_date].copy()

# Cumulative level of each series (starting from 1), built from summed log returns.
cumulative_log_rt = np.log(1+idx).cumsum()
idx_price = np.exp(cumulative_log_rt)

# Last level of every calendar month, then month-over-month simple returns.
idx_monthly = idx_price.resample('ME').last()
idx_month_rt = idx_monthly.pct_change()

# pct_change leaves the first month empty; fill it with the return relative
# to the starting level of 1.
idx_month_rt.iloc[0] = idx_monthly.iloc[0] - 1

In [None]:
# Load the ff3 file and extract the risk-free series for the sample window.
# Forward slashes keep the path valid on both Windows and POSIX systems.
ff3 = pd.read_csv('data/processed_kelly/ff3.csv', index_col=0)
ff3.index = pd.to_datetime(ff3.index)
ff3 = ff3.loc[start_date:end_date]  # explicit label-based row slice
r_f = ff3['RF']/100  # presumably converts percent to decimal — confirm the units of RF

# NOTE(review): the index is overwritten positionally, which assumes ff3 holds
# exactly one row per month of idx_monthly, in the same order — confirm.
r_f.index = pd.to_datetime(idx_monthly.index)

In [None]:
def _factor_portfolios(sorted_port, factor_name):
    """Build the long-short factor and its four legs from a sorted-portfolio frame.

    Parameters
    ----------
    sorted_port : DataFrame with columns 'date', 'size_decile_3',
        'signal_decile_3' and 'Cap_VW_return'. The input is not modified.
    factor_name : column label given to the long-short factor.

    Returns
    -------
    DataFrame indexed by 'date' with columns
    [factor_name, 'Big_High', 'Big_Low', 'Small_High', 'Small_Low'].
    """
    by_date = sorted_port.set_index('date')

    def _leg(size, signal):
        # Return series of one size x signal portfolio.
        chosen = (by_date['size_decile_3'] == size) & (by_date['signal_decile_3'] == signal)
        return by_date[chosen]['Cap_VW_return']

    # NOTE(review): the "small" legs select size_decile_3 == 2, whereas the
    # high-minus-low cells above label group 1 as small and group 2 as mid —
    # confirm that skipping group 1 is intentional.
    small_low, small_high = _leg(2, 1), _leg(2, 3)
    big_low, big_high = _leg(3, 1), _leg(3, 3)

    # Average of the two high-signal legs minus average of the two low-signal legs.
    factor = 1/2 * (big_high + small_high) - 1/2 * (big_low + small_low)

    port = pd.concat([factor, big_high, big_low, small_high, small_low], axis=1)
    port.columns = [factor_name, 'Big_High', 'Big_Low', 'Small_High', 'Small_Low']
    return port


vit_port = _factor_portfolios(vit_3by3, 'ViT')
cnn_port = _factor_portfolios(cnn_3by3, 'CNN')

# Replace the date labels with the monthly index timestamps (positional
# overwrite: raises if the row counts differ).
vit_port.index = idx_month_rt.index
cnn_port.index = idx_month_rt.index

vit_port.to_csv(r'Factor_port//ViT_Cap_VW.csv')
cnn_port.to_csv(r'Factor_port//CNN_Cap_VW.csv')

In [None]:
# Give each return frame an all-zero row dated 2001-01-01 and re-sort it, so
# the cumulative curves plotted below start from zero.
origin_date = pd.to_datetime('2001-01-01')
for return_frame in (vit_port, cnn_port, idx_month_rt):
    return_frame.loc[origin_date] = 0
    return_frame.sort_index(inplace=True)

In [None]:
# Accumulator for the return series; the two plotting cells below append to it.
for_performance_table = list()

In [None]:
# Cumulative log-return curves: ViT factor, its four legs, and the market series.
plt.figure(figsize=(7, 4),dpi=400)
plt.ylim(-1.25,2.25)

# (column, legend label, zorder, extra line styling) for every ViT series.
vit_line_specs = [
    ('ViT', 'ViT', 10, {'color': '#DC343B'}),
    ('Big_High', 'Big High', 9, {'color': '#0f4c81'}),
    ('Big_Low', 'Big Low', 8, {'color': '#0f4c81', 'linestyle': '--'}),
    ('Small_High', 'Small High', 7, {'color': '#009b77'}),
    ('Small_Low', 'Small Low', 6, {'color': '#009b77', 'linestyle': '--'}),
]
for column, line_label, z, styling in vit_line_specs:
    plt.plot(vit_port.index, np.log(vit_port[column]+1).cumsum(),
             label=line_label, zorder=z, linewidth=1.5, **styling)

# The market curves share one cumulative-sum frame.
market_cum_log = np.log(idx_month_rt+1).cumsum()
market_line_specs = [
    ('CAP_VW', 'Market (CAP VW)', {}),
    ('VW', 'Market (VW)', {'linestyle': '--'}),
    ('EW', 'Market (EW)', {'linestyle': ':'}),
]
for column, line_label, styling in market_line_specs:
    plt.plot(idx_month_rt.index, market_cum_log[column],
             label=line_label, color='black', zorder=2, linewidth=1.5, **styling)

# Queue the ViT series for the performance table assembled later.
for_performance_table.extend(vit_port[column] for column, _, _, _ in vit_line_specs)

plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.grid()
plt.tight_layout()
plt.savefig('fig_vit_port-revision.png', dpi=400)

In [None]:
# Cumulative log-return curves: CNN factor, its four legs, and the market series.
plt.figure(figsize=(7, 4),dpi=400)
plt.ylim(-1.25,2.25)

# (column, legend label, zorder, extra line styling) for every CNN series.
cnn_line_specs = [
    ('CNN', 'CNN', 10, {'color': '#FF6A13'}),
    ('Big_High', 'Big High', 9, {'color': '#0f4c81'}),
    ('Big_Low', 'Big Low', 8, {'color': '#0f4c81', 'linestyle': '--'}),
    ('Small_High', 'Small High', 7, {'color': '#009b77'}),
    ('Small_Low', 'Small Low', 6, {'color': '#009b77', 'linestyle': '--'}),
]
for column, line_label, z, styling in cnn_line_specs:
    plt.plot(cnn_port.index, np.log(cnn_port[column]+1).cumsum(),
             label=line_label, zorder=z, linewidth=1.5, **styling)

# The market curves share one cumulative-sum frame.
market_cum_log = np.log(idx_month_rt+1).cumsum()
market_line_specs = [
    ('CAP_VW', 'Market (CAP VW)', {}),
    ('VW', 'Market (VW)', {'linestyle': '--'}),
    ('EW', 'Market (EW)', {'linestyle': ':'}),
]
for column, line_label, styling in market_line_specs:
    plt.plot(idx_month_rt.index, market_cum_log[column],
             label=line_label, color='black', zorder=2, linewidth=1.5, **styling)

# Queue the CNN series, then the whole market frame, for the performance table.
for_performance_table.extend(cnn_port[column] for column, _, _, _ in cnn_line_specs)
for_performance_table.append(idx_month_rt )

plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.grid()
plt.tight_layout()
plt.savefig('fig_cnn_port-revision.png', dpi=400)

In [None]:
# Proxy Line2D handles: they are never drawn on an axis and exist only to
# populate a standalone legend image.
from matplotlib.lines import Line2D

# (legend label, styling) for each entry, in display order.
legend_entries = [
    ('Factor' + r'$_{\text{ViT}}$', {'color': '#DC343B'}),
    ('Factor' + r'$_{\text{CNN}}$', {'color': '#FF6A13'}),
    ('Big High', {'color': '#0f4c81'}),
    ('Big Low', {'color': '#0f4c81', 'linestyle': '--'}),
    ('Small High', {'color': '#009b77'}),
    ('Small Low', {'color': '#009b77', 'linestyle': '--'}),
    ('CAP VW', {'color': 'black'}),
    ('VW', {'color': 'black', 'linestyle': '--'}),
    ('EW', {'color': 'black', 'linestyle': ':'}),
]
proxy_lines = [
    Line2D([0], [0], label=entry_label, linewidth=1.5, **styling)
    for entry_label, styling in legend_entries
]
labels = [handle.get_label() for handle in proxy_lines]

# Figure that contains nothing but the legend, laid out on a single row.
fig_leg = plt.figure(figsize=(5, 0.1), dpi=400)
ax_leg = fig_leg.add_subplot(111)
ax_leg.axis('off')
fig_leg.legend(proxy_lines, labels, loc='center', ncol=len(proxy_lines), fontsize=7, frameon=True)
fig_leg.savefig(r'fig_legend_only_revision.png', dpi=400, bbox_inches='tight', transparent=True)
plt.close(fig_leg)

In [None]:
# Assemble the return table straight from the portfolio frames rather than
# from for_performance_table: that list grows every time a plotting cell is
# re-run, which duplicates columns and breaks the column renaming below.
# Column order matches the order in which the plotting cells append:
# ViT block, CNN block, then the three market series.
factor_leg_columns = ['Big_High', 'Big_Low', 'Small_High', 'Small_Low']
total_return_table = pd.concat(
    [
        vit_port[['ViT'] + factor_leg_columns],
        cnn_port[['CNN'] + factor_leg_columns],
        idx_month_rt,
    ],
    axis=1,
).iloc[1:]  # drop the first row (the all-zero 2001-01-01 row inserted for plotting)

In [None]:
def cal_metric(rt, risk_free=None):
    """Compute annualized performance metrics for one monthly return series.

    Parameters
    ----------
    rt : pandas Series of monthly returns expressed in percent (the drawdown
        step divides by 100 before compounding).
    risk_free : optional pandas Series of monthly risk-free rates sharing
        ``rt``'s index. Defaults to the notebook-level ``r_f``. Presumably in
        decimal units, since the notebook builds ``r_f`` as ``ff3['RF']/100``
        — confirm; it is multiplied by 100 here to match ``rt``.

    Returns
    -------
    Tuple of seven strings formatted to three decimals, in this order:
    annualized return, annualized volatility, downside volatility,
    Sharpe ratio, Sortino ratio, maximum drawdown (percent), Calmar ratio.
    A ratio whose denominator is zero is reported as "nan".
    """
    if risk_free is None:
        risk_free = r_f  # notebook-level risk-free series

    def _fmt(value):
        # Same rounding and formatting the table has always used.
        return f"{np.round(value, 3):.3f}"

    def _ratio(numerator, denominator):
        # Avoid ZeroDivisionError for e.g. a series with no drawdown.
        return numerator / denominator if denominator != 0 else np.nan

    annualized_return = rt.mean() * 12
    annualized_vol = rt.std() * np.sqrt(12)
    # Bring the risk-free rate to percent so both sides use the same units
    # (the original subtracted a decimal rate from percent returns).
    annualized_excess_return = (rt - risk_free * 100).mean() * 12
    # Volatility of the negative months only.
    donw_side_vol = rt[rt < 0].std() * np.sqrt(12)

    # Ratios use unrounded inputs so rounding error is not compounded.
    sharpe_ratio = _ratio(annualized_excess_return, annualized_vol)
    sortino_ratio = _ratio(annualized_excess_return, donw_side_vol)

    # Wealth path from compounded returns, then the deepest fall from a running peak.
    log_rt = np.log(rt/100+1)
    wealth = np.exp(log_rt.cumsum())
    max_drawdown = (1 - (wealth / wealth.cummax()).min()) * 100
    calmar_ratio = _ratio(annualized_return, max_drawdown)

    return (
        _fmt(annualized_return),
        _fmt(annualized_vol),
        _fmt(donw_side_vol),
        _fmt(sharpe_ratio),
        _fmt(sortino_ratio),
        _fmt(max_drawdown),
        _fmt(calmar_ratio),
    )

In [None]:
# Sample standard deviation (ddof=1) of every return column.
total_return_table.std(axis=0, ddof=1)

In [None]:
# Prefix the four leg names with their model so the ViT and CNN columns are unique.
leg_names = ['Big_High', 'Big_Low', 'Small_High', 'Small_Low']
total_return_table.columns = (
    ['ViT'] + ['ViT_' + leg for leg in leg_names]
    + ['CNN'] + ['CNN_' + leg for leg in leg_names]
    + ['CAP_VW', 'VW', 'EW']
)

In [None]:
# Returns scaled by 100 (percent units) for the summary and risk tables.
final_table = total_return_table.mul(100).copy()

In [None]:
# Descriptive statistics per series, one row per column of final_table.
summary_stat = final_table.describe().T
summary_stat['Skew'] = final_table.apply(lambda x: skew(x, bias=False))
summary_stat['Kurtosis'] = final_table.apply(lambda x: kurtosis(x, fisher=False, bias=False))

# Standard error of the mean from each column's own observation count
# (describe()'s 'count') instead of a hard-coded sample size of 288.
summary_stat['Standard error'] = summary_stat['std']/np.sqrt(summary_stat['count'])

# Final column order; 'count' is left out of the reported table.
summary_stat = summary_stat[['mean', 'std', 'Standard error', 'min', '25%', '50%', '75%', 'max', 'Skew', 'Kurtosis']]

In [None]:
# Statistics as rows, series as columns, three decimals, emitted as LaTeX.
summary_latex = summary_stat.T.round(3).to_latex(float_format="%.3f")
print(summary_latex)

In [None]:
# Run cal_metric on every column of final_table (one series at a time).
risk_metric = final_table.apply(cal_metric, axis=0)

In [None]:
# Recompute the base metrics here so the cell can be re-run safely: relabelling
# and extending the existing risk_metric in place fails on a second run because
# the frame has already grown beyond seven rows.
risk_metric = final_table.apply(cal_metric)
risk_metric.index = ['Annualized return', 'Annualized volatility', 'Downside deviation', 'Sharpe ratio', 'Sortino ratio', 'Maximum drawdown', 'Calmar ratio']

# Sum of positive months, sum of negative months, and their ratio.
gross_profit = final_table[final_table > 0].sum()
gross_loss = final_table[final_table < 0].sum()
profit_factor = gross_profit / abs(gross_loss)

# NOTE(review): yearly figures are arithmetic sums of the monthly percent
# returns, not compounded returns — confirm this is the intended definition.
yearly_returns = final_table.groupby(final_table.index.year).sum()

# Count profitable vs unprofitable years
profitable_years = (yearly_returns > 0).sum()
unprofitable_years = (yearly_returns <= 0).sum()

# Transpose so each new metric can be attached as a column aligned on the
# series names, then transpose back to metrics-as-rows.
risk_metric = risk_metric.T.assign(**{
    'Gross profit': gross_profit,
    'Gross loss': gross_loss,
    'Profit factor': profit_factor,
    'Profitable years': profitable_years,
    'Unprofitable years': unprofitable_years,
}).T

In [None]:
# Emit the risk-metric table as LaTeX with three-decimal floats.
risk_latex = risk_metric.round(3).to_latex(float_format="%.3f")
print(risk_latex)