In [None]:
!pip install keras-tuner
# import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller

import tensorflow as tf
import keras_tuner as kt
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import SimpleRNN, GRU, LSTM, Dense, Dropout
from keras.layers import *
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import backend as K

from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import mean_absolute_error as mae
from sklearn.model_selection import train_test_split

import numpy as np

In [None]:
# import data - 30 years from 1993/10/01 to 2023/10/01
url = "https://raw.githubusercontent.com/WithAnOrchid0513/VolData/main/SPY_data.csv"
df = pd.read_csv(url, index_col = 'Date', parse_dates = True)
df.head()

In [None]:
# stationary check with Augmented Dickey–Fuller test
def adf(x):
  res = adfuller(x)
  print("Test-Statistic:", res[0])
  print("P-Value:", res[1])
  if res[1] < 0.05:
    print("Stationary")
  else:
    print("Non-Stationary")

In [None]:
# check close price
adf(df.Close)

In [None]:
# calculate log returns
df['log_returns'] = np.log(df.Close/df.Close.shift(1))
df.dropna(inplace=True)

In [None]:
# check log returns
adf(df['log_returns'])

# LSTM

In [None]:
WINDOW_SIZE = 11
# convert data to 22-day np array for LSTM prediction
def windowed_dataset(x_series, y_series, lookback_window):
    dataX, dataY = [], []
    for i in range((lookback_window-1), len(x_series)):
        from_idx = x_series.index[i-lookback_window+1]
        to_idx = x_series.index[i]
        a = x_series[from_idx:to_idx].values
        dataX.append(a)
        dataY.append(y_series[to_idx])

    return np.array(dataX), np.array(dataY)

In [None]:
# calculate realized volatility
def realized_volatility_daily(series_log_return):
    return np.sqrt(np.sum(series_log_return**2)/(WINDOW_SIZE-1))

In [None]:
n_future = 22
# get current volatility
df['vol_current'] = df.log_returns.rolling(window=WINDOW_SIZE)\
                                   .apply(realized_volatility_daily)

# get future volatility
df['vol_future'] = df.log_returns.shift(n_future).rolling(window=WINDOW_SIZE)\
                                 .apply(realized_volatility_daily)

In [None]:
df.dropna(inplace=True)

In [None]:
# Base model
#train test split, 27 years training, 3 years testing
test_size = 504
split_time_1 = 0
split_time_2 = len(df) - test_size
split_time_3 = 7521
train_idx = df.index[split_time_1:split_time_2]
test_idx = df.index[split_time_2:split_time_3]

# Financial Crisis period
# split_time_1 = 800
# split_time_2 = split_time_1 + 2520
# split_time_3 = split_time_2 + 504
# train_idx = df.index[split_time_1:split_time_2]
# test_idx = df.index[split_time_2:split_time_3]

# COVID1 10yrs training
# split_time_1 = 4088
# split_time_2 = split_time_1 + 2520
# split_time_3 = split_time_2 + 504
# train_idx = df.index[split_time_1:split_time_2]
# test_idx = df.index[split_time_2:split_time_3]

# COVID2 15 yrs training
# split_time_1 = 2828
# split_time_2 = split_time_1 + 3780
# split_time_3 = split_time_2 + 504
# train_idx = df.index[split_time_1:split_time_2]
# test_idx = df.index[split_time_2:split_time_3]

In [None]:
print(f'Training \tFrom: {train_idx[0]} \tto {train_idx[-1]} \t{len(train_idx)} days')
print(f'Test \t\tFrom: {test_idx[0]} \tto {test_idx[-1]} \t{len(test_idx)} days')

In [None]:
# split train test
y_train = df.vol_future[train_idx]
y_test = df.vol_future[test_idx]
x_train = df.vol_current[train_idx]
x_test = df.vol_current[test_idx]

In [None]:
# a plot of log_return and volatility
plt.figure(figsize=(20,7))

plt.plot(df.log_returns, color='gray', label='Daily Log Returns', alpha=0.4)

plt.plot(y_train, color='blue', label='Training Volatility')
plt.plot(y_test, color='green', label='Test Volatility')

