## LSTM Models

#### Using a Python 3.11 Kernel though a Virtual Conda Enviornment, packages must me downloaded thourgh Conda before running in this code. 

In [None]:
!pip install ipykernel
!pip install numpy pandas scikit-learn tensorflow keras matplotlib
!pip install --upgrade notebook
!pip install python-dotenv
!pip install joblib
!pip install scikit-learn
!pip install pandas
!pip install matplotlib
!pip install seaborn
!pip install threadpoolctl
!pip install tensorflow
!pip install scikit-image
!pip install scikit-learn-intelex
!pip install keras


In [None]:
import numpy as np
import pandas as pd
from datetime import date
from sklearn.metrics import mean_squared_error, accuracy_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.callbacks import EarlyStopping


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Article-level sentiment data:
finbert_articles = pd.read_csv("derived/nyt_with_finbert_sentiment.csv")
hybrid_articles = pd.read_csv("derived/nyt_with_gpt4_fallback_sentiment.csv")

plt.figure(figsize=(10,6))
sns.histplot(finbert_articles["FinBERT_score"], bins=30, color="orange", label="FinBERT-only", kde=True, stat="density", alpha=0.55)
sns.histplot(hybrid_articles["Sentiment"], bins=30, color="red", label="Hybrid (FinBERT+GPT-4)", kde=True, stat="density", alpha=0.55)

plt.title("Overlayed Distribution of Article Sentiment Scores")
plt.xlabel("Sentiment Score")
plt.ylabel("Density")
plt.legend()
plt.grid(axis='y', linestyle=':', alpha=0.5)
plt.tight_layout()
plt.show()


### FinBERT+GPT-4 LSTM Model

In [None]:
import numpy as np
import pandas as pd
from datetime import date
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.callbacks import EarlyStopping

data_path = "derived/final_merged_for_lstm.csv"
df = pd.read_csv(data_path, parse_dates=["Date"])

# Cleaning Up Columns
df = df.drop(columns=[col for col in ['key_0', 'Date_sent'] if col in df.columns])

# Ensuring all columns except Date are numeric
for col in df.columns:
    if col not in ['Date']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
df = df.dropna().reset_index(drop=True)

# Ensuring only meaningful features are used
exclude_cols = ['Date', 'Close', 'High', 'Low', 'Open', 'Volume'] # Remove prices and volume
feature_cols = ['Return', 'VIX', 'Sentiment']
# Adding lagged features below
N_LAGS = 2
for lag in range(1, N_LAGS+1):
    df[f"Return_lag{lag}"] = df["Return"].shift(lag)
    df[f"Sentiment_lag{lag}"] = df["Sentiment"].shift(lag)
feature_cols += [f"Return_lag{lag}" for lag in range(1, N_LAGS+1)]
feature_cols += [f"Sentiment_lag{lag}" for lag in range(1, N_LAGS+1)]
df = df.dropna().reset_index(drop=True)

# Train/val/test split by time
TRAIN_END = date(2021, 12, 31)
VAL_END = date(2022, 12, 31)
df["Date"] = pd.to_datetime(df["Date"])
df_train = df[df["Date"] <= pd.to_datetime(TRAIN_END)]
df_val = df[(df["Date"] > pd.to_datetime(TRAIN_END)) & (df["Date"] <= pd.to_datetime(VAL_END))]
df_test = df[df["Date"] > pd.to_datetime(VAL_END)]

# Scaling 
scaler = StandardScaler()
X_train = scaler.fit_transform(df_train[feature_cols])
X_val = scaler.transform(df_val[feature_cols])
X_test = scaler.transform(df_test[feature_cols])
y_train = df_train["Return"].values
y_val = df_val["Return"].values
y_test = df_test["Return"].values

# Creating LSTM Sequences
SEQ_LEN = 5
def create_sequences(x, y, seq_len):
    xs, ys = [], []
    for i in range(len(x) - seq_len):
        xs.append(x[i : i + seq_len])
        ys.append(y[i + seq_len])
    return np.array(xs), np.array(ys)

X_train_seq, y_train_seq = create_sequences(X_train, y_train, SEQ_LEN)
X_val_seq, y_val_seq = create_sequences(X_val, y_val, SEQ_LEN)
X_test_seq, y_test_seq = create_sequences(X_test, y_test, SEQ_LEN)

# LSTM Model
model = Sequential([
    LSTM(50, input_shape=(SEQ_LEN, X_train_seq.shape[2])),
    Dense(1),
])
model.compile(optimizer="adam", loss="mse")
early_stop = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

# Train
print("Training LSTM …")
model.fit(
    X_train_seq, y_train_seq,
    epochs=100,
    batch_size=16,
    validation_data=(X_val_seq, y_val_seq),
    callbacks=[early_stop],
    verbose=1,
)

# Evaluating
print("Evaluating …")
pred_test = model.predict(X_test_seq).flatten()
rmse = np.sqrt(mean_squared_error(y_test_seq, pred_test))
print(f"Test RMSE: {rmse:.6f}")
# Directional accuracy
actual_dir = (y_test_seq > 0)
pred_dir = (pred_test > 0)
acc = accuracy_score(actual_dir, pred_dir)
print(f"Directional accuracy: {acc:.2%}")

# Aligning index properly
df_out = df_test.iloc[SEQ_LEN:].copy().reset_index(drop=True)
df_out["Predicted_Return"] = pred_test
df_out.to_csv("derived/lstm_test_predictions.csv", index=False)

print(df_out[["Date", "Return", "Predicted_Return"]].head())
print("Final LSTM model summary:")
model.summary()

### FinBERT LSTM Model

In [None]:
import numpy as np
import pandas as pd
from datetime import date
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.callbacks import EarlyStopping

data_path = "derived/final_merged_FinBERT_for_lstm.csv"
df = pd.read_csv(data_path, parse_dates=["Date"])

# Cleaning columns 
df = df.drop(columns=[col for col in ['key_0', 'Date_sent'] if col in df.columns])

# Remove column headers like '^GSPC' in Close/Open/etc (strip out non-numeric entries)
for col in ['Close', 'High', 'Low', 'Open', 'Volume']:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
if "VIX" in df.columns:
    df["VIX"] = pd.to_numeric(df["VIX"], errors='coerce')

# Ensure only 'Date' is not numeric
for col in df.columns:
    if col != 'Date':
        df[col] = pd.to_numeric(df[col], errors='coerce')

df = df.dropna().reset_index(drop=True)

# Feature Engineering: Adding lags for target & sentiment 
N_LAGS = 2
for lag in range(1, N_LAGS+1):
    df[f"Return_lag{lag}"] = df["Return"].shift(lag)
    df[f"FinBERT_score_lag{lag}"] = df["FinBERT_score"].shift(lag)
df = df.dropna().reset_index(drop=True)

# Train/val/test split 
TRAIN_END = date(2021, 12, 31)
VAL_END = date(2022, 12, 31)

df["Date"] = pd.to_datetime(df["Date"])
df_train = df[df["Date"] <= pd.to_datetime(TRAIN_END)]
df_val = df[(df["Date"] > pd.to_datetime(TRAIN_END)) & (df["Date"] <= pd.to_datetime(VAL_END))]
df_test = df[df["Date"] > pd.to_datetime(VAL_END)]

# Features and Scaling
FEATURES = ['Close', 'High', 'Low', 'Open', 'Volume', 'VIX', 'FinBERT_score'] + \
           [f"Return_lag{lag}" for lag in range(1, N_LAGS+1)] + \
           [f"FinBERT_score_lag{lag}" for lag in range(1, N_LAGS+1)]
X_train = df_train[FEATURES].astype(np.float32).values
X_val = df_val[FEATURES].astype(np.float32).values
X_test = df_test[FEATURES].astype(np.float32).values
y_train = df_train["Return"].values
y_val = df_val["Return"].values
y_test = df_test["Return"].values

# Scaling Features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Creating LSTM sequences (window=5)
SEQ_LEN = 5
def create_sequences(x, y, seq_len):
    xs, ys = [], []
    for i in range(len(x) - seq_len):
        xs.append(x[i : i + seq_len])
        ys.append(y[i + seq_len])
    return np.array(xs), np.array(ys)
X_train_seq, y_train_seq = create_sequences(X_train, y_train, SEQ_LEN)
X_val_seq, y_val_seq = create_sequences(X_val, y_val, SEQ_LEN)
X_test_seq, y_test_seq = create_sequences(X_test, y_test, SEQ_LEN)

print(f"Train samples: {X_train_seq.shape[0]}, Val: {X_val_seq.shape[0]}, Test: {X_test_seq.shape[0]}")

# LSTM Model
model = Sequential([
    LSTM(50, input_shape=(SEQ_LEN, X_train_seq.shape[2])),
    Dense(1),
])
model.compile(optimizer="adam", loss="mse")
early_stop = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

# Train
print("Training LSTM …")
model.fit(
    X_train_seq, y_train_seq,
    epochs=100,
    batch_size=16,
    validation_data=(X_val_seq, y_val_seq),
    callbacks=[early_stop],
    verbose=1,
)

