<a href="https://colab.research.google.com/github/Rossiee1/multimodal-stock-vix-prediction/blob/main/Multimodal_Stock_Market_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# !pip install yfinance xgboost scikit-learn tensorflow matplotlib seaborn pandas numpy scipy optuna

# Title: Multimodal Stock Market Prediction Using the VIX Index and Historical Price Data
#### Models: Random Forest, XGBoost, LSTM
#### Goal: Compare "with VIX" vs "without VIX" model performance with advanced EDA and visualizations

In [None]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, accuracy_score
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import cross_val_score

import yfinance as yf
import xgboost as xgb
import optuna
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.api import VAR

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Bidirectional, LayerNormalization, Input)

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
plt.style.use('bmh')
# sns.set_palette('tab10')


mpl.rcParams.update({
    # figure & axes text
    'text.color':           'black',
    'axes.labelcolor':      'black',
    'axes.titlecolor':      'black',
    # tick labels
    'xtick.color':          'black',
    'ytick.color':          'black',
    # legend
    'legend.edgecolor':     'black',
    'legend.facecolor':     'white',
    'legend.frameon':       True,
    # any other annotations, etc.
})


In [None]:
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

TICKERS = ["AAPL", "MSFT", "MAR", "KO", "COST", "DAL", "EFX", "EQIX", "XOM"]
VIX_TICKER = '^VIX'
START_DATE = '2013-01-01'
END_DATE = '2024-12-31'

In [None]:
# DATA FETCHING
def get_data():
    df_all = yf.download(TICKERS + [VIX_TICKER], start=START_DATE, end=END_DATE)['Close']
    df = df_all[TICKERS].copy()
    df['VIX'] = df_all[VIX_TICKER]
    return df.dropna()

In [None]:
# STATIONARITY (ADF) TEST
def adf_test(series, name):
    result = adfuller(series.dropna())
    print(f"ADF Test for {name}")
    print(f"  ADF Statistic: {result[0]:.4f}, p-value: {result[1]:.4f}")
    for k, v in result[4].items():
        print(f"    {k}: {v:.4f}")
    print(f"  => {'stationary' if result[1]<0.05 else 'non-stationary'}\n")

In [None]:
# FEATURE ENGINEERING + VIX INTERACTIONS + REGIME FLAG
def create_features(df, ticker, use_vix=True):
    df_t = df[[ticker]].rename(columns={ticker: 'Close'}).copy()
    # Returns & volatility
    df_t['LogReturn'] = np.log(df_t['Close']/df_t['Close'].shift(1))
    df_t['Return']    = df_t['Close'].pct_change()
    df_t['Volatility']= df_t['Return'].rolling(5).std()
    for lag in [1,5]:
        df_t[f'Return_lag{lag}'] = df_t['Return'].shift(lag)
    df_t['Return_Mean5'] = df_t['Return'].rolling(5).mean()
    df_t['Return_Z5']    = (df_t['Return'] - df_t['Return_Mean5']) / df_t['Return'].rolling(5).std()
    # RSI & Bollinger
    delta = df_t['Close'].diff()
    up, down = delta.clip(lower=0), -delta.clip(upper=0)
    df_t['RSI']   = 100 - 100/(1 + up.rolling(14).mean()/down.rolling(14).mean())
    df_t['MA20']  = df_t['Close'].rolling(20).mean()
    df_t['STD20'] = df_t['Close'].rolling(20).std()
    df_t['UpperBB']= df_t['MA20'] + 2*df_t['STD20']
    df_t['LowerBB']= df_t['MA20'] - 2*df_t['STD20']
    if use_vix:
        df_t['VIX'] = df['VIX']
        # VIX features
        df_t['VIX_MA5']       = df_t['VIX'].rolling(5).mean()
        df_t['VIX_Mom5']      = df_t['VIX'] - df_t['VIX'].shift(5)
        for lag in [1,5]: df_t[f'VIX_lag{lag}'] = df_t['VIX'].shift(lag)
        # Cross-interactions
        df_t['Ret1_x_VIX1'] = df_t['Return_lag1'] * df_t['VIX_lag1']
        df_t['Ret5_x_VIX5'] = df_t['Return_lag5'] * df_t['VIX_lag5']
        # Regime flag: high-vol if VIX above rolling 75th pct
        df_t['VIX_P75'] = df_t['VIX'].rolling(252).quantile(0.75)
        df_t['HighVIX'] = (df_t['VIX'] > df_t['VIX_P75']).astype(int)
    return df_t.dropna()