plt.plot()
plt.title(f'TRAIN TEST SPLIT \n(Daily Volatility Using {WINDOW_SIZE}-Day Interval)', fontsize=15)
plt.legend()
plt.show();

In [None]:
# the forecasting function
n_past = 22

def val_forecast(model):
    forecast = []
    idx = df.index

    for i in range(len(test_idx)):
        # get the data at the previous n_past (22) time steps
        from_idx = idx[split_time_2 + i - n_past + 1]
        to_idx = idx[split_time_2 + i]
        # make prediction
        pred = model.predict(df.vol_current[from_idx:to_idx].values[np.newaxis])
        forecast.append(pred)

    forecast = np.array(forecast)[:, 0, 0]
    preds = pd.Series(forecast, index=test_idx)
    return preds

In [None]:
# record model error
perf_df = pd.DataFrame(columns=['Model', 'Validation RMSPE', 'Validation RMSE', 'Validation MAE'])
def log_perf(y_true, y_pred, model_name):
    perf_df.loc[len(perf_df.index)] = [model_name,
                                       RMSPE(y_true, y_pred),
                                       RMSE(y_true, y_pred),
                                       MAE(y_true, y_pred)]
    return perf_df

# define Root Mean Squared Percentage Error function
def RMSPE(y_true, y_pred):
    output = np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))
    return output


# define Root Mean Squared Error function
def RMSE(y_true, y_pred):
    output = np.sqrt(mse(y_true, y_pred))
    return output

def MAE(y_true, y_pred):
    output = mae(y_true, y_pred)
    return output

# rmspe for tensorflow
def rmspe(y_true, y_pred):
    output = ksqrt(kmean(ksquare((y_true - y_pred) / y_true)))
    return output

# plotting model predictions vs. target values
def viz_model(y_true, y_pred, model_name):
    plt.figure(figsize=(20,7))
    plt.plot(y_true, color='blue', label='Real Volatility')
    plt.plot(y_pred, color='orange', lw=3, label=f'Forecasted Volatility')

    plt.xlabel('Time')
    plt.ylabel('Volatility')
    plt.legend(loc='best');
    plt.title(f'{model_name} \non Test Data', fontsize=15)
    plt.legend(loc='best');

In [None]:
# these are depreciated in Keras
def ksqrt(x):
    zero = tf.convert_to_tensor(0.0, x.dtype)
    x = tf.maximum(x, zero)
    return tf.sqrt(x)

def kmean(x, axis=None, keepdims=False):
    if x.dtype.base_dtype == tf.bool:
        x = tf.cast(x, backend.floatx())
    return tf.reduce_mean(x, axis, keepdims)

def ksquare(x):
    return tf.square(x)

In [None]:
# initialize for LSTM
tf.keras.backend.clear_session()

batch_size = 64

# reshape the data for LSTM
mat_X_train, mat_y_train = windowed_dataset(x_train, y_train, n_past)
mat_X_test, mat_y_test = windowed_dataset(x_test, y_test, n_past)

In [None]:
# LSTM model
def build_model(hp):
    # Dropout setup
    dropout_rate = hp.Float('dropout_rate', min_value=0.1, max_value=0.3, step=0.1)

    # a two layered model, with a potential third layer
    model = tf.keras.models.Sequential([
        tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1), input_shape=[None]),
    ])

    model.add(tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(
            units=hp.Int('units_layer_1', min_value=64, max_value=128, step=32),
            return_sequences=True)
    ))
    model.add(tf.keras.layers.Dropout(rate=dropout_rate))

    model.add(tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(
            units=hp.Int('units_layer_2', min_value=32, max_value=64, step=32),
            return_sequences=(hp.Int('num_layers', 2, 3) == 3)
        )
    ))
    model.add(tf.keras.layers.Dropout(rate=dropout_rate))

    if hp.Int('num_layers', 2, 3) == 3:
        model.add(tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(
                units=hp.Int('units_layer_3', min_value=16, max_value=32, step=16))
        ))
        model.add(tf.keras.layers.Dropout(rate=dropout_rate))

    # Output layer
    model.add(tf.keras.layers.Dense(1))

    model.compile(
        optimizer=Adam(),
        loss='mse',
        metrics=[rmspe]
    )

    return model