# Evaluate
print("Evaluating …")
pred_test = model.predict(X_test_seq).flatten()
rmse = np.sqrt(mean_squared_error(y_test_seq, pred_test))
print(f"Test RMSE: {rmse:.6f}")
actual_dir = (y_test_seq > 0)
pred_dir = (pred_test > 0)
acc = accuracy_score(actual_dir, pred_dir)
print(f"Directional accuracy: {acc:.2%}")


df_test = df_test.iloc[SEQ_LEN:].copy()  # align with y_test_seq
df_test["Predicted_Return"] = pred_test
df_test.to_csv("derived/lstm_FinBERT_only_test_predictions.csv", index=False)

print(df_test[["Date", "Return", "Predicted_Return"]].head())
print("Final LSTM model summary:")
model.summary()


### Diagnostics

In [None]:
import pandas as pd

for name in ["derived/lstm_test_predictions.csv",
             "derived/lstm_FinBERT_only_test_predictions.csv"]:
    df = pd.read_csv(name, parse_dates=["Date"])
    dup = df.duplicated(subset=["Date"]).sum()
    print(name, "rows:", len(df), "duplicates:", dup)

## Forecast Bias Tests

### Biasness Test for Hybrid

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
from scipy import stats
from tabulate import tabulate
from sklearn.metrics import accuracy_score


df_gpt = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

df = (
    df_gpt[["Date", "Return", "Predicted_Return"]]
        .rename(columns={"Return": "Market_Return", "Predicted_Return": "LSTM_GPT4"})
        .merge(
            df_fbert[["Date", "Predicted_Return"]].rename(columns={"Predicted_Return": "LSTM_FinBERT"}),
            on="Date", how="inner"
        )
        .merge(
            market[["Date", "Return", "VIX"]].rename(columns={"Return": "Market_True_Return"}),
            on="Date", how="inner"
        )
        .dropna(subset=["Market_True_Return", "LSTM_GPT4", "LSTM_FinBERT"])
        .reset_index(drop=True)
)

# Arrays
y_true   = df["Market_True_Return"].values
pred_gpt = df["LSTM_GPT4"].values
pred_fb  = df["LSTM_FinBERT"].values

# Forecasting R² function (explained variance)
def forecast_r2(y, yhat):
    ss_res = np.sum((y - yhat) ** 2)
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    return 1 - ss_res / ss_tot

# Metric aggregation
def signal_metrics(y, yhat):
    mse  = np.mean((y - yhat) ** 2)
    mae  = np.mean(np.abs(y - yhat))
    acc  = np.mean(np.sign(y) == np.sign(yhat))
    strat_ret = np.where(yhat > 0, 1, -1) * y
    sharpe = np.mean(strat_ret) / (np.std(strat_ret) + 1e-9) * np.sqrt(252)
    r2 = forecast_r2(y, yhat)
    return dict(MSE=mse, MAE=mae, Directional_Acc=acc, Sharpe=sharpe, R2=r2)

m_gpt = signal_metrics(y_true, pred_gpt)
m_fb  = signal_metrics(y_true, pred_fb)

# Diebold–Mariano test (squared-error loss)
def dm_test(e1, e2):
    d = e1 - e2
    T = len(d)
    d_mean = np.mean(d)
    d_var = np.var(d, ddof=1)
    dm_stat = d_mean / np.sqrt(d_var / T)
    p_value = 2 * (1 - stats.norm.cdf(abs(dm_stat)))
    return dm_stat, p_value

dm_stat, dm_p = dm_test((y_true - pred_gpt) ** 2, (y_true - pred_fb) ** 2)

# Regime-dependent Sharpe (split by VIX median)
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")
vix_med = df["VIX"].median()
reg = {}
for regime_mask, label in [(df["VIX"] > vix_med, "High-VIX"), (df["VIX"] <= vix_med, "Low-VIX")]:
    idx = regime_mask.values
    reg[label] = dict(
        GPT4=signal_metrics(y_true[idx], pred_gpt[idx])["Sharpe"],
        FinBERT=signal_metrics(y_true[idx], pred_fb[idx])["Sharpe"]
    )

# Forecast-bias diagnostics (optional, still useful for regression analysis)
def bias_tests(y, yhat, label):
    err = y - yhat
    t, p = stats.ttest_1samp(err, 0.0)
    X = sm.add_constant(yhat)
    mz = sm.OLS(y, X).fit()
    R = np.eye(2)
    q = np.array([0, 1])
    ftest = mz.f_test((R, q))
    return dict(
        Model=label,
        Mean_Error=err.mean(),
        MeanErr_tstat=t,
        MeanErr_pval=p,
        MZ_alpha=mz.params[0],
        MZ_beta=mz.params[1],
        MZ_alpha_p=mz.pvalues[0],
        MZ_beta_p=mz.pvalues[1],
        MZ_F_pvalue=float(ftest.pvalue)
    )

bias_gpt = bias_tests(y_true, pred_gpt, "Hybrid (GPT-4)")
bias_fb = bias_tests(y_true, pred_fb, "FinBERT-only")

print("\n### Forecast-Evaluation Metrics")
eval_tbl = pd.DataFrame([m_gpt, m_fb], index=["Hybrid", "FinBERT"])
print(tabulate(eval_tbl, headers="keys", tablefmt="github", floatfmt=".4f"))

print("\nDiebold–Mariano statistic: {:.3f}   p-value: {:.3f}".format(dm_stat, dm_p))

print("\n### Regime-dependent Sharpe")
print(tabulate(pd.DataFrame(reg).T, headers="keys", tablefmt="github", floatfmt=".3f"))

bias_tbl = pd.DataFrame([bias_gpt, bias_fb]).set_index("Model")
print("\n### Forecast-Bias Diagnostics")
print(tabulate(bias_tbl, headers="keys", tablefmt="github", floatfmt=".4f"))

# Validation accuracy for both models (directional up/down accuracy)
print("\n### Validation Accuracy")
val_acc_gpt = accuracy_score((df["Market_True_Return"].values > 0), (df["LSTM_GPT4"].values > 0))
val_acc_fb = accuracy_score((df["Market_True_Return"].values > 0), (df["LSTM_FinBERT"].values > 0))
print(f"Hybrid (GPT-4) Validation Accuracy:  {val_acc_gpt:.2%}")
print(f"FinBERT-only Validation Accuracy:    {val_acc_fb:.2%}")

## Result Comparison and Visualizaiton

In [None]:
import matplotlib.pyplot as plt
print(plt.style.available)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, accuracy_score, precision_score,
    recall_score, f1_score, roc_auc_score, roc_curve, r2_score
)

fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
gpt4 = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
market = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# If any columns are objects (str) = to numeric for math/plotting
for df in [fbert, gpt4]:
    for col in df.columns:
        if col != "Date":
            df[col] = pd.to_numeric(df[col], errors="coerce")
    df.dropna(inplace=True)

# General settings
plt.style.use("seaborn-v0_8-darkgrid")
sns.set(font_scale=1.2)
window = 21  # 1 month rolling

# Mergeing for side-by-side plots
df = fbert[["Date", "Return", "VIX", "FinBERT_score", "Predicted_Return"]].copy()
df = df.rename(columns={"Predicted_Return": "FinBERT_Pred"})
df["GPT4_Pred"] = gpt4["Predicted_Return"].values
df["GPT4_Sentiment"] = gpt4["Sentiment"].values if "Sentiment" in gpt4 else np.nan

# Actual vs LSTM Forecasts: Time Series Overlap
# Shows daily and cumulative return performance for market, FinBERT-only, and FinBERT+GPT-4 models, which model tracks or outperforms the market over time?
plt.figure(figsize=(17,7))
plt.plot(df["Date"], df["Return"], color='black', label='Market Return', linewidth=1, linestyle='--')
plt.plot(df["Date"], df["FinBERT_Pred"], color='orange', label='FinBERT-only LSTM', alpha=0.8)
plt.plot(df["Date"], df["GPT4_Pred"], color='red', label='FinBERT+GPT-4 LSTM', alpha=0.7)
plt.ylabel("Return")
plt.title("Market Returns vs LSTM Model Forecasts")
plt.legend()
plt.show()

# Model Error: Residuals Over Time 
# Shows model residuals, identifying periods where either model systematically overshoots or undershoots
plt.figure(figsize=(17,7))
plt.plot(df["Date"], df["Return"] - df["FinBERT_Pred"], label="Error: FinBERT LSTM", color="orange", alpha=0.6)
plt.plot(df["Date"], df["Return"] - df["GPT4_Pred"], label="Error: GPT-4 LSTM", color="red", alpha=0.6)
plt.axhline(0, color="black", linewidth=1, linestyle=":")
plt.ylabel("Residual (Error)")
plt.title("Model Residuals: Market - Model Forecast")
plt.legend()
plt.show()

