In [None]:
# Loading Libraries and Visualizing Data
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dropout, Dense, Conv1D, MaxPooling1D, Bidirectional
from tensorflow.keras.metrics import RootMeanSquaredError
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from ta.momentum import RSIIndicator

random.seed(7)
np.random.seed(7)
tf.compat.v1.set_random_seed(7)
tf.random.set_seed(7)

os.chdir("C:/Programming/Stock prediction") #set your own working directory

# load & preprocess data function
def load_stock_data(filename="AAPL.csv"):
    df = pd.read_csv(filename)
    df['date'] = pd.to_datetime(df['date'])
    df['log_return'] = np.log(df['close'] / df['close'].shift(1))
    df['volatility_5d'] = np.log(df['close'] / df['close'].shift(1)).rolling(window=5).std()
    df['volatility_21d'] = np.log(df['close'] / df['close'].shift(1)).rolling(window=21).std()
    df = df.dropna().reset_index(drop=True)
    return df

def plot_returns_and_volatility(df):
    plt.figure(figsize=(12, 6))
    plt.plot(df['date'], df['log_return'], label="Daily Log Return", color='skyblue', alpha=0.7)
    plt.title("Apple Daily Log Returns")
    plt.xlabel("Date")
    plt.ylabel("Log Return")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(12, 6))
    plt.plot(df['date'], df['volatility_5d'], label="5-Day Volatility", color='blue', alpha=0.8)
    plt.plot(df['date'], df['volatility_21d'], label="21-Day Volatility",color='orange', alpha=0.8)
    plt.title("Realized Volatility (5-day vs 21-day)")
    plt.xlabel("Date")
    plt.ylabel("Volatility")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

df = load_stock_data("AAPL.csv")
plot_returns_and_volatility(df)


In [None]:
# CNN-BiLSTM hybrid model
# feature engineering function
def add_features(df):
    df = df.copy()
    df['log_return'] = np.log(df['close'] / df['close'].shift(1))
    df['lag1'] = df['log_return'].shift(1)
    df['lag2'] = df['log_return'].shift(2)
    df['r2'] = df['log_return']**2
    df['z_log_return'] = (df['log_return'] - df['log_return'].rolling(21).mean()) / df['log_return'].rolling(21).std()
    df['price_shock'] = (df['log_return'] > 2 * df['log_return'].rolling(21).std()).astype(int)
    df['rsi'] = RSIIndicator(close=df['close'], window=14).rsi()
    df['log_volume_change'] = np.log(df['volume'] / df['volume'].shift(1))
    df['log_range'] = np.log(df['high'] / df['low'])
    df['vol_5'] = df['log_return'].rolling(window=5).std()
    df['volatility_7'] = df['log_return'].rolling(window=7).std()
    df['volatility_21'] = df['log_return'].rolling(window=21).std()
    df['future_vol'] = df['log_return'].rolling(window=5).std().shift(-5)    
    return df.dropna().reset_index(drop=True)

data_cl = add_features(df)

# sequence generator
def create_sequences(train_x_scaled_cl,
                     train_y_scaled_cl,
                     seq_length):
    train_seq_x_cl, train_seq_y_cl = [], []
    for i in range(seq_length, len(train_x_scaled_cl)):
        train_seq_x_cl.append(train_x_scaled_cl[i - seq_length:i])
        train_seq_y_cl.append(train_y_scaled_cl[i])
    return np.array(train_seq_x_cl), np.array(train_seq_y_cl)

# build model
def build_model_cl(input_shape_cl):
    model_cl = Sequential([
        Conv1D(filters=128, kernel_size=4, input_shape=input_shape_cl),
        MaxPooling1D(pool_size=3),
        Bidirectional(LSTM(80, return_sequences=True, input_shape=input_shape_cl)),
        Dropout(0.2),
        Bidirectional(LSTM(120, return_sequences=True)),
        Dropout(0.2),
        LSTM(200),
        Dropout(0.2),
        Dense(64),      
        Dense(1)])
    return model_cl

# compile model
def compile_model(model_cl):
    model_cl.compile(optimizer='adam',
                    loss='mean_squared_error',
                    metrics=[RootMeanSquaredError()])
    return model_cl

# load and process dataset
seq_length = 30
x_cl = ['log_return','lag1', 'lag2', 'r2',
    'z_log_return', 'price_shock','rsi',
    'log_volume_change', 'log_range',
    'vol_5', 'volatility_7', 'volatility_21']