# early stopping
checkpoint_cb = ModelCheckpoint('lstm_1.keras', save_best_only=True, monitor='loss')
early_stopping_cb = EarlyStopping(patience=20, restore_best_weights=True, monitor='val_rmspe', mode='min')

In [None]:
# hyperparameter search
tuner = kt.RandomSearch(
    build_model,
    objective=kt.Objective('val_rmspe', direction='min'),
    max_trials=20,
    executions_per_trial=5,
    directory='lstm_tuning',
    project_name='lstm'
)

tuner.search(
    mat_X_train, mat_y_train,
    epochs=10,
    validation_split = 0.2,
    callbacks=[checkpoint_cb, early_stopping_cb],
)


In [None]:
# record the best permutation
lstm_best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
lstm_best_model = tuner.hypermodel.build(lstm_best_hps)

In [None]:
print(f"""
Best hyperparameters:
units_layer_1: {lstm_best_hps.get('units_layer_1')}
units_layer_2: {lstm_best_hps.get('units_layer_2')}
dropout_rate: {lstm_best_hps.get('dropout_rate')}
num_layers:{lstm_best_hps.get('num_layers')}
""")
if lstm_best_hps.get('num_layers') == 3:
  print(f"""units_layer_3: {lstm_best_hps.get('units_layer_3')}""")

In [None]:
# fit LSTM
lstm_res = lstm_best_model.fit(mat_X_train, mat_y_train, epochs=200, validation_data=(mat_X_test, mat_y_test))

In [None]:
# predict
lstm_preds = val_forecast(lstm_best_model)

In [None]:
# annualize
y_test = y_test * np.sqrt(252)
lstm_preds = lstm_preds * np.sqrt(252)

# store result
log_perf(y_test, lstm_preds, 'LSTM')

In [None]:
#make graph
viz_model(y_test, lstm_preds, "2-Layered LSTM")

In [None]:
# show model
from tensorflow.keras.utils import plot_model
plot_model(lstm_best_model, show_shapes=True)

# Implied Volatility

In [None]:
# load vix as implied volatility data
vix_url = "https://raw.githubusercontent.com/WithAnOrchid0513/VolData/main/VIX_data.csv"
vix = pd.read_csv(vix_url, index_col = 'DATE', parse_dates = True)
vix.tail()

In [None]:
iv = vix.CLOSE[test_idx] * 0.01
iv

In [None]:
# store result
log_perf(y_test, iv, 'Implied Volatility')

In [None]:
# make graph
viz_model(y_test, iv, "Implied Volatility")

# GARCH

In [None]:
pip install arch

In [None]:
from arch import arch_model
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
pd.options.mode.chained_assignment = None

In [None]:
# rolling forecast
rolling_forecasts = []
idx = df.index

# iterate over each time step in the validation set
for i in range(len(test_idx)+n_future):

    if i < n_future:
      idx = train_idx[-i-1]
    else:
      idx = test_idx[i-n_future]
    # scale the data for GARCH convergance issue
    train = df.log_returns[:idx] * 100

    model = arch_model(train, vol='GARCH', p=1, q=1,
                       dist='normal')
    model_fit = model.fit(disp='off')

    # make predictions
    vaR = model_fit.forecast(horizon=n_future,
                             reindex=False).variance.values
    pred = np.sqrt(np.mean(vaR))

    rolling_forecasts.append(pred)

gm_preds = pd.Series(rolling_forecasts, index=df.index[split_time_2-n_future:split_time_3])
# annualize and unscale the data
gm_preds = gm_preds * np.sqrt(252) / 100

In [None]:
gm_preds = gm_preds.shift(n_future)
gm_preds = gm_preds.dropna()

In [None]:
# store result
log_perf(y_test, gm_preds, 'GARCH(1,1)')

In [None]:
# make graph
viz_model(y_test, gm_preds, 'GARCH(1,1)')