# Distribution of Forecasts: Histogram & KDE 
# Distribution and density of returns/forecasts to see if model returns are over/under-dispersed
plt.figure(figsize=(14,5))
sns.histplot(df["Return"], label="Market Return", color="black", kde=True, stat="density", bins=40)
sns.histplot(df["FinBERT_Pred"], label="FinBERT LSTM", color="orange", kde=True, stat="density", bins=40, alpha=0.5)
sns.histplot(df["GPT4_Pred"], label="GPT-4 LSTM", color="dodgerblue", kde=True, stat="density", bins=40, alpha=0.5)
plt.legend()
plt.title("Distribution of Market Returns vs LSTM Model Forecasts")
plt.show()

# Actual vs Predicted: Scatter Plots and Regression Fit
# Scatter/regression of predicted vs actual, with R² value (not shown but you can print it), to test linear forecast strength
fig, axs = plt.subplots(1, 2, figsize=(16,6), sharey=True)
sns.regplot(x=df["Return"], y=df["FinBERT_Pred"], ax=axs[0], line_kws={"color": "orange"})
axs[0].set_title("FinBERT LSTM: Actual vs Predicted")
axs[0].set_xlabel("Actual Return")
axs[0].set_ylabel("Predicted Return")
sns.regplot(x=df["Return"], y=df["GPT4_Pred"], ax=axs[1], line_kws={"color": "dodgerblue"})
axs[1].set_title("GPT-4 LSTM: Actual vs Predicted")
axs[1].set_xlabel("Actual Return")
plt.show()

# Rolling Model RMSE (21d window)
# Rolling RMSE: when is a model more/less accurate? Is one model consistently better?
rmse_fbert = (df["Return"] - df["FinBERT_Pred"]).rolling(window).apply(lambda x: np.sqrt(np.mean(x**2)))
rmse_gpt4 = (df["Return"] - df["GPT4_Pred"]).rolling(window).apply(lambda x: np.sqrt(np.mean(x**2)))
plt.figure(figsize=(17,6))
plt.plot(df["Date"], rmse_fbert, label="Rolling RMSE: FinBERT LSTM", color="orange")
plt.plot(df["Date"], rmse_gpt4, label="Rolling RMSE: GPT-4 LSTM", color="dodgerblue")
plt.ylabel("RMSE")
plt.title("21-Day Rolling RMSE: Model Performance Over Time")
plt.legend()
plt.show()


import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    precision_recall_curve, average_precision_score,
    roc_curve, roc_auc_score
)

# Aligning both models rows
roc_df = df.dropna(subset=["Return", "FinBERT_Pred", "GPT4_Pred"]).copy()

# Directional Classification: Up/Down Day target
y_true        = (roc_df["Return"].values > 0).astype(int)   # 1 = up-day
score_fbert   = roc_df["FinBERT_Pred"].values               # signed predictions
score_gpt4    = roc_df["GPT4_Pred"].values

# Base precision for GPT-4 sign-only model
gpt_sign      = (score_gpt4 > 0).astype(int)           # 1 if model predicts “up”
base_prec_gpt = precision_score(y_true, gpt_sign)     # precision at threshold 0
base_rec_gpt  = recall_score   (y_true, gpt_sign)     # recall at threshold 0

# Precision-Recall Curve: How well does prediction magnitude separate correct from incorrect sign?
prec_fbert, rec_fbert, _ = precision_recall_curve(y_true, score_fbert)
prec_gpt4, rec_gpt4, _   = precision_recall_curve(y_true, score_gpt4)
ap_fbert = average_precision_score(y_true, score_fbert)
ap_gpt4  = average_precision_score(y_true, score_gpt4)

plt.figure(figsize=(8, 6))
plt.plot(rec_fbert, prec_fbert, label=f"FinBERT LSTM (AP = {ap_fbert:.2f})",  alpha=0.9,  color="orange")
plt.plot(rec_gpt4,  prec_gpt4, label=f"GPT-4 LSTM (AP = {ap_gpt4:.2f})",     alpha=0.9,  color="dodgerblue")
plt.hlines(y_true.mean(), 0, 1, linestyle="--", color="grey",
           label="Random baseline (π = P(up))")
plt.hlines(base_prec_gpt, 0, 1,
           linestyle=":",  color="red",
           label=f"GPT-4 sign-only baseline (precision = {base_prec_gpt:.2f})")

# Marking the exact (recall, precision) point of the sign-only model
plt.plot(base_rec_gpt, base_prec_gpt, marker="o", color="red")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision–Recall Curve: Direction Classification")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)
plt.tight_layout()
plt.show()

# ROC Curve:
fpr_fbert, tpr_fbert, _ = roc_curve(y_true, score_fbert)
fpr_gpt4,  tpr_gpt4,  _ = roc_curve(y_true, score_gpt4)
auc_fbert = roc_auc_score(y_true, score_fbert)
auc_gpt4  = roc_auc_score(y_true, score_gpt4)

plt.figure(figsize=(8, 6))
plt.plot(fpr_fbert, tpr_fbert,
         label=f"FinBERT LSTM (AUC = {auc_fbert:.2f})",  alpha=0.9, color="orange")
plt.plot(fpr_gpt4,  tpr_gpt4,
         label=f"GPT-4 LSTM (AUC = {auc_gpt4:.2f})", alpha=0.9, color="dodgerblue")
plt.plot([0, 1], [0, 1], 'k--', label="Random classifier")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve: Direction Classification")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)
plt.tight_layout()
plt.show()

# Feature Importance (Permutation, quick-and-dirty)
# Feature importance via Random Forest, as a check on which signals the market return is most related to
from sklearn.inspection import permutation_importance
from sklearn.ensemble import RandomForestRegressor
# Quick look at which features most drive market returns, for context
X = df[[col for col in df.columns if col not in ['Date', 'Return', 'FinBERT_Pred', 'GPT4_Pred']]]
rf = RandomForestRegressor(n_estimators=50)
rf.fit(X, df["Return"])
perm = permutation_importance(rf, X, df["Return"], n_repeats=10)
importances = pd.Series(perm.importances_mean, index=X.columns).sort_values(ascending=False)
plt.figure(figsize=(12,6))
importances.plot(kind='bar')
plt.ylabel("Permutation Importance")
plt.title("Feature Importance for Market Return (Random Forest Proxy)")
plt.tight_layout()
plt.show()

# plot the two autocorrelation plots inside the same figure
# define autocorrelation plot function
def autocorrelation_plot(series, ax=None):
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    import statsmodels.api as sm
    sm.graphics.tsa.plot_acf(series, lags=40, ax=ax)
    ax.set_xlabel("Lags")
    ax.set_ylabel("Autocorrelation")
    return ax
fig, axs = plt.subplots(1, 2, figsize=(16, 6))
autocorrelation_plot(df["Return"] - df["FinBERT_Pred"], ax=axs[0])
axs[0].set_title("Error Autocorrelation: FinBERT LSTM")
autocorrelation_plot(df["Return"] - df["GPT4_Pred"], ax=axs[1])
axs[1].set_title("Error Autocorrelation: GPT-4 LSTM")
plt.tight_layout()
plt.show()

# vix volatility plot
plt.figure(figsize=(14, 6))
plt.plot(df["Date"], df["VIX"], label="VIX", color="purple")
plt.title("VIX Volatility Over Time")
plt.xlabel("Date")
plt.ylabel("VIX")
plt.legend()    
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

df_gpt = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market_df = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# Merge on Date
df = df_gpt[["Date", "Return", "Predicted_Return"]].rename(
    columns={"Return": "Market_Return", "Predicted_Return": "LSTM_GPT4"}
).merge(
    df_fbert[["Date", "Predicted_Return"]].rename(
        columns={"Predicted_Return": "LSTM_FinBERT"}
    ),
    on="Date", how="inner"
).merge(
    market_df[["Date", "Return", "VIX"]].rename(
        columns={"Return": "Market_True_Return"}
    ),
    on="Date", how="left"
)
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

# Signal-based returns
def trading_signal_returns(true_returns, predicted_returns):
    signal = np.where(predicted_returns > 0, 1, -1)
    return signal * true_returns

# Compounded cumulative return (not just cumsum)
def compound_cumret(returns):
    return np.cumprod(1 + returns) - 1

# Masks for valid rows (overlapping dates)
mask = df["Market_True_Return"].notnull() & df["LSTM_GPT4"].notnull() & df["LSTM_FinBERT"].notnull()
if not mask.any():
    raise ValueError("No valid rows after masking. Check merge.")

df_valid = df[mask].copy().reset_index(drop=True)

# Strategy returns
df_valid["LSTM_GPT4_Strategy"] = trading_signal_returns(df_valid["Market_True_Return"], df_valid["LSTM_GPT4"])
df_valid["FinBERT_Strategy"] = trading_signal_returns(df_valid["Market_True_Return"], df_valid["LSTM_FinBERT"])

# Cumulative compounded returns
df_valid["Market_CumReturn"] = compound_cumret(df_valid["Market_True_Return"])
df_valid["LSTM_GPT4_CumReturn"] = compound_cumret(df_valid["LSTM_GPT4_Strategy"])
df_valid["FinBERT_CumReturn"] = compound_cumret(df_valid["FinBERT_Strategy"])