y_cl = 'future_vol'

# split data
num_samples_cl = len(data_cl)
train_cutoff_cl = int(num_samples_cl * 0.6)
test_cutoff_cl = int(num_samples_cl * 0.8)

train_data_cl = data_cl.iloc[:train_cutoff_cl]
test_data_cl = data_cl.iloc[train_cutoff_cl:test_cutoff_cl]
out_of_sample_data_cl = data_cl.iloc[test_cutoff_cl:]

train_y_cl = train_data_cl[[y_cl]].values
test_y_cl = test_data_cl[[y_cl]].values

# scale features and target
x_scaler_cl = MinMaxScaler()
y_scaler_cl = MinMaxScaler()

train_x_scaled_cl = x_scaler_cl.fit_transform(train_data_cl[x_cl])
test_x_scaled_cl = x_scaler_cl.transform(test_data_cl[x_cl])
train_y_scaled_cl = y_scaler_cl.fit_transform(train_y_cl)
test_y_scaled_cl = y_scaler_cl.transform(test_y_cl)

# create training sequences
train_seq_x_cl, train_seq_y_cl = create_sequences(train_x_scaled_cl,
                                                  train_y_scaled_cl,
                                                  seq_length)
train_seq_x_cl = train_seq_x_cl.reshape((-1, seq_length, len(x_cl)))

# model training
input_shape_cl = (seq_length, len(x_cl))
model_cl = build_model_cl(input_shape_cl)
model_cl = compile_model(model_cl)

early_stop_cl = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr_cl = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)

history_cl = model_cl.fit(
    train_seq_x_cl, train_seq_y_cl,
    epochs=100,
    batch_size=32,
    validation_split=0.1,
    callbacks=[early_stop_cl, reduce_lr_cl],
    verbose=1)

model_cl.save('cnn_lstm_volatility_model.keras')

# plot training history
plt.figure(figsize=(10, 5))
plt.plot(history_cl.history['loss'], label='Training Loss', linewidth=2)
plt.plot(history_cl.history['val_loss'], label='Validation Loss', linestyle='--', linewidth=2)
plt.title('Training vs. Validation Loss (Volatility Forecasting)')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# CNN-BiLSTM Model Testing and Prediction
def generate_test_sequences(train_x_scaled_cl,
                            test_x_scaled_cl,
                            test_y_scaled_cl,
                            seq_length):
    combined_cl = np.concatenate([train_x_scaled_cl[-seq_length:], test_x_scaled_cl],
                                       axis=0)
    
    x_test_cl, y_test_scaled_cl = [], []
    for i in range(seq_length, len(combined_cl)):
        x_test_cl.append(combined_cl[i - seq_length:i])
        y_test_scaled_cl.append(test_y_scaled_cl[i - seq_length])    
    return np.array(x_test_cl), np.array(y_test_scaled_cl)

# create sequences for test data
x_test_cl, y_test_scaled_cl = generate_test_sequences(
    train_x_scaled_cl,
    test_x_scaled_cl,
    test_y_scaled_cl,
    seq_length)

# prediction
y_pred_scaled_cl = model_cl.predict(x_test_cl)

# inverse transform to original scale
y_pred_actual_cl = y_scaler_cl.inverse_transform(y_pred_scaled_cl)
y_test_actual_cl = y_scaler_cl.inverse_transform(y_test_scaled_cl.reshape(-1, 1))

# plotting function
def plot_forecast_vs_actual_cl(y_test_actual_cl, y_pred_actual_cl, date_series_cl):
    plt.figure(figsize=(12, 6))
    plt.plot(date_series_cl, y_test_actual_cl, label="Actual Volatility", color='blue')
    plt.plot(date_series_cl, y_pred_actual_cl, label="Predicted Volatility", color='red')
    plt.title("CNN-BiLSTM Forecast vs. Actual (Test Set)")
    plt.xlabel("Date")
    plt.ylabel("Volatility")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# align dates
aligned_test_dates_cl = test_data_cl['date'].iloc[-len(y_test_actual_cl):].reset_index(drop=True)

# plot the forecast
plot_forecast_vs_actual_cl(y_test_actual_cl.flatten(), y_pred_actual_cl.flatten(), aligned_test_dates_cl)