# Transformer

In [None]:
# reshape the data
transformer_x_train, transformer_y_train = windowed_dataset(x_train, y_train, n_past)
transformer_x_test, transformer_y_test = windowed_dataset(x_test, y_test, n_past)

transformer_x_train = transformer_x_train.reshape((transformer_x_train.shape[0], transformer_x_train.shape[1], 1))
transformer_x_test = transformer_x_test.reshape((transformer_x_test.shape[0], transformer_x_test.shape[1], 1))

In [None]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):

    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    x = layers.Dropout(dropout)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    res = x + inputs

    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    return x + res

In [None]:
def build_model(hp):
    input_shape = transformer_x_train.shape[1:]
    inputs = keras.Input(shape=input_shape)
    x = inputs

    # Hyperparameters to tune
    head_size = hp.Choice('head_size', values=[64, 128, 256])
    num_heads = hp.Choice('num_heads', values=[2, 4, 8])
    ff_dim = hp.Choice('ff_dim', values=[4, 8, 16, 32])
    num_transformer_blocks = hp.Choice('num_transformer_blocks', values=[4, 8])
    mlp_units = hp.Int('mlp_units', min_value=64, max_value=256, step=64)
    dropout = hp.Float('dropout', 0.1, 0.3, step=0.1)
    mlp_dropout = hp.Float('mlp_dropout', 0.1, 0.3, step=0.1)

    # Build Transformer Blocks
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    x = layers.Dense(mlp_units, activation="relu")(x)
    x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(1, activation="linear")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        loss='mse',
        optimizer=keras.optimizers.Adam(),
        metrics=[rmspe]
    )
    return model

In [None]:
# hyperparameter search
tuner = kt.RandomSearch(
    build_model,
    objective=kt.Objective('val_rmspe', direction='min'),
    max_trials=20,
    executions_per_trial=5,
    directory='transformer_tuning',
    project_name='transformer'
)
callbacks = [keras.callbacks.EarlyStopping(patience=20, restore_best_weights=True)]


In [None]:
tuner.search(
    transformer_x_train,
    transformer_y_train,
    validation_split=0.2,
    epochs=10,
    callbacks=callbacks,
)

