# Problem: Forecasting Stock Prices using Machine Learning Algorithms



In [8]:
# Compare multiple forecasting models using the Diebold-Mariano test
# This script trains ARIMA, ANN, LSTM, EGARCH, GJR-GARCH and XGBoost models
# on the FTSE MIB dataset and performs DM tests on their one-step-ahead
# forecasts. Models are intentionally lightweight so the script remains
# a manageable example.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from scipy.stats import t
from statsmodels.tsa.arima.model import ARIMA
from tensorflow import keras
from arch import arch_model
from xgboost import XGBRegressor
from sklearn.model_selection import TimeSeriesSplit


# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------

def dm_test(actual, pred1, pred2):
    """Return DM statistic and p-value for squared-error loss."""
    e1 = actual - pred1
    e2 = actual - pred2
    d = e1 ** 2 - e2 ** 2
    T = len(d)
    stat = d.mean() / np.sqrt(d.var(ddof=1) / T)
    p = 2 * t.sf(np.abs(stat), df=T - 1)
    return stat, p


def load_data(path="dataftsemib_manual.csv"):
    """Load and clean the FTSE MIB dataset."""
    df = pd.read_csv(path)
    df["Date"] = pd.to_datetime(df["Date"], dayfirst=True)
    for col in ["Price", "Open", "High", "Low"]:
        df[col] = df[col].str.replace(",", "").astype(float)

    def parse_volume(v: str) -> float:
        v = str(v).strip()
        if v.endswith("M"):
            return float(v[:-1].replace(",", "")) * 1e6
        if v.endswith("B"):
            return float(v[:-1].replace(",", "")) * 1e9
        return float(v.replace(",", ""))

    df["Vol."] = df["Vol."].apply(parse_volume).ffill()
    df["Change %"] = (
        df["Change %"].str.replace("%", "").str.replace(",", ".").astype(float)
    )
    df.sort_values("Date", inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df


def rolling_forecast_arima(train, test, order=(5, 1, 0)):
    """One-step rolling forecast with an ARIMA model."""
    fit = ARIMA(train, order=order).fit()
    preds = []
    rolling = fit
    for obs in test:
        pred = rolling.forecast()
        preds.append(pred[0])
        rolling = rolling.append([obs], refit=False)
    return np.array(preds)


def ann_forecast(series, window=60, epochs=20):
    scaler = MinMaxScaler()
    split = int(len(series) * 0.8)
    scaler.fit(series[:split].reshape(-1, 1))
    scaled = scaler.transform(series.reshape(-1, 1))

    X, y = [], []
    for i in range(window, len(scaled)):
        X.append(scaled[i - window : i, 0])
        y.append(scaled[i, 0])
    X = np.array(X)
    y = np.array(y)
    X_train, X_test = X[: split - window], X[split - window :]
    y_train, y_test = y[: split - window], y[split - window :]

    model = keras.Sequential([
        keras.layers.Input(shape=(window,)),
        keras.layers.Dense(64, activation="relu"),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(1),
    ])
    model.compile(optimizer="adam", loss="mse")
    model.fit(
        X_train,
        y_train,
        epochs=epochs,
        batch_size=32,
        verbose=0,
        validation_split=0.1,
        callbacks=[keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)],
    )
    pred_scaled = model.predict(X_test)
    return scaler.inverse_transform(pred_scaled).flatten(), scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()


def lstm_forecast(series, window=60, epochs=20):
    scaler = MinMaxScaler()
    split = int(len(series) * 0.8)
    scaler.fit(series[:split].reshape(-1, 1))
    scaled = scaler.transform(series.reshape(-1, 1))

    X, y = [], []
    for i in range(window, len(scaled)):
        X.append(scaled[i - window : i])
        y.append(scaled[i, 0])
    X = np.array(X)
    y = np.array(y)
    X_train, X_test = X[: split - window], X[split - window :]
    y_train, y_test = y[: split - window], y[split - window :]

    model = keras.Sequential([
        keras.layers.LSTM(64, return_sequences=True, input_shape=(window, 1)),
        keras.layers.LSTM(64),
        keras.layers.Dense(1),
    ])
    model.compile(optimizer="adam", loss="mse")
    model.fit(
        X_train,
        y_train,
        epochs=epochs,
        batch_size=32,
        verbose=0,
        validation_split=0.1,
        callbacks=[keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)],
    )
    pred_scaled = model.predict(X_test)
    return scaler.inverse_transform(pred_scaled).flatten(), scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()


def egarch_forecast(returns, exog, split_idx, order=(1, 1, 1), dist="t"):
    train_r, test_r = returns[:split_idx], returns[split_idx:]
    exog_train, exog_test = exog[:split_idx], exog[split_idx:]
    history_y = train_r.copy()
    history_x = exog_train.copy()
    ex_arrays = {c: exog_test[c].values for c in exog.columns}
    forecasts = []
    n_fore = min(20, len(test_r))
    for i in range(n_fore):
        am = arch_model(
            history_y,
            x=history_x,
            mean="ARX",
            lags=1,
            vol="EGARCH",
            p=order[0],
            o=order[1],
            q=order[2],
            dist=dist,
            rescale=False,
        )
        res = am.fit(disp="off")
        fc = res.forecast(horizon=1, x={c: ex_arrays[c][i : i + 1] for c in exog.columns})
        forecasts.append(np.log(fc.variance.iloc[-1, 0]))  # if DataFrame
        history_y = pd.concat([history_y, pd.Series([test_r.iloc[i]])], ignore_index=True)
        history_x = pd.concat([history_x, exog_test.iloc[i : i + 1]], ignore_index=True)
    return np.array(forecasts), returns.index[split_idx : split_idx + n_fore]