plt.figure(figsize=(12, 6))
plt.plot(df_valid["Date"], df_valid["Market_CumReturn"], label="Market Cumulative Return", color='black', linewidth=2)
plt.plot(df_valid["Date"], df_valid["LSTM_GPT4_CumReturn"], label="Hybrid LSTM-GPT4 Cumulative Return", color='red')
plt.plot(df_valid["Date"], df_valid["FinBERT_CumReturn"], label="FinBERT-only Cumulative Return", color='darkorange')

# Add shaded areas
start1 = pd.to_datetime("2023-06-01")
end1 = pd.to_datetime("2023-07-01")
start2 = pd.to_datetime("2023-11-15")
end2 = pd.to_datetime("2023-12-01")
start3 = pd.to_datetime("2023-03-28")
end3 = pd.to_datetime("2023-04-01")
plt.axvspan(start1, end1, color='grey', alpha=0.15, label='Strong Divergence Period')
plt.axvspan(start2, end2, color='grey', alpha=0.15)
plt.axvspan(start3, end3, color='grey', alpha=0.15)

plt.grid(True, which='both', linestyle='--', linewidth=0.4, color='black', alpha=0.7)
plt.axhline(0, color='black', linestyle='-', linewidth=0.9)
plt.gca().set_facecolor("#f0f0f0e1")
plt.title("Cumulative Compounded Returns Over Time")
plt.xlabel("Date")
plt.ylabel("Cumulative Return (Compounded)")
plt.legend()
plt.show()



### Cumulative Compounding Returns

In [None]:
import pandas as pd
import numpy as np


df_gpt = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market_df = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# Merge on Date
df = df_gpt[["Date", "Return", "Predicted_Return"]].rename(
    columns={"Return": "Market_Return", "Predicted_Return": "LSTM_GPT4"}
).merge(
    df_fbert[["Date", "Predicted_Return"]].rename(
        columns={"Predicted_Return": "LSTM_FinBERT"}
    ),
    on="Date", how="inner"
).merge(
    market_df[["Date", "Return", "VIX"]].rename(
        columns={"Return": "Market_True_Return"}
    ),
    on="Date", how="left"
)
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

# Computing daily returns for each strategy
# Hybrid strategy: go long if LSTM_GPT4 > 0, short if < 0, flat if == 0
df['Hybrid_Strategy_Return'] = np.sign(df['LSTM_GPT4']) * df['Market_True_Return']
# FinBERT-only strategy
df['FinBERT_Strategy_Return'] = np.sign(df['LSTM_FinBERT']) * df['Market_True_Return']
# Buy-and-hold: just the market return itself
df['Market_Strategy_Return'] = df['Market_True_Return']

# Computing cumulative returns (wealth paths)
def to_cum_wealth(returns):
    return np.cumprod(1 + returns)

df['Hybrid_Wealth'] = to_cum_wealth(df['Hybrid_Strategy_Return'])
df['FinBERT_Wealth'] = to_cum_wealth(df['FinBERT_Strategy_Return'])
df['Market_Wealth'] = to_cum_wealth(df['Market_Strategy_Return'])

# Final values and cumulative returns
start_value = 1.0
hybrid_final = df['Hybrid_Wealth'].iloc[-1]
finbert_final = df['FinBERT_Wealth'].iloc[-1]
market_final = df['Market_Wealth'].iloc[-1]

hybrid_cum_return = (hybrid_final - start_value) / start_value * 100
finbert_cum_return = (finbert_final - start_value) / start_value * 100
market_cum_return = (market_final - start_value) / start_value * 100

# Print formatted results for your paragraph
print(f"Hybrid strategy’s final wealth: ${hybrid_final:.2f} (cumulative return: {hybrid_cum_return:.1f}%)")
print(f"S&P 500 (buy-and-hold) final wealth: ${market_final:.2f} (cumulative return: {market_cum_return:.1f}%)")
print(f"FinBERT-only strategy final wealth: ${finbert_final:.2f} (cumulative return: {finbert_cum_return:.1f}%)")


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, accuracy_score,
    precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
)
import seaborn as sns


def compounded_cum_return(returns):
    """Compute compounded cumulative return time series."""
    return pd.Series(np.cumprod(1 + returns) - 1)

def compute_all_metrics(y_true, y_pred):
    # Regression metrics
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    # Classification (directional)
    y_true_dir = (y_true > 0).astype(int)
    y_pred_dir = (y_pred > 0).astype(int)
    accuracy = accuracy_score(y_true_dir, y_pred_dir)
    precision = precision_score(y_true_dir, y_pred_dir, zero_division=0)
    recall = recall_score(y_true_dir, y_pred_dir, zero_division=0)
    f1 = f1_score(y_true_dir, y_pred_dir, zero_division=0)
    roc_auc = roc_auc_score(y_true_dir, y_pred_dir)
    # Strategy returns
    signal = np.where(y_pred > 0, 1, -1)
    daily_pnl = signal * y_true
    rolling_cum = compounded_cum_return(daily_pnl)
    sharpe = np.mean(daily_pnl) / (np.std(daily_pnl) + 1e-9) * np.sqrt(252)
    # Max drawdown (from compounded cumulative return)
    equity = rolling_cum + 1
    running_max = np.maximum.accumulate(equity)
    drawdown = running_max - equity
    max_dd = drawdown.max()
    return {
        "MSE": mse,
        "MAE": mae,
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1": f1,
        "ROC AUC": roc_auc,
        "Rolling_CumReturn": rolling_cum,
        "Sharpe": sharpe,
        "Max_Drawdown": max_dd,
        "Daily_PnL": daily_pnl,
        "Signal": signal
    }

def rolling_directional_acc(true_ret, pred_ret, win=63):
    """Percentage of days inside a rolling window where sign(pred)==sign(true)."""
    hit = (np.sign(true_ret) == np.sign(pred_ret)).astype(int)
    return pd.Series(hit).rolling(win, min_periods=1).mean() * 100

def rolling_sharpe(returns, win=63):
    """Rolling annualized Sharpe ratio over a specified window."""
    returns = pd.Series(returns)
    mean_ret = returns.rolling(win).mean()
    std_ret  = returns.rolling(win).std(ddof=0)
    return (mean_ret / (std_ret + 1e-9)) * np.sqrt(252)

def plot_conf_mat(y_true, y_pred, title, ax):
    cm = confusion_matrix(y_true > 0, y_pred > 0)
    sns.heatmap(cm, annot=True, fmt='d', cmap="Blues", cbar=False,
                xticklabels=["Down", "Up"], yticklabels=["Down", "Up"], ax=ax)
    ax.set_xlabel("Model Signal")
    ax.set_ylabel("Market Direction")
    ax.set_title(title)

def turnover(signal):
    """Average fraction of days where direction flips."""
    return (np.abs(np.diff(signal)) / 2).mean()


df_gpt = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market_df = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# Merging and Align Data by Date
df = pd.DataFrame({
    "Date": df_gpt["Date"],
    "LSTM_GPT4": df_gpt["Predicted_Return"],
    "LSTM_FinBERT": df_fbert["Predicted_Return"],
})

# Using "Return" from market_df as the ground truth for all performance metrics
df = df.merge(market_df[["Date", "Return", "VIX"]].rename(columns={"Return": "Market_True_Return"}), on="Date", how="left")
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

# Apply a joint mask: all series must be valid
mask = df["Market_True_Return"].notnull() & df["LSTM_GPT4"].notnull() & df["LSTM_FinBERT"].notnull()
df_valid = df[mask].copy().reset_index(drop=True)
print(f"\nValid rows for all models: {len(df_valid)}")

# Metrics and Cumulative Return Series

metrics_gpt = compute_all_metrics(df_valid["Market_True_Return"].values, df_valid["LSTM_GPT4"].values)
metrics_finbert = compute_all_metrics(df_valid["Market_True_Return"].values, df_valid["LSTM_FinBERT"].values)
market_cum = compounded_cum_return(df_valid["Market_True_Return"].values)

# Rolling Directional Accuracy and Sharpe

win = 63
mkt_true = df_valid["Market_True_Return"].values
pred_gpt = df_valid["LSTM_GPT4"].values
pred_fbt = df_valid["LSTM_FinBERT"].values
dates_all = df_valid["Date"].values

da_gpt   = rolling_directional_acc(mkt_true, pred_gpt, win)
da_fbt   = rolling_directional_acc(mkt_true, pred_fbt, win)
sharpe_gpt   = rolling_sharpe(metrics_gpt["Daily_PnL"], win)
sharpe_fbert = rolling_sharpe(metrics_finbert["Daily_PnL"], win)

# Trim first (win-1) observations
trim = win - 1
dates_trim = dates_all[trim:]
sharpe_gpt_trim = sharpe_gpt.iloc[trim:]
sharpe_fbt_trim = sharpe_fbert.iloc[trim:]
da_gpt_trim = da_gpt.iloc[trim:]
da_fbt_trim = da_fbt.iloc[trim:]