In [None]:
# record the best permutation
transformer_best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""Best hyperparameters:
head_size: {transformer_best_hps.get('head_size')}
num_heads: {transformer_best_hps.get('num_heads')}
ff_dim: {transformer_best_hps.get('ff_dim')}
num_transformer_blocks: {transformer_best_hps.get('num_transformer_blocks')}
mlp_units: {transformer_best_hps.get('mlp_units')}
dropout: {transformer_best_hps.get('dropout')}
mlp_dropout: {transformer_best_hps.get('mlp_dropout')}
""")

In [None]:
model = tuner.hypermodel.build(transformer_best_hps)

# fit Transformer
transformer_res = model.fit(
    transformer_x_train,
    transformer_y_train,
    validation_split=0.2,
    epochs=200,
    callbacks=callbacks,
)

In [None]:
# predict
t_preds = val_forecast(model)

In [None]:
t_preds = t_preds * np.sqrt(252)

In [None]:
# make graph
viz_model(y_test, t_preds, "Transformer")

In [None]:
# store result
log_perf(y_test, t_preds, 'Transformer')

In [None]:
fig, axs = plt.subplots(4, 1, figsize=(20, 18))

# labels
labels = ['Realized Volatility', 'Implied Volatility', 'GARCH(1,1) Predicted Volatility', 'LSTM Predicted Volatility', 'Transformer Predicted Volatility']

# First plot: Real vs Implied Volatility
axs[0].plot(y_test, color='blue', label=labels[0])
axs[0].plot(iv, color='purple', label=labels[1])
axs[0].set_title('Realized vs. Implied Volatility', fontsize=20)
axs[0].legend(fontsize=20)

# Second plot: Real vs GARCH(1,1) Forecasted Volatility
axs[1].plot(y_test, color='blue', label=labels[0])
axs[1].plot(gm_preds, color='orange', label=labels[2])
axs[1].set_title('Real vs. GARCH(1,1) Predicted', fontsize=20)
axs[1].legend(fontsize=20)

# Third plot: Real vs LSTM Forecasted Volatility
axs[2].plot(y_test, color='blue', label=labels[0])
axs[2].plot(lstm_preds, color='green', label=labels[3])
axs[2].set_title('Real vs. LSTM Predicted', fontsize=20)
axs[2].legend(fontsize=20)

# Fourth plot: Real vs Transformer Forecasted Volatility
axs[3].plot(y_test, color='blue', label=labels[0])
axs[3].plot(t_preds, color='red', label=labels[4])
axs[3].set_title('Real vs. Transformer Predicted w/ 15 yrs training', fontsize=20)
axs[3].legend(fontsize=20)

for ax in axs:
    ax.set_xlabel('Time', fontsize=20)
    ax.set_ylabel('Volatility', fontsize=20)

plt.tight_layout()

plt.show()

# Additional models: GRU and RNN

In [None]:
#y_test = y_test * np.sqrt(252)
gru_X_train = mat_X_train.reshape((mat_X_train.shape[0], mat_X_train.shape[1], 1))
gru_X_test = mat_X_test.reshape((mat_X_test.shape[0], mat_X_test.shape[1], 1))

In [None]:
def build_gru_model(hp):
    model = Sequential()
    # Dropout setup
    dropout_rate = hp.Float('dropout_rate', min_value=0.1, max_value=0.3, step=0.1)

    for i in range(hp.Int('num_layers', 1, 3)):
        return_sequences = i < hp.Int('num_layers', 1, 3) - 1
        model.add(GRU(
            units=hp.Int(f'gru_units_{i}', min_value=32, max_value=128, step=32),
            return_sequences=return_sequences
        ))
        model.add(tf.keras.layers.Dropout(rate=dropout_rate))

    model.add(Dense(hp.Int('dense_units', 16, 128, step=16), activation='relu'))
    model.add(Dense(1))

    model.compile(
        optimizer=Adam(),
        loss='mse',
        metrics=[rmspe]
    )

    return model

In [None]:
tuner = kt.RandomSearch(
    build_gru_model,
    objective=kt.Objective('val_rmspe', direction='min'),
    max_trials=20,
    executions_per_trial=5,
    directory='gru_tuning',
    project_name='gru_hp'
)

In [None]:
tuner.search(gru_X_train, mat_y_train,
             validation_split = 0.2,
             epochs=10,
             callbacks=[EarlyStopping(patience=5)])

In [None]:
gru = tuner.get_best_models(num_models=1)[0]
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print("Best Hyperparameters:")
for key in best_hps.values.keys():
    print(f"{key}: {best_hps.get(key)}")

In [None]:
gru.fit(gru_X_train, mat_y_train, validation_split=0.2, epochs=200, batch_size=32)

In [None]:
gru_preds = val_forecast(gru)

In [None]:
gru_preds = gru_preds * np.sqrt(252)

# store result
log_perf(y_test, gru_preds, 'GRU')

In [None]:
viz_model(y_test, gru_preds, "GRU")

In [None]:
rnn_X_train = mat_X_train.reshape((mat_X_train.shape[0], mat_X_train.shape[1], 1))

In [None]:
def build_rnn_model(rnn_units=32, dropout_rate=0.2, output_dim=1):
    model = Sequential([
        SimpleRNN(rnn_units, return_sequences=False),
        Dropout(dropout_rate),
        Dense(output_dim)
    ])
    model.compile(optimizer='adam', loss='mse', metrics=[rmspe])
    return model

In [None]:
rnn = build_rnn_model()
rnn.fit(rnn_X_train, mat_y_train, validation_split=0.2, epochs=200, batch_size=32)

In [None]:
rnn_preds = val_forecast(rnn)

In [None]:
rnn_preds = rnn_preds * np.sqrt(252)

# store result
log_perf(y_test, rnn_preds, 'RNN')

In [None]:
viz_model(y_test, gru_preds, "RNN")