In [None]:
def plot_train_test_split(df, split_date, ticker, use_vix):
    plt.figure(figsize=(14, 5))
    plt.plot(df.index, df['Return'], label='Returns', alpha=0.6)
    plt.axvspan(split_date, df.index[-1], color='black', alpha=0.2, label='Test Period')
    plt.title(f"{ticker} Return Over Time - {'With' if use_vix else 'Without'} VIX (Train/Test Split)")
    plt.xlabel("Date")
    plt.ylabel("Return")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [None]:
# CORRELATION HEATMAP
def plot_corr_heatmap(df_feat, ticker, use_vix):
    feats = [c for c in df_feat.columns if c not in ['Close']]
    corr = df_feat[feats].corr()
    plt.figure(figsize=(20,14))
    sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm')
    plt.title(f"Correlation Heatmap - {ticker} {'with' if use_vix else 'without'} VIX")
    plt.tight_layout(); plt.show()

In [None]:
# DATA PREP for RF/XGB
def prepare_data(df_feat, use_vix=True):
    drop_cols = ['Close'] + (['HighVIX','VIX_P75'] if use_vix else [])
    X = df_feat.drop(columns=drop_cols + ['Return']).copy()
    y = df_feat['Return'].shift(-1).dropna()
    X = X.iloc[:-1]
    scaler = RobustScaler()
    Xs = scaler.fit_transform(X)
    split = int(0.8 * len(Xs))
    dates = df_feat.index[split+1:]
    return Xs[:split], Xs[split:], y[:split], y[split:], dates

In [None]:
# REGIME SPLIT
def split_by_regime(df_feat):
    high = df_feat[df_feat['HighVIX']==1]
    low  = df_feat[df_feat['HighVIX']==0]
    return low, high

In [None]:
# XGBOOST TUNING via Optuna
def tune_xgb(X_train, y_train):
    def objective(trial):
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'learning_rate': trial.suggest_loguniform('learning_rate', 0.01, 0.3),
            'max_depth': trial.suggest_int('max_depth', 3, 10)
        }
        model = xgb.XGBRegressor(**params, random_state=SEED)
        score = cross_val_score(model, X_train, y_train, cv=3,
                                scoring='neg_mean_squared_error').mean()
        return score
    study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
    study.optimize(objective, n_trials=20)
    best = study.best_params
    print("Best XGBoost params:", best)
    model = xgb.XGBRegressor(**best, random_state=SEED)
    model.fit(X_train, y_train)
    return model

In [None]:
# VECTOR AUTOREGRESSION (VAR)
def run_var(df_raw, maxlags=5):
    df_lr = np.log(df_raw/df_raw.shift(1)).dropna()
    model = VAR(df_lr)
    sel = model.select_order(maxlags)
    p = sel.selected_orders['aic']
    print(f"VAR lag order (AIC): {p}")
    res = model.fit(p)
    print(res.summary())
    fc = res.forecast(df_lr.values[-p:], steps=5)
    idx = pd.date_range(df_lr.index[-1], periods=6, freq='B')[1:]
    fc_df = pd.DataFrame(fc, index=idx, columns=df_lr.columns)
    print("VAR 5-day forecast (log-returns):\n", fc_df)
    return res, fc_df

In [None]:
# EVALUATION & PLOTTING
def evaluate(y_test, y_pred, model_name, dates, context):
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae  = mean_absolute_error(y_test, y_pred)
    r2   = r2_score(y_test, y_pred)
    acc  = accuracy_score((y_test>0).astype(int), (y_pred>0).astype(int))
    print(f"{model_name} ({context}) -> RMSE: {rmse:.5f}, MAE: {mae:.5f}, R2: {r2:.5f}, DirAcc: {acc:.3f}")
    plt.figure(figsize=(12,5))
    plt.plot(dates, y_test, label='Actual'); plt.plot(dates, y_pred, label='Pred');
    plt.title(f"{model_name} Predictions vs Actual ({context})");
    plt.legend(); plt.tight_layout(); plt.show()
    plt.figure(figsize=(8,4))
    sns.histplot(y_test - y_pred, bins=30, kde=True)
    plt.title(f"{model_name} Residuals ({context})"); plt.tight_layout(); plt.show()
    return {'Model':model_name,'Context':context,'RMSE':rmse,'MAE':mae,'R2':r2,'DirAcc':acc}