# Plot Rolling Sharpe Ratio
plt.figure(figsize=(15,5))
plt.plot(dates_trim, sharpe_gpt_trim, label="Hybrid rolling Sharpe",  c="red")
plt.plot(dates_trim, sharpe_fbt_trim, label="FinBERT rolling Sharpe", c="darkorange")
plt.axhline(0, ls='--', c='k')
plt.title(f"Rolling Sharpe Ratio ({win}-day Window)")
plt.ylabel("Sharpe Ratio (annualized)")
plt.xlabel("Date")
plt.legend()
plt.tight_layout()
plt.show()

# Plot Rolling Directional Accuracy
plt.figure(figsize=(15,5))
plt.plot(dates_trim, da_gpt_trim, label="Hybrid directional accuracy",  c="red")
plt.plot(dates_trim, da_fbt_trim, label="FinBERT directional accuracy", c="darkorange")
plt.axhline(50, ls='--', c='k', lw=0.8)       # coin-flip baseline
plt.ylim(0, 100)
plt.title(f"Rolling Directional Accuracy ({win}-day Window)")
plt.ylabel("Accuracy (%)")
plt.xlabel("Date")
plt.legend()
plt.tight_layout()
plt.show()

# Confusion Matrix
fig, axes = plt.subplots(1,2,figsize=(10,4))
plot_conf_mat(df_valid["Market_True_Return"], df_valid["LSTM_GPT4"], "Hybrid", axes[0])
plot_conf_mat(df_valid["Market_True_Return"], df_valid["LSTM_FinBERT"], "FinBERT-only", axes[1])
plt.tight_layout()
plt.show()

# Turnover & Net-Long Ratio
sig_gpt   = metrics_gpt["Signal"]
sig_fbert = metrics_finbert["Signal"]
turn = [turnover(sig_gpt), turnover(sig_fbert)]
netL = [(sig_gpt == 1).mean(), (sig_fbert == 1).mean()]   # net-long ratios

labels = ["Hybrid\n(FinBERT+GPT-4)", "FinBERT-only"]
x = np.arange(len(labels))

fig, ax1 = plt.subplots(figsize=(8, 5))
width = 0.35
bars1 = ax1.bar(x - width/2, turn, width, color="steelblue", alpha=.85, label="Turnover")
ax1.set_ylabel("Turnover (% of days)", color="steelblue")
ax1.set_ylim(0, 1)
ax1.set_xticks(x)
ax1.set_xticklabels(labels, fontsize=11)
ax1.tick_params(axis='y', labelcolor="steelblue")
ax2 = ax1.twinx()
bars2 = ax2.bar(x + width/2, netL, width, color="darkorange", alpha=.85, label="Net-Long Ratio")
ax2.set_ylabel("Net-Long Exposure", color="darkorange")
ax2.set_ylim(0, 1)
ax2.tick_params(axis='y', labelcolor="darkorange")
for rect in list(bars1) + list(bars2):
    height = rect.get_height()
    ax = ax1 if rect in bars1 else ax2
    ax.text(rect.get_x() + rect.get_width()/2, height + 0.02,
            f"{height:.2%}", ha='center', va='bottom', fontsize=9)
plt.title("Turnover vs Net-Long Ratio", pad=18)
plt.tight_layout()
plt.show()


## Comparative Measurements:

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, accuracy_score,
    precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
)
from scipy.stats import norm

# Diebold-Mariano test function
def diebold_mariano_test(e1, e2, h=1, alternative='two-sided'):
    """
    Diebold-Mariano test for equal predictive accuracy.
    e1, e2: forecast errors (arrays)
    h: forecast horizon (1 for one-step ahead)
    alternative: 'two-sided', 'less', 'greater'
    """
    d = e1 - e2
    mean_d = np.mean(d)
    T = len(d)
    # Newey-West variance estimator with lag h-1
    gamma = [np.cov(d[:-lag], d[lag:])[0,1] if lag != 0 else np.var(d, ddof=1) for lag in range(h)]
    V_d = gamma[0] + 2 * np.sum(gamma[1:])
    dm_stat = mean_d / np.sqrt(V_d / T)
    if alternative == 'two-sided':
        p_value = 2 * (1 - norm.cdf(np.abs(dm_stat)))
    elif alternative == 'less':
        p_value = norm.cdf(dm_stat)
    else:
        p_value = 1 - norm.cdf(dm_stat)
    return dm_stat, p_value

df_gpt = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market_df = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

df = pd.DataFrame({
    "Date": df_gpt["Date"],
    "Market_Return": df_gpt["Return"],
    "LSTM_GPT4": df_gpt["Predicted_Return"],
    "LSTM_FinBERT": df_fbert["Predicted_Return"],
})
df = df.merge(market_df[["Date", "Return", "VIX"]].rename(columns={"Return": "Market_True_Return"}), on="Date", how="left")
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

# Signal-based trading returns
def trading_signal_returns(true_returns, predicted_returns):
    signal = np.where(predicted_returns > 0, 1, -1)
    return signal * true_returns

# Computing all metrics for each model (with rolling mean/std)
def compute_all_metrics(true_returns, predicted_returns, rolling_window=60):
    if len(true_returns) == 0 or len(predicted_returns) == 0:
        raise ValueError("Empty input arrays! Check your mask and input data.")
    mse = mean_squared_error(true_returns, predicted_returns)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(true_returns, predicted_returns)

    true_up = (true_returns > 0).astype(int)
    pred_up = (predicted_returns > 0).astype(int)

    acc = accuracy_score(true_up, pred_up)
    prec = precision_score(true_up, pred_up, zero_division=0)
    rec = recall_score(true_up, pred_up, zero_division=0)
    f1 = f1_score(true_up, pred_up, zero_division=0)
    try:
        roc = roc_auc_score(true_up, predicted_returns)
    except:
        roc = np.nan

    cm = confusion_matrix(true_up, pred_up)
    strat_returns = trading_signal_returns(true_returns, predicted_returns)
    cum_return = np.cumprod(1 + strat_returns)[-1] - 1 if len(strat_returns) > 0 else np.nan
    sharpe = np.mean(strat_returns) / (np.std(strat_returns) + 1e-9) * np.sqrt(252)
    roll_sharpe = pd.Series(strat_returns).rolling(rolling_window).apply(
        lambda x: np.mean(x) / (np.std(x) + 1e-9) * np.sqrt(252), raw=True)
    roll_acc = pd.Series(pred_up == true_up).rolling(rolling_window).mean()
    roll_cum_return = (1 + pd.Series(strat_returns)).cumprod() - 1
    rolling_sharpe_mean, rolling_sharpe_std = roll_sharpe.mean(), roll_sharpe.std()
    rolling_acc_mean, rolling_acc_std = roll_acc.mean(), roll_acc.std()
    return {
        "MSE": mse, "RMSE": rmse, "MAE": mae,
        "Direction_Acc": acc, "Precision": prec, "Recall": rec, "F1": f1, "ROC_AUC": roc,
        "Sharpe": sharpe, "Cumulative_Return": cum_return,
        "Rolling_Sharpe_Mean": rolling_sharpe_mean, "Rolling_Sharpe_Std": rolling_sharpe_std,
        "Rolling_Acc_Mean": rolling_acc_mean, "Rolling_Acc_Std": rolling_acc_std,
        "Confusion_Matrix": cm,
        "Rolling_Sharpe": roll_sharpe,
        "Rolling_Acc": roll_acc,
        "Rolling_CumReturn": roll_cum_return,
        "Signal_Returns": strat_returns
    }

# Valid data masks and calculate for both models
mask_gpt = df['Market_True_Return'].notnull() & df['LSTM_GPT4'].notnull()
mask_fbert = df['Market_True_Return'].notnull() & df['LSTM_FinBERT'].notnull()

print("\nValid rows for GPT:", mask_gpt.sum())
print("Valid rows for FinBERT:", mask_fbert.sum())

if mask_gpt.sum() == 0 or mask_fbert.sum() == 0:
    raise ValueError("No valid data rows for at least one model. Check input data and merges!")

metrics_gpt = compute_all_metrics(
    df.loc[mask_gpt, 'Market_True_Return'].values,
    df.loc[mask_gpt, 'LSTM_GPT4'].values
)
metrics_finbert = compute_all_metrics(
    df.loc[mask_fbert, 'Market_True_Return'].values,
    df.loc[mask_fbert, 'LSTM_FinBERT'].values
)

# Tables for thesis (including rolling mean/std)
comparison_table = pd.DataFrame({
    "Hybrid (FinBERT+GPT-4)": {k: v for k, v in metrics_gpt.items() if not isinstance(v, (np.ndarray, pd.Series, list))},
    "FinBERT-only": {k: v for k, v in metrics_finbert.items() if not isinstance(v, (np.ndarray, pd.Series, list))}
})
print("Model Comparison Table")
print(comparison_table)
comparison_table.to_csv("model_performance_comparison.csv")

# Confusion matrices for appendix
print("Confusion Matrices")
print("Hybrid (FinBERT+GPT-4):\n", metrics_gpt["Confusion_Matrix"])
print("FinBERT-only:\n", metrics_finbert["Confusion_Matrix"])
np.savetxt("hybrid_confusion_matrix.csv", metrics_gpt["Confusion_Matrix"], delimiter=",")
np.savetxt("finbert_confusion_matrix.csv", metrics_finbert["Confusion_Matrix"], delimiter=",")