mae_cl, mse_cl = mean_absolute_error(y_test_actual_cl, y_pred_actual_cl), mean_squared_error(y_test_actual_cl, y_pred_actual_cl)
rmse_cl = np.sqrt(mse_cl)
print(f"CNN-BiLSTM - MAE: {mae_cl:.6f} | MSE: {mse_cl:.6f} | RMSE: {rmse_cl:.6f}")


In [None]:
# CNN-BiLSTM Model Out of Sample Forecasting
def forecast_out_of_sample_cl(model_cl,
                           test_x_scaled_cl, 
                           oos_x_scaled_cl,
                           oos_y_scaled_cl,
                           seq_length, 
                           y_scaler_cl):
    combined_input_cl = np.concatenate([test_x_scaled_cl[-seq_length:],oos_x_scaled_cl], 
                                             axis=0)

    x_oos_cl, y_oos_scaled_cl  = [], []
    for i in range(seq_length, len(combined_input_cl)):
        x_oos_cl.append(combined_input_cl[i - seq_length:i])
        y_oos_scaled_cl.append(oos_y_scaled_cl[i - seq_length])
        
    x_oos_cl = np.array(x_oos_cl)
    y_oos_scaled_cl  = np.array(y_oos_scaled_cl)
    y_pred_scaled_cl = model_cl.predict(x_oos_cl)
    y_pred_cl = y_scaler_cl.inverse_transform(y_pred_scaled_cl)
    y_actual_cl = y_scaler_cl.inverse_transform(y_oos_scaled_cl.reshape(-1, 1))    
    return y_actual_cl, y_pred_cl

x_cl = ['log_return','lag1', 'lag2','r2',
    'z_log_return', 'price_shock','rsi',
    'log_volume_change', 'log_range',
    'vol_5', 'volatility_7', 'volatility_21']
y_cl = 'future_vol'  

# scale out of sample features and target
oos_x_scaled_cl = x_scaler_cl.transform(out_of_sample_data_cl[x_cl])
oos_y_cl = out_of_sample_data_cl[[y_cl]].values
oos_y_scaled_cl = y_scaler_cl.transform(oos_y_cl)

# forecast
y_oos_actual_cl, y_oos_pred_cl = forecast_out_of_sample_cl(
    model_cl,
    test_x_scaled_cl,
    oos_x_scaled_cl,
    oos_y_scaled_cl,
    seq_length,
    y_scaler_cl)

# plot forecast
dates_oos_cl = out_of_sample_data_cl['date'].iloc[seq_length:].reset_index(drop=True)
y_oos_actual_cl = y_oos_actual_cl[:len(dates_oos_cl)]  
y_oos_pred_cl = y_oos_pred_cl[:len(dates_oos_cl)]     

plt.figure(figsize=(12, 6))
plt.plot(dates_oos_cl, y_oos_actual_cl, label='Actual Volatility', color='blue')
plt.plot(dates_oos_cl, y_oos_pred_cl, label='Predicted Volatility', color='red')
plt.title('CNN-BiLSTM Out-of-Sample Forecast')
plt.xlabel('Date')
plt.ylabel('Volatility')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

mae_cl_, mse_cl_ = mean_absolute_error(y_oos_actual_cl, y_oos_pred_cl),mean_squared_error(y_oos_actual_cl, y_oos_pred_cl)
rmse_cl_ = np.sqrt(mse_cl_)
print(f"CNN-BiLSTM - MAE: {mae_cl_:.6f} | MSE: {mse_cl_:.6f} | RMSE: {rmse_cl_:.6f}")

In [None]:
with open("test_metrics_cnn_bilstm.txt", "w") as f:
    f.write(f"CNN-BiLSTM Test Metrics:\n")
    f.write(f"MAE: {mae_cl:.6f}\n")
    f.write(f"MSE: {mse_cl:.6f}\n")
    f.write(f"RMSE: {rmse_cl:.6f}\n")
    
    
with open("oos_metrics_cnn_bilstm.txt", "w") as f:
    f.write(f"CNN-BiLSTM Out-of-Sample Forecast Metrics:\n")
    f.write(f"MAE: {mae_cl_:.6f}\n")
    f.write(f"MSE: {mse_cl_:.6f}\n")
    f.write(f"RMSE: {rmse_cl_:.6f}\n")