In [None]:
# MODELS: RF, XGB, LSTM
def run_rf(X_train, X_test, y_train, y_test, dates, context):
    m = RandomForestRegressor(n_estimators=100, random_state=SEED)
    m.fit(X_train, y_train)
    return evaluate(y_test, m.predict(X_test), 'RandomForest', dates, context)

def run_xgb(X_train, X_test, y_train, y_test, dates, context):
    print("Tuning XGBoost...")
    m = tune_xgb(X_train, y_train)
    return evaluate(y_test, m.predict(X_test), 'XGBoost', dates, context)

def create_lstm_sequences(X, y, seq_len=20):
    Xs, ys = [], []
    for i in range(seq_len, len(X)):
        Xs.append(X[i-seq_len:i]); ys.append(y[i])
    return np.array(Xs), np.array(ys)

def run_lstm(X_train, X_test, y_train, y_test, dates, context):
    seq = 20
    Xtr, ytr = create_lstm_sequences(X_train, y_train.values, seq)
    Xte, yte = create_lstm_sequences(X_test,  y_test.values,  seq)
    dt = dates[seq:]
    # build model
    inp = Input(shape=(seq, X_train.shape[1]))
    x = LayerNormalization()(inp)
    x = Bidirectional(LSTM(64, return_sequences=True))(x)
    x = Dropout(0.3)(x)
    x = Bidirectional(LSTM(32))(x)
    x = Dropout(0.3)(x)
    out = Dense(1)(x)
    model = Model(inp, out)
    model.compile(optimizer='adam', loss='mse')
    hist = model.fit(Xtr, ytr, validation_split=0.2, epochs=30,
                     batch_size=32, verbose=0)
    # training curves
    plt.figure();
    plt.plot(hist.history['loss'], label='train');
    plt.plot(hist.history['val_loss'], label='val');
    plt.title(f"LSTM Loss ({context})"); plt.legend(); plt.tight_layout(); plt.show()
    preds = model.predict(Xte).flatten()
    return evaluate(pd.Series(yte), pd.Series(preds), 'LSTM', dt, context)

In [None]:
df_raw = get_data()

In [None]:
print("\n=== VAR PANEL ANALYSIS ===")
run_var(df_raw)

In [None]:
results = []

for ticker in TICKERS:
    for use_vix in [True, False]:
        ctx_base = f"{ticker} - {'With' if use_vix else 'Without'} VIX"
        print(f"\n>>> Processing {ctx_base}")

        df_feat = create_features(df_raw, ticker, use_vix)

        # EDA
        adf_test(df_feat['Close'], f"{ticker} Close")
        adf_test(df_feat['Return'], f"{ticker} Return")
        plot_corr_heatmap(df_feat, ticker, use_vix)

        # Regime-aware modeling
        if use_vix:
            low_df, high_df = split_by_regime(df_feat)
            for df_reg, label in [(low_df, 'LowVIX'), (high_df, 'HighVIX')]:
                Xtr, Xte, ytr, yte, dates = prepare_data(df_reg, True)
                ctx = f"{ctx_base} - {label}"

                # ✅ FIX: use df_reg.index instead of df_feat.index
                split_date = df_reg.index[int(0.8 * len(df_reg))]
                plot_train_test_split(df_reg, split_date, ticker, use_vix)

                results.append(run_rf(Xtr, Xte, ytr, yte, dates, ctx))
                results.append(run_xgb(Xtr, Xte, ytr, yte, dates, ctx))
                results.append(run_lstm(Xtr, Xte, ytr, yte, dates, ctx))
        else:
            Xtr, Xte, ytr, yte, dates = prepare_data(df_feat, False)
            ctx = ctx_base

            split_date = df_feat.index[int(0.8 * len(df_feat))]  # ✅ Correct
            plot_train_test_split(df_feat, split_date, ticker, use_vix)

            results.append(run_rf(Xtr, Xte, ytr, yte, dates, ctx))
            results.append(run_xgb(Xtr, Xte, ytr, yte, dates, ctx))
            results.append(run_lstm(Xtr, Xte, ytr, yte, dates, ctx))


In [None]:
df_res = pd.DataFrame(results)
df_res.to_csv('model_comparison_regime.csv', index=False)

In [None]:
print("\n=== SUMMARY RESULTS ===")
df_res

In [None]:
# Comparison plots
plt.figure(figsize=(14, 6))
sns.barplot(data=df_res, x='Model', y='RMSE', hue='Context')
plt.title('RMSE Comparison')
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(14, 6))
sns.barplot(data=df_res, x='Model', y='DirAcc', hue='Context')
plt.title('Directional Accuracy Comparison')
plt.tight_layout()
plt.show()