# Diebold-Mariano test (MSE loss differential)
e1 = (df.loc[mask_gpt, "Market_True_Return"].values - df.loc[mask_gpt, "LSTM_GPT4"].values) ** 2
e2 = (df.loc[mask_fbert, "Market_True_Return"].values - df.loc[mask_fbert, "LSTM_FinBERT"].values) ** 2
dm_stat, dm_pvalue = diebold_mariano_test(e1, e2, h=1, alternative='two-sided')
print("Diebold-Mariano Test")
print(f"DM statistic: {dm_stat:.3f}, p-value: {dm_pvalue:.3g}")

# Regime split (by VIX) show model dominance in high/low volatility
vix_median = df["VIX"].median()
df["VIX_regime"] = np.where(df["VIX"] > vix_median, "High_VIX", "Low_VIX")
def regime_metrics(regime):
    idx = df["VIX_regime"] == regime
    gpt_metrics = compute_all_metrics(
        df.loc[idx & mask_gpt, "Market_True_Return"].values, df.loc[idx & mask_gpt, "LSTM_GPT4"].values
    )
    finbert_metrics = compute_all_metrics(
        df.loc[idx & mask_fbert, "Market_True_Return"].values, df.loc[idx & mask_fbert, "LSTM_FinBERT"].values
    )
    return gpt_metrics, finbert_metrics
for regime in ["High_VIX", "Low_VIX"]:
    gpt_metrics, finbert_metrics = regime_metrics(regime)
    print(f"\n=== {regime} Regime ===")
    print("Hybrid Sharpe:", gpt_metrics["Sharpe"], "Cumulative:", gpt_metrics["Cumulative_Return"])
    print("FinBERT-only Sharpe:", finbert_metrics["Sharpe"], "Cumulative:", finbert_metrics["Cumulative_Return"])

# Distribution plots of strategy returns (for tail risk/outlier events)
plt.figure(figsize=(10,6))
sns.histplot(metrics_gpt['Signal_Returns'], bins=50, kde=True, color="blue", stat="density", label="Hybrid")
sns.histplot(metrics_finbert['Signal_Returns'], bins=50, kde=True, color="orange", stat="density", label="FinBERT-only", alpha=0.7)
plt.legend()
plt.title("Distribution of Strategy Returns")
plt.xlabel("Strategy Return per Period")
plt.ylabel("Density")
plt.tight_layout()
plt.show()
print("\nHybrid strategy return quantiles:\n", pd.Series(metrics_gpt["Signal_Returns"]).quantile([.01, .05, .95, .99]))
print("\nFinBERT-only strategy return quantiles:\n", pd.Series(metrics_finbert["Signal_Returns"]).quantile([.01, .05, .95, .99]))



## Robustness Appendix Code: Log-Returns Target

##### If differences between metrics (e.g., RMSE, Sharpe, accuracy) are small, you can claim that your results are robust to the choice of return definition

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix

df_gpt   = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market   = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# Compute close-to-close log-return
market = market.sort_values("Date")  # Already loaded above

# Convert Close column to float if not already
market["Close"] = pd.to_numeric(market["Close"], errors="coerce")

# Calculate close-to-close log return
market["LogReturn"] = np.log(market["Close"] / market["Close"].shift(1))

# Align test set with log-return target
df = df_gpt[["Date", "Predicted_Return"]].rename(columns={"Predicted_Return": "LSTM_GPT4"})
df["LSTM_FinBERT"] = df_fbert["Predicted_Return"].values
df = df.merge(market[["Date", "Return", "LogReturn", "VIX"]], on="Date", how="left")
df.dropna(subset=["LogReturn", "LSTM_GPT4", "LSTM_FinBERT"], inplace=True)

# Metric Functions
def trading_signal_returns(true_returns, predicted_returns):
    signal = np.where(predicted_returns > 0, 1, -1)
    return signal * true_returns

from sklearn.metrics import average_precision_score

def compute_all_metrics(true_returns, predicted_returns, rolling_window=60):
    mse = mean_squared_error(true_returns, predicted_returns)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(true_returns, predicted_returns)
    true_up = (true_returns > 0).astype(int)
    pred_up = (predicted_returns > 0).astype(int)
    acc = accuracy_score(true_up, pred_up)
    prec = precision_score(true_up, pred_up, zero_division=0)
    rec = recall_score(true_up, pred_up, zero_division=0)
    f1 = f1_score(true_up, pred_up, zero_division=0)
    try:
        roc = roc_auc_score(true_up, predicted_returns)
    except:
        roc = np.nan
    try:
        ap = average_precision_score(true_up, predicted_returns)
    except:
        ap = np.nan
    strat_returns = trading_signal_returns(true_returns, predicted_returns)
    sharpe = np.mean(strat_returns) / (np.std(strat_returns) + 1e-9) * np.sqrt(252)
    return {
        "MSE": mse, "RMSE": rmse, "MAE": mae,
        "Direction_Acc": acc, "Precision": prec, "Recall": rec, "F1": f1,
        "ROC_AUC": roc, "P-R AP": ap,
        "Sharpe": sharpe,
    }

# Metrics: Simple vs Log-Returns as targets
metrics_simple = compute_all_metrics(df["Return"].values, df["LSTM_GPT4"].values)
metrics_log = compute_all_metrics(df["LogReturn"].values, df["LSTM_GPT4"].values)

metrics_simple_fb = compute_all_metrics(df["Return"].values, df["LSTM_FinBERT"].values)
metrics_log_fb = compute_all_metrics(df["LogReturn"].values, df["LSTM_FinBERT"].values)

# Creating Summary table
summary = pd.DataFrame({
    "Hybrid_LSTM_SimpleReturn": metrics_simple,
    "Hybrid_LSTM_LogReturn": metrics_log,
    "FinBERT_LSTM_SimpleReturn": metrics_simple_fb,
    "FinBERT_LSTM_LogReturn": metrics_log_fb,
})
print("\n===== Robustness Appendix: Model Performance (Simple vs Log-Returns) =====\n")
print(summary)

summary.to_csv("robustness_appendix_lstm_log_vs_simple.csv")


## Weak Stationarity Tests

### Augmented Dickey-Fuller (ADF) and KPSS Tests

### ARCH-LM Test (for conditional variance stationarity)

In [None]:
import pandas as pd
import numpy as np