def gjrgarch_forecast(returns, exog, split_idx, order=(1, 1, 1), dist="t"):
    train_r, test_r = returns[:split_idx], returns[split_idx:]
    exog_train, exog_test = exog[:split_idx], exog[split_idx:]
    history_y = train_r.copy()
    history_x = exog_train.copy()
    ex_arrays = {c: exog_test[c].values for c in exog.columns}
    forecasts = []
    n_fore = min(20, len(test_r))
    for i in range(n_fore):
        am = arch_model(
            history_y,
            x=history_x,
            mean="ARX",
            lags=1,
            vol="GARCH",
            p=order[0],
            o=order[1],
            q=order[2],
            power=2.0,
            dist=dist,
            rescale=False,
        )
        res = am.fit(disp="off")
        fc = res.forecast(horizon=1, x={c: ex_arrays[c][i : i + 1] for c in exog.columns})
        forecasts.append(np.log(fc.variance.values[-1, 0]))
        history_y = pd.concat([history_y, pd.Series([test_r.iloc[i]])], ignore_index=True)
        history_x = pd.concat([history_x, exog_test.iloc[i : i + 1]], ignore_index=True)
    return np.array(forecasts), returns.index[split_idx : split_idx + n_fore]


def xgb_forecast(features, target, split_idx):
    train_X, test_X = features.iloc[:split_idx], features.iloc[split_idx:]
    train_y, test_y = target.iloc[:split_idx], target.iloc[split_idx:]
    model = XGBRegressor(
        n_estimators=400,
        learning_rate=0.05,
        max_depth=3,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        random_state=42,
    )
    model.fit(train_X, train_y)
    pred = model.predict(test_X)
    return pred, test_y


# ---------------------------------------------------------------------------
# Load data and prepare common series
# ---------------------------------------------------------------------------

df = load_data()
prices = df["Price"].values
split = int(len(prices) * 0.8)

# Returns and volatility features for GARCH and XGBoost
returns = np.log(df["Price"]).diff() * 100
log_var = np.log(returns.pow(2) + 1e-8)
log_vol = np.log(df["Vol."])
price_range = (np.log(df["High"]) - np.log(df["Low"])) * 100
realized = 0.5 * returns.pow(2) + 0.5 * price_range.pow(2)
exog = pd.DataFrame({
    "LogVol": log_vol,
    "RV_lag1": realized.shift(1),
    "ShockLag1": returns.abs().shift(1),
})
features_xgb = pd.DataFrame({
    "logvol_lag1": log_vol.shift(1),
    "ret_lag1": returns.shift(1),
    "rv_lag1": realized.shift(1),
    "range_lag1": price_range.shift(1),
})

df_feat = pd.concat([log_var.shift(-1).rename("log_var"), features_xgb], axis=1).dropna()
features_xgb = df_feat.drop(columns=["log_var"])
target_xgb = df_feat["log_var"]

# Drop NaNs created by differencing
returns = returns.dropna()
log_var = log_var.loc[returns.index]
exog = exog.loc[returns.index].dropna()

# ---------------------------------------------------------------------------
# Model forecasts
# ---------------------------------------------------------------------------

# ARIMA forecast of closing prices
arima_pred = rolling_forecast_arima(prices[:split], prices[split:])
prices_test = prices[split:][: len(arima_pred)]

# ANN forecast
ann_pred, ann_true = ann_forecast(prices, window=60, epochs=10)
ann_pred = ann_pred[-len(arima_pred):]
ann_true = ann_true[-len(arima_pred):]

# LSTM forecast
lstm_pred, lstm_true = lstm_forecast(prices, window=60, epochs=10)
lstm_pred = lstm_pred[-len(arima_pred):]
lstm_true = lstm_true[-len(arima_pred):]

# EGARCH forecast of log variance
egarch_pred, egarch_idx = egarch_forecast(returns, exog, split)
true_lv = log_var.iloc[split:][: len(egarch_pred)]

# GJR-GARCH forecast of log variance
gjr_pred, gjr_idx = gjrgarch_forecast(returns, exog, split)
true_lv_gjr = log_var.iloc[split:][: len(gjr_pred)]

# XGBoost forecast of log variance
xgb_pred, xgb_true = xgb_forecast(features_xgb, target_xgb, split)
xgb_pred = xgb_pred[: len(true_lv)]

# ---------------------------------------------------------------------------
# Diebold-Mariano tests
# ---------------------------------------------------------------------------

print("ARIMA vs ANN")
stat, p = dm_test(prices_test, arima_pred, ann_pred)
print(f"DM statistic: {stat:.4f}, p-value: {p:.4f}")
print()

print("ARIMA vs LSTM")
stat, p = dm_test(prices_test, arima_pred, lstm_pred)
print(f"DM statistic: {stat:.4f}, p-value: {p:.4f}")
print()

print("EGARCH vs XGBoost")
stat, p = dm_test(true_lv.values, egarch_pred, xgb_pred)
print(f"DM statistic: {stat:.4f}, p-value: {p:.4f}")
print()

print("GJR-GARCH vs XGBoost")
stat, p = dm_test(true_lv_gjr.values, gjr_pred, xgb_pred[: len(gjr_pred)])
print(f"DM statistic: {stat:.4f}, p-value: {p:.4f}")

[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 


  super().__init__(**kwargs)


[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 25ms/step
ARIMA vs ANN
DM statistic: -8.4035, p-value: 0.0000

ARIMA vs LSTM
DM statistic: -12.0016, p-value: 0.0000

EGARCH vs XGBoost
DM statistic: 2.3776, p-value: 0.0281

GJR-GARCH vs XGBoost
DM statistic: 2.5569, p-value: 0.0193


# Conclusion