In [None]:
df_res.head()

In [None]:
def plot_return_split_and_vix_regime(df: pd.DataFrame, split_date: pd.Timestamp, ticker: str, use_vix: bool):
    """
    Plots return and VIX over time with shaded test region. Skips plot if VIX is missing.
    """
    required_cols = ['Return', 'VIX']
    if not all(col in df.columns for col in required_cols):
        print(f"⚠️ Skipping regime plot for {ticker} - VIX not present.")
        return

    df = df.copy()
    df = df.dropna(subset=required_cols)

    title = f"{ticker} Return & VIX Regime Comparison - {'With VIX' if use_vix else 'Without VIX'}"

    fig, ax1 = plt.subplots(figsize=(14, 5))

    ax1.plot(df.index, df['Return'], label='Daily Return', color='tab:blue', linewidth=1.5)
    ax1.set_ylabel("Return", color='tab:blue')
    ax1.tick_params(axis='y', labelcolor='tab:blue')

    ax1.axvspan(split_date, df.index[-1], color='gray', alpha=0.15, label='Test Period')

    ax2 = ax1.twinx()
    ax2.plot(df.index, df['VIX'], label='VIX Index', color='tab:red', linestyle='--')
    ax2.set_ylabel("VIX", color='tab:red')
    ax2.tick_params(axis='y', labelcolor='tab:red')

    fig.suptitle(title, fontsize=14)
    fig.legend(loc='upper left', bbox_to_anchor=(0.1, 0.9))
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:
# Example for AAPL with VIX
ticker = 'AAPL'
use_vix = True

# Recreate feature set
df_feat = create_features(df_raw, ticker, use_vix)

# Drop NA (if needed) and calculate 80/20 split
split_date = df_feat.index[int(0.8 * len(df_feat))]

# Call the plot function
plot_return_split_and_vix_regime(df_feat, split_date, ticker, use_vix)

In [None]:
df_feat.head(10)

In [None]:
df_feat.index.min()  # Check first date

## Results

In [None]:
results = []

for ticker in TICKERS:
    for use_vix in [True, False]:
        ctx_base = f"{ticker} - {'With' if use_vix else 'Without'} VIX"
        print(f"\n>>> Processing {ctx_base}")

        df_feat = create_features(df_raw, ticker, use_vix)

        # EDA
        adf_test(df_feat['Close'], f"{ticker} Close")
        adf_test(df_feat['Return'], f"{ticker} Return")
        plot_corr_heatmap(df_feat, ticker, use_vix)

        # Regime-aware modeling
        if use_vix:
            low_df, high_df = split_by_regime(df_feat)
            for df_reg, label in [(low_df, 'LowVIX'), (high_df, 'HighVIX')]:
                Xtr, Xte, ytr, yte, dates = prepare_data(df_reg, True)
                ctx = f"{ctx_base} - {label}"

                # ✅ FIX: use df_reg.index instead of df_feat.index
                split_date = df_reg.index[int(0.8 * len(df_reg))]
                plot_train_test_split(df_reg, split_date, ticker, use_vix)

                results.append(run_rf(Xtr, Xte, ytr, yte, dates, ctx))
                results.append(run_xgb(Xtr, Xte, ytr, yte, dates, ctx))
                results.append(run_lstm(Xtr, Xte, ytr, yte, dates, ctx))
        else:
            Xtr, Xte, ytr, yte, dates = prepare_data(df_feat, False)
            ctx = ctx_base

            split_date = df_feat.index[int(0.8 * len(df_feat))]  # ✅ Correct
            plot_train_test_split(df_feat, split_date, ticker, use_vix)

            results.append(run_rf(Xtr, Xte, ytr, yte, dates, ctx))
            results.append(run_xgb(Xtr, Xte, ytr, yte, dates, ctx))
            results.append(run_lstm(Xtr, Xte, ytr, yte, dates, ctx))


In [None]:
# Example for AAPL with VIX
ticker = 'AAPL'
use_vix = True

# Recreate feature set
df_feat = create_features(df_raw, ticker, use_vix)

# Drop NA (if needed) and calculate 80/20 split
split_date = df_feat.index[int(0.8 * len(df_feat))]

# Call the plot function
plot_return_split_and_vix_regime(df_feat, split_date, ticker, use_vix)