df_gpt = pd.read_csv("derived/lstm_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market_df = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# Align
df = pd.DataFrame({
    "Date": df_gpt["Date"],
    "Market_Return": df_gpt["Return"],
    "LSTM_GPT4": df_gpt["Predicted_Return"],
    "LSTM_FinBERT": df_fbert["Predicted_Return"],
})
df = df.merge(
    market_df[["Date", "Return", "VIX"]].rename(columns={"Return": "Market_True_Return"}),
    on="Date", how="left"
)
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

returns = df["Market_True_Return"].dropna().values

# Running staitonarity and ARCH tests
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.stats.diagnostic import het_arch

# Augmented Dickey-Fuller
adf_result = adfuller(returns)
print("Augmented Dickey-Fuller (ADF) Test")
print(f"ADF Statistic: {adf_result[0]:.3f}")
print(f"p-value: {adf_result[1]:.4f}")
print("Null hypothesis: series is non-stationary")
print("If p < 0.05, reject null (series is stationary)")

# KPSS Test
kpss_result = kpss(returns, regression='c', nlags='auto')
print("KPSS Test")
print(f"KPSS Statistic: {kpss_result[0]:.3f}")
print(f"p-value: {kpss_result[1]:.4f}")
print("Null hypothesis: series is stationary")
print("If p < 0.05, reject null (series is non-stationary)")

# ARCH-LM Test
arch_test = het_arch(returns)
print("ARCH-LM Test (for volatility clustering)")
print(f"ARCH-LM Stat: {arch_test[0]:.2f}")
print(f"p-value: {arch_test[1]:.4f}")
print("Null hypothesis: no ARCH effects (homoskedasticity)")
print("If p < 0.05, volatility is time-varying (common in finance)")

import pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.stats.diagnostic import het_arch

# Run all tests on returns (as before)
returns = df["Market_True_Return"].dropna().values

adf_stat, adf_p, _, _, _, _ = adfuller(returns)
kpss_stat, kpss_p, _, _ = kpss(returns, regression='c', nlags='auto')
arch_stat, arch_p, _, _ = het_arch(returns)

# Building summary tables as DataFrames
stationarity_table = pd.DataFrame({
    "Test": ["ADF", "KPSS"],
    "Statistic": [adf_stat, kpss_stat],
    "p-value": [adf_p, kpss_p],
    "Null Hypothesis": [
        "Non-stationary (unit root present)",
        "Stationary (no unit root)"
    ],
    "Interpretation": [
        "Reject if p < 0.05 (series is stationary)",
        "Reject if p < 0.05 (series is non-stationary)"
    ]
})

arch_table = pd.DataFrame({
    "Test": ["ARCH-LM"],
    "Statistic": [arch_stat],
    "p-value": [arch_p],
    "Null Hypothesis": ["No ARCH effects (homoskedasticity)"],
    "Interpretation": ["Reject if p < 0.05 (volatility is time-varying)"]
})

print("Stationarity Test Results")
print(stationarity_table.to_string(index=False))

print("Conditional Heteroskedasticity (ARCH) Test")
print(arch_table.to_string(index=False))

## Practical Application

### Practical Applicaiton: Hybrid Method

In [None]:
import numpy as np
import pandas as pd
from datetime import date
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.callbacks import EarlyStopping

data_path = "derived/final_merged_for_lstm.csv"
df = pd.read_csv(data_path, parse_dates=["Date"])

# Cleaning Up Columns
df = df.drop(columns=[col for col in ['key_0', 'Date_sent'] if col in df.columns])

# Ensuring all columns except Date are numeric
for col in df.columns:
    if col not in ['Date']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
df = df.dropna().reset_index(drop=True)

# Using only meaningful features
exclude_cols = ['Date', 'Close', 'High', 'Low', 'Open', 'Volume'] # Remove prices and volume
feature_cols = ['Return', 'VIX', 'Sentiment']
# adding lagged features
N_LAGS = 2
for lag in range(1, N_LAGS+1):
    df[f"Return_lag{lag}"] = df["Return"].shift(lag)
    df[f"Sentiment_lag{lag}"] = df["Sentiment"].shift(lag)
feature_cols += [f"Return_lag{lag}" for lag in range(1, N_LAGS+1)]
feature_cols += [f"Sentiment_lag{lag}" for lag in range(1, N_LAGS+1)]
df = df.dropna().reset_index(drop=True)

# Train/val/test split by time
TRAIN_END = date(2022, 12, 31)
VAL_END = date(2024, 5, 31)
df["Date"] = pd.to_datetime(df["Date"])
df_train = df[df["Date"] <= pd.to_datetime(TRAIN_END)]
df_val = df[(df["Date"] > pd.to_datetime(TRAIN_END)) & (df["Date"] <= pd.to_datetime(VAL_END))]
df_test = df[df["Date"] > pd.to_datetime(VAL_END)]

# Scaling (IMPORTANT) 
scaler = StandardScaler()
X_train = scaler.fit_transform(df_train[feature_cols])
X_val = scaler.transform(df_val[feature_cols])
X_test = scaler.transform(df_test[feature_cols])
y_train = df_train["Return"].values
y_val = df_val["Return"].values
y_test = df_test["Return"].values

# Create LSTM Sequences
SEQ_LEN = 5
def create_sequences(x, y, seq_len):
    xs, ys = [], []
    for i in range(len(x) - seq_len):
        xs.append(x[i : i + seq_len])
        ys.append(y[i + seq_len])
    return np.array(xs), np.array(ys)

X_train_seq, y_train_seq = create_sequences(X_train, y_train, SEQ_LEN)
X_val_seq, y_val_seq = create_sequences(X_val, y_val, SEQ_LEN)
X_test_seq, y_test_seq = create_sequences(X_test, y_test, SEQ_LEN)

# LSTM Model
model = Sequential([
    LSTM(50, input_shape=(SEQ_LEN, X_train_seq.shape[2])),
    Dense(1),
])
model.compile(optimizer="adam", loss="mse")
early_stop = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

# Train
print("Training LSTM …")
model.fit(
    X_train_seq, y_train_seq,
    epochs=100,
    batch_size=16,
    validation_data=(X_val_seq, y_val_seq),
    callbacks=[early_stop],
    verbose=1,
)

# Evaluate
print("Evaluating …")
pred_test = model.predict(X_test_seq).flatten()
rmse = np.sqrt(mean_squared_error(y_test_seq, pred_test))
print(f"Test RMSE: {rmse:.6f}")
# Directional accuracy
actual_dir = (y_test_seq > 0)
pred_dir = (pred_test > 0)
acc = accuracy_score(actual_dir, pred_dir)
print(f"Directional accuracy: {acc:.2%}")

df_out = df_test.iloc[SEQ_LEN:].copy().reset_index(drop=True)
df_out["Predicted_Return"] = pred_test
df_out.to_csv("derived/lstm_prac_test_predictions.csv", index=False)

print(df_out[["Date", "Return", "Predicted_Return"]].head())
print("Final LSTM model summary:")
model.summary()

### Practical Applicaiton: FinBERT-only Method

In [None]:
import numpy as np
import pandas as pd
from datetime import date
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.callbacks import EarlyStopping

data_path = "derived/final_merged_FinBERT_for_lstm.csv"
df = pd.read_csv(data_path, parse_dates=["Date"])

# Cleaning columns 
df = df.drop(columns=[col for col in ['key_0', 'Date_sent'] if col in df.columns])

for col in ['Close', 'High', 'Low', 'Open', 'Volume']:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
if "VIX" in df.columns:
    df["VIX"] = pd.to_numeric(df["VIX"], errors='coerce')

for col in df.columns:
    if col != 'Date':
        df[col] = pd.to_numeric(df[col], errors='coerce')

df = df.dropna().reset_index(drop=True)

# Feature Engineering: Add lags for target & sentiment 
N_LAGS = 2
for lag in range(1, N_LAGS+1):
    df[f"Return_lag{lag}"] = df["Return"].shift(lag)
    df[f"FinBERT_score_lag{lag}"] = df["FinBERT_score"].shift(lag)
df = df.dropna().reset_index(drop=True)

# Train/val/test split 
TRAIN_END = date(2022, 12, 31)
VAL_END = date(2024, 5, 31)

df["Date"] = pd.to_datetime(df["Date"])
df_train = df[df["Date"] <= pd.to_datetime(TRAIN_END)]
df_val = df[(df["Date"] > pd.to_datetime(TRAIN_END)) & (df["Date"] <= pd.to_datetime(VAL_END))]
df_test = df[df["Date"] > pd.to_datetime(VAL_END)]

# Features and Scaling
FEATURES = ['Close', 'High', 'Low', 'Open', 'Volume', 'VIX', 'FinBERT_score'] + \
           [f"Return_lag{lag}" for lag in range(1, N_LAGS+1)] + \
           [f"FinBERT_score_lag{lag}" for lag in range(1, N_LAGS+1)]
X_train = df_train[FEATURES].astype(np.float32).values
X_val = df_val[FEATURES].astype(np.float32).values
X_test = df_test[FEATURES].astype(np.float32).values
y_train = df_train["Return"].values
y_val = df_val["Return"].values
y_test = df_test["Return"].values

# Scale Features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Creating LSTM sequences (window=5)
SEQ_LEN = 5
def create_sequences(x, y, seq_len):
    xs, ys = [], []
    for i in range(len(x) - seq_len):
        xs.append(x[i : i + seq_len])
        ys.append(y[i + seq_len])
    return np.array(xs), np.array(ys)
X_train_seq, y_train_seq = create_sequences(X_train, y_train, SEQ_LEN)
X_val_seq, y_val_seq = create_sequences(X_val, y_val, SEQ_LEN)
X_test_seq, y_test_seq = create_sequences(X_test, y_test, SEQ_LEN)

print(f"Train samples: {X_train_seq.shape[0]}, Val: {X_val_seq.shape[0]}, Test: {X_test_seq.shape[0]}")

# LSTM Model
model = Sequential([
    LSTM(50, input_shape=(SEQ_LEN, X_train_seq.shape[2])),
    Dense(1),
])
model.compile(optimizer="adam", loss="mse")
early_stop = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

# Train
print("Training LSTM …")
model.fit(
    X_train_seq, y_train_seq,
    epochs=100,
    batch_size=16,
    validation_data=(X_val_seq, y_val_seq),
    callbacks=[early_stop],
    verbose=1,
)

# Evaluate
print("Evaluating …")
pred_test = model.predict(X_test_seq).flatten()
rmse = np.sqrt(mean_squared_error(y_test_seq, pred_test))
print(f"Test RMSE: {rmse:.6f}")
actual_dir = (y_test_seq > 0)
pred_dir = (pred_test > 0)
acc = accuracy_score(actual_dir, pred_dir)
print(f"Directional accuracy: {acc:.2%}")

df_test = df_test.iloc[SEQ_LEN:].copy()  # align with y_test_seq
df_test["Predicted_Return"] = pred_test
df_test.to_csv("derived/lstm_prac_FinBERT_only_test_predictions.csv", index=False)

print(df_test[["Date", "Return", "Predicted_Return"]].head())
print("Final LSTM model summary:")
model.summary()

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, accuracy_score,
    precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
)
from scipy.stats import norm

# Align data by date
df = pd.DataFrame({
    "Date": df_gpt["Date"],
    "Market_Return": df_gpt["Return"],
    "LSTM_GPT4": df_gpt["Predicted_Return"],
    "LSTM_FinBERT": df_fbert["Predicted_Return"],
})
df = df.merge(market_df[["Date", "Return", "VIX"]].rename(columns={"Return": "Market_True_Return"}), on="Date", how="left")
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

# Diagnostics
print("Data Diagnostics")
print("df columns:", list(df.columns))
for col in ["Market_True_Return", "LSTM_GPT4", "LSTM_FinBERT"]:
    print(f"{col} has {df[col].notnull().sum()} valid rows out of {len(df)}")
print("Rows in df:", len(df))
print(df.head())

# Signal-based trading returns
def trading_signal_returns(true_returns, predicted_returns):
    signal = np.where(predicted_returns > 0, 1, -1)
    return signal * true_returns

# Compute all metrics for each model (with rolling mean/std)
def compute_all_metrics(true_returns, predicted_returns, rolling_window=60):
    if len(true_returns) == 0 or len(predicted_returns) == 0:
        raise ValueError("Empty input arrays! Check your mask and input data.")
    mse = mean_squared_error(true_returns, predicted_returns)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(true_returns, predicted_returns)

    true_up = (true_returns > 0).astype(int)
    pred_up = (predicted_returns > 0).astype(int)

    acc = accuracy_score(true_up, pred_up)
    prec = precision_score(true_up, pred_up, zero_division=0)
    rec = recall_score(true_up, pred_up, zero_division=0)
    f1 = f1_score(true_up, pred_up, zero_division=0)
    try:
        roc = roc_auc_score(true_up, predicted_returns)
    except:
        roc = np.nan

    cm = confusion_matrix(true_up, pred_up)
    strat_returns = trading_signal_returns(true_returns, predicted_returns)
    cum_return = np.cumprod(1 + strat_returns)[-1] - 1 if len(strat_returns) > 0 else np.nan
    sharpe = np.mean(strat_returns) / (np.std(strat_returns) + 1e-9) * np.sqrt(252)
    roll_sharpe = pd.Series(strat_returns).rolling(rolling_window).apply(
        lambda x: np.mean(x) / (np.std(x) + 1e-9) * np.sqrt(252), raw=True)
    roll_acc = pd.Series(pred_up == true_up).rolling(rolling_window).mean()
    roll_cum_return = (1 + pd.Series(strat_returns)).cumprod() - 1
    rolling_sharpe_mean, rolling_sharpe_std = roll_sharpe.mean(), roll_sharpe.std()
    rolling_acc_mean, rolling_acc_std = roll_acc.mean(), roll_acc.std()
    return {
        "MSE": mse, "RMSE": rmse, "MAE": mae,
        "Direction_Acc": acc, "Precision": prec, "Recall": rec, "F1": f1, "ROC_AUC": roc,
        "Sharpe": sharpe, "Cumulative_Return": cum_return,
        "Rolling_Sharpe_Mean": rolling_sharpe_mean, "Rolling_Sharpe_Std": rolling_sharpe_std,
        "Rolling_Acc_Mean": rolling_acc_mean, "Rolling_Acc_Std": rolling_acc_std,
        "Confusion_Matrix": cm,
        "Rolling_Sharpe": roll_sharpe,
        "Rolling_Acc": roll_acc,
        "Rolling_CumReturn": roll_cum_return,
        "Signal_Returns": strat_returns
    }

# Create valid data masks and calculate for both models
mask_gpt = df['Market_True_Return'].notnull() & df['LSTM_GPT4'].notnull()
mask_fbert = df['Market_True_Return'].notnull() & df['LSTM_FinBERT'].notnull()

print("\nValid rows for GPT:", mask_gpt.sum())
print("Valid rows for FinBERT:", mask_fbert.sum())

if mask_gpt.sum() == 0 or mask_fbert.sum() == 0:
    raise ValueError("No valid data rows for at least one model. Check input data and merges!")

metrics_gpt = compute_all_metrics(
    df.loc[mask_gpt, 'Market_True_Return'].values,
    df.loc[mask_gpt, 'LSTM_GPT4'].values
)
metrics_finbert = compute_all_metrics(
    df.loc[mask_fbert, 'Market_True_Return'].values,
    df.loc[mask_fbert, 'LSTM_FinBERT'].values
)

# Table for thesis (include rolling mean/std)
comparison_table = pd.DataFrame({
    "Hybrid (FinBERT+GPT-4)": {k: v for k, v in metrics_gpt.items() if not isinstance(v, (np.ndarray, pd.Series, list))},
    "FinBERT-only": {k: v for k, v in metrics_finbert.items() if not isinstance(v, (np.ndarray, pd.Series, list))}
})
print("Model Comparison Table")
print(comparison_table)
comparison_table.to_csv("model_performance_comparison.csv")

# Confusion matrices for appendix
print("Confusion Matrices")
print("Hybrid (FinBERT+GPT-4):\n", metrics_gpt["Confusion_Matrix"])
print("FinBERT-only:\n", metrics_finbert["Confusion_Matrix"])
np.savetxt("hybrid_confusion_matrix.csv", metrics_gpt["Confusion_Matrix"], delimiter=",")
np.savetxt("finbert_confusion_matrix.csv", metrics_finbert["Confusion_Matrix"], delimiter=",")

# Regime split (by VIX): show model dominance in high/low volatility
vix_median = df["VIX"].median()
df["VIX_regime"] = np.where(df["VIX"] > vix_median, "High_VIX", "Low_VIX")
def regime_metrics(regime):
    idx = df["VIX_regime"] == regime
    gpt_metrics = compute_all_metrics(
        df.loc[idx & mask_gpt, "Market_True_Return"].values, df.loc[idx & mask_gpt, "LSTM_GPT4"].values
    )
    finbert_metrics = compute_all_metrics(
        df.loc[idx & mask_fbert, "Market_True_Return"].values, df.loc[idx & mask_fbert, "LSTM_FinBERT"].values
    )
    return gpt_metrics, finbert_metrics
for regime in ["High_VIX", "Low_VIX"]:
    gpt_metrics, finbert_metrics = regime_metrics(regime)
    print(f"\n=== {regime} Regime")
    print("Hybrid Sharpe:", gpt_metrics["Sharpe"], "Cumulative:", gpt_metrics["Cumulative_Return"])
    print("FinBERT-only Sharpe:", finbert_metrics["Sharpe"], "Cumulative:", finbert_metrics["Cumulative_Return"])

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

df_gpt = pd.read_csv("derived/lstm_prac_test_predictions.csv", parse_dates=["Date"])
df_fbert = pd.read_csv("derived/lstm_prac_FinBERT_only_test_predictions.csv", parse_dates=["Date"])
market_df = pd.read_csv("sp500_vix_data.csv", parse_dates=["Date"])

# Merge on Date
df = df_gpt[["Date", "Return", "Predicted_Return"]].rename(
    columns={"Return": "Market_Return", "Predicted_Return": "LSTM_GPT4"}
).merge(
    df_fbert[["Date", "Predicted_Return"]].rename(
        columns={"Predicted_Return": "LSTM_FinBERT"}
    ),
    on="Date", how="inner"
).merge(
    market_df[["Date", "Return", "VIX"]].rename(
        columns={"Return": "Market_True_Return"}
    ),
    on="Date", how="left"
)
df["VIX"] = pd.to_numeric(df["VIX"], errors="coerce")

# Signal-based returns
def trading_signal_returns(true_returns, predicted_returns):
    signal = np.where(predicted_returns > 0, 1, -1)
    return signal * true_returns

def compound_cumret(returns):
    return np.cumprod(1 + returns) - 1

# Masking for valid rows (overlapping dates)
mask = df["Market_True_Return"].notnull() & df["LSTM_GPT4"].notnull() & df["LSTM_FinBERT"].notnull()
if not mask.any():
    raise ValueError("No valid rows after masking. Check merge.")

df_valid = df[mask].copy().reset_index(drop=True)

# Strategy returns
df_valid["LSTM_GPT4_Strategy"] = trading_signal_returns(df_valid["Market_True_Return"], df_valid["LSTM_GPT4"])
df_valid["FinBERT_Strategy"] = trading_signal_returns(df_valid["Market_True_Return"], df_valid["LSTM_FinBERT"])

# Cumulative compounded returns
df_valid["Market_CumReturn"] = compound_cumret(df_valid["Market_True_Return"])
df_valid["LSTM_GPT4_CumReturn"] = compound_cumret(df_valid["LSTM_GPT4_Strategy"])
df_valid["FinBERT_CumReturn"] = compound_cumret(df_valid["FinBERT_Strategy"])

# Plot
plt.figure(figsize=(12, 6))
plt.plot(df_valid["Date"], df_valid["Market_CumReturn"], label="Market Cumulative Return", color='black', linewidth=2)
plt.plot(df_valid["Date"], df_valid["LSTM_GPT4_CumReturn"], label="Hybrid LSTM-GPT4 Cumulative Return", color='red')
plt.plot(df_valid["Date"], df_valid["FinBERT_CumReturn"], label="FinBERT-only Cumulative Return", color='darkorange')
plt.axhline(0, color='grey', linestyle=':', linewidth=0.7)
plt.title("Cumulative Compounded Returns Over Time")
plt.xlabel("Date")
plt.ylabel("Cumulative Return (Compounded)")
plt.legend()
plt.tight_layout()
plt.show()