# Advanced Time Series Forecasting with Neural Networks and Explainable AI (XAI)

This notebook implements an end‑to‑end deep learning pipeline for **multivariate time series forecasting** using an **LSTM** network and **Explainable AI (XAI)** with **SHAP**.

It includes:
- Programmatic generation of a complex multivariate time series dataset (≥ 1000 samples)
- Data preprocessing (scaling, sequence creation, train/validation/test splits)
- LSTM forecasting model with simple hyperparameter tuning
- Walk‑forward cross‑validation with RMSE, MAE, and MAPE metrics
- SHAP‑based feature importance analysis to interpret the trained model
- Modular, well‑documented Python code (functions with docstrings & comments)


In [None]:
# Toggle between user dataset and synthetic data
USE_USER_DATASET = True


## 1. Imports & Global Configuration

Run this cell first to install/import all required libraries.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple, Dict, List

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

import shap

np.random.seed(42)
tf.random.set_seed(42)

print("TensorFlow version:", tf.__version__)


##  Using Your Provided Dataset Instead of Synthetic Data
The following cell loads your uploaded dataset:
`multivariate_timeseries_dataset.csv`

In [None]:
import pandas as pd

# Load your dataset
df = pd.read_csv('/mnt/data/multivariate_timeseries_dataset.csv')
print('Loaded dataset shape:', df.shape)
display(df.head())


## 2. Synthetic Multivariate Time Series Data Generation

We generate a **multivariate time series** that loosely mimics real‑world patterns such as stock/energy/sensor data:

- `feature1`: noisy sine wave (e.g., cyclical demand)
- `feature2`: noisy cosine wave (e.g., seasonal offset signal)
- `feature3`: linear combination + noise (e.g., engineered feature)
- `target`: weighted combination of features + additional noise (the value we want to forecast)

This approach allows us to know there is a true relationship between features and target while keeping the dataset realistic and non‑trivial.

In [None]:
def generate_synthetic_timeseries(n_samples: int = 2000) -> pd.DataFrame:
    """Generate a synthetic multivariate time series dataset.

    Parameters
    ----------
    n_samples : int
        Number of time steps to generate (must be >= 1000 as per project spec).

    Returns
    -------
    pd.DataFrame
        DataFrame with columns [feature1, feature2, feature3, target].
    """
    time = np.arange(n_samples)

    # Base signals with noise
    feature1 = np.sin(0.02 * time) + np.random.normal(0, 0.15, n_samples)
    feature2 = np.cos(0.02 * time) + np.random.normal(0, 0.15, n_samples)
    feature3 = 0.5 * feature1 + 0.3 * feature2 + np.random.normal(0, 0.1, n_samples)

    # Target depends on current & slightly lagged features (to make forecasting meaningful)
    lag = np.roll(feature1, 1)
    target = 0.4 * feature1 + 0.4 * feature2 + 0.15 * feature3 + 0.2 * lag
    target += np.random.normal(0, 0.1, n_samples)

    df = pd.DataFrame({
        "feature1": feature1,
        "feature2": feature2,
        "feature3": feature3,
        "target": target,
    })

    return df


# Generate and inspect the dataset
df = generate_synthetic_timeseries(n_samples=2000)
display(df.head())
print("Dataset shape:", df.shape)

# Quick visualization of raw series
plt.figure(figsize=(10, 4))
plt.plot(df["target"].values, label="target")
plt.title("Synthetic Target Time Series")
plt.xlabel("Time step")
plt.ylabel("Value")
plt.legend()
plt.tight_layout()
plt.show()


## 3. Preprocessing & Sequence Creation

We now:

1. **Scale** all features to `[0, 1]` using MinMaxScaler.
2. Convert the time series into **supervised learning sequences** for the LSTM.
   - Given a **window** of `seq_len` past time steps of all features, the model predicts the **next target value**.
3. Split the data into **train**, **validation**, and **test** segments while preserving temporal order.

In [None]:
def scale_dataframe(df: pd.DataFrame) -> Tuple[pd.DataFrame, MinMaxScaler]:
    """Scale all columns of the DataFrame to [0, 1].

    Returns the scaled DataFrame and the fitted scaler.
    """
    scaler = MinMaxScaler()
    scaled_values = scaler.fit_transform(df.values)
    scaled_df = pd.DataFrame(scaled_values, columns=df.columns, index=df.index)
    return scaled_df, scaler


def create_sequences(
    df: pd.DataFrame,
    seq_len: int = 30,
) -> Tuple[np.ndarray, np.ndarray]:
    """Create input/output sequences for LSTM.

    Parameters
    ----------
    df : pd.DataFrame
        Scaled multivariate time series including the target column.
    seq_len : int
        Number of past time steps used as input.

    Returns
    -------
    X : np.ndarray
        Input tensor with shape (n_samples - seq_len, seq_len, n_features-1).
    y : np.ndarray
        Target vector with shape (n_samples - seq_len, ).
    """
    values = df.values
    X, y = [], []

    for i in range(len(values) - seq_len):
        # Use all feature columns except the last (target)
        X.append(values[i : i + seq_len, :-1])
        # Predict the target at the next time step
        y.append(values[i + seq_len, -1])

    return np.array(X), np.array(y)


def train_val_test_split(
    X: np.ndarray,
    y: np.ndarray,
    train_ratio: float = 0.7,
    val_ratio: float = 0.15,
) -> Tuple[np.ndarray, ...]:
    """Split sequences into train/validation/test sets while preserving order."""
    n = len(X)
    train_end = int(n * train_ratio)
    val_end = int(n * (train_ratio + val_ratio))

    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]

    return X_train, y_train, X_val, y_val, X_test, y_test


# Apply preprocessing
scaled_df, scaler = scale_dataframe(df)
SEQ_LEN = 30
X, y = create_sequences(scaled_df, seq_len=SEQ_LEN)
X_train, y_train, X_val, y_val, X_test, y_test = train_val_test_split(X, y)

X_train.shape, X_val.shape, X_test.shape


## 4. LSTM Model Definition

We create a configurable **LSTM** model. Hyperparameters such as number of units, dropout, and dense units can be tuned later.

In [None]:
def build_lstm_model(
    input_shape: Tuple[int, int],
    lstm_units: int = 64,
    dense_units: int = 32,
    dropout_rate: float = 0.1,
    learning_rate: float = 1e-3,
) -> tf.keras.Model:
    """Build and compile an LSTM forecasting model.

    Parameters
    ----------
    input_shape : tuple
        (seq_len, n_features)
    lstm_units : int
        Number of LSTM units.
    dense_units : int
        Units in the hidden dense layer.
    dropout_rate : float
        Dropout rate after the LSTM layer.
    learning_rate : float
        Learning rate for Adam optimizer.
    """
    model = Sequential([
        LSTM(lstm_units, input_shape=input_shape, return_sequences=False),
        Dropout(dropout_rate),
        Dense(dense_units, activation="relu"),
        Dense(1),  # Forecast next target value
    ])

    model.compile(
        loss="mse",
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        metrics=["mae"],
    )

    return model


def compute_regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
    """Compute RMSE, MAE, and MAPE between true and predicted values."""
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100
    return {"rmse": rmse, "mae": mae, "mape": mape}


## 5. Walk‑Forward Cross‑Validation & Hyperparameter Tuning

To evaluate robustness over time, we use **walk‑forward validation**:

1. Split the training+validation portion of the data into `k` folds over time.
2. For each fold, train on all data *up to* that fold and test on the next chunk.
3. Aggregate metrics across folds.

We repeat this process for a small grid of hyperparameters and choose the configuration with the best average RMSE.

In [None]:
def walk_forward_validation(
    X: np.ndarray,
    y: np.ndarray,
    n_splits: int,
    build_model_fn,
    epochs: int = 15,
    batch_size: int = 32,
) -> Dict[str, float]:
    """Perform walk‑forward cross‑validation.

    Parameters
    ----------
    X, y : np.ndarray
        Input sequences and corresponding targets.
    n_splits : int
        Number of temporal folds.
    build_model_fn : callable
        Function that returns a compiled Keras model.
    epochs : int
        Number of epochs for each fold.
    batch_size : int
        Training batch size.
    """
    fold_size = len(X) // n_splits
    metrics_list: List[Dict[str, float]] = []

    for fold in range(n_splits):
        print(f"\n▶ Fold {fold + 1}/{n_splits}")
        end_train = (fold + 1) * fold_size
        start_test = end_train
        end_test = start_test + fold_size

        if start_test >= len(X):
            break

        X_train_fold = X[:end_train]
        y_train_fold = y[:end_train]
        X_test_fold = X[start_test:end_test]
        y_test_fold = y[start_test:end_test]

        model = build_model_fn()
        history = model.fit(
            X_train_fold,
            y_train_fold,
            epochs=epochs,
            batch_size=batch_size,
            verbose=0,
        )

        y_pred_fold = model.predict(X_test_fold, verbose=0).flatten()
        metrics = compute_regression_metrics(y_test_fold, y_pred_fold)
        metrics_list.append(metrics)
        print(f"Fold {fold+1} metrics: RMSE={metrics['rmse']:.4f}, MAE={metrics['mae']:.4f}, MAPE={metrics['mape']:.2f}%")

    # Aggregate metrics
    agg = {
        "rmse": np.mean([m["rmse"] for m in metrics_list]),
        "mae": np.mean([m["mae"] for m in metrics_list]),
        "mape": np.mean([m["mape"] for m in metrics_list]),
    }
    print("\nAverage cross‑validation metrics:")
    print("RMSE={:.4f}, MAE={:.4f}, MAPE={:.2f}%".format(agg["rmse"], agg["mae"], agg["mape"]))

    return agg


# === Hyperparameter search over a small grid ===
input_shape = (X_train.shape[1], X_train.shape[2])

param_grid = [
    {"lstm_units": 32, "dense_units": 16, "dropout": 0.1, "lr": 1e-3},
    {"lstm_units": 64, "dense_units": 32, "dropout": 0.2, "lr": 1e-3},
    {"lstm_units": 64, "dense_units": 64, "dropout": 0.3, "lr": 5e-4},
]

cv_results = []
N_SPLITS = 4

for i, params in enumerate(param_grid):
    print("\n============================")
    print(f"Hyperparameter set {i+1}/{len(param_grid)}: {params}")

    def model_builder():
        return build_lstm_model(
            input_shape=input_shape,
            lstm_units=params["lstm_units"],
            dense_units=params["dense_units"],
            dropout_rate=params["dropout"],
            learning_rate=params["lr"],
        )

    metrics = walk_forward_validation(
        X_train,
        y_train,
        n_splits=N_SPLITS,
        build_model_fn=model_builder,
        epochs=12,
        batch_size=32,
    )

    cv_results.append({"params": params, "metrics": metrics})

# Select best hyperparameters based on RMSE
best_result = min(cv_results, key=lambda r: r["metrics"]["rmse"])
best_params = best_result["params"]
print("\nBest hyperparameters based on CV RMSE:")
print(best_params)
print("Corresponding metrics:", best_result["metrics"])


## 6. Train Final Model & Evaluate on Test Set

We now train a final LSTM model using the **best hyperparameters** found above on the combined **train + validation** data and evaluate it on the **unseen test set**.

In [None]:
# Combine train + validation for final training
X_final_train = np.concatenate([X_train, X_val], axis=0)
y_final_train = np.concatenate([y_train, y_val], axis=0)

final_model = build_lstm_model(
    input_shape=input_shape,
    lstm_units=best_params["lstm_units"],
    dense_units=best_params["dense_units"],
    dropout_rate=best_params["dropout"],
    learning_rate=best_params["lr"],
)

history = final_model.fit(
    X_final_train,
    y_final_train,
    epochs=20,
    batch_size=32,
    validation_data=(X_test, y_test),
    verbose=1,
)

# Evaluate on test set
y_test_pred = final_model.predict(X_test).flatten()
test_metrics = compute_regression_metrics(y_test, y_test_pred)
print("\nTest set metrics:")
for k, v in test_metrics.items():
    print(f"{k.upper()}: {v:.4f}")

# Plot predictions vs actuals on a subset of the test set
plt.figure(figsize=(10, 4))
plt.plot(y_test[:200], label="True")
plt.plot(y_test_pred[:200], label="Predicted", linestyle="--")
plt.title("LSTM Forecast vs True Target (Test Subset)")
plt.xlabel("Time step (relative in test set)")
plt.ylabel("Scaled target value")
plt.legend()
plt.tight_layout()
plt.show()


## 7. Explainable AI (XAI) with SHAP

We apply **SHAP (SHapley Additive exPlanations)** to interpret the trained LSTM model.

Steps:
1. Choose a **background dataset** (a small sample from the training data).
2. Use `shap.DeepExplainer` to compute SHAP values for a subset of sequences.
3. Aggregate absolute SHAP values over time steps to estimate the **global importance of each feature**.

Note: DeepExplainer works best with certain TensorFlow/Keras versions. If you encounter issues, consider installing a compatible SHAP version or using KernelExplainer on a flattened representation.

In [None]:
# Select a small background sample for SHAP (to keep computation reasonable)
background_size = min(200, len(X_final_train))
background = X_final_train[:background_size]

# Select instances to explain from the test set
explain_size = min(200, len(X_test))
to_explain = X_test[:explain_size]

print("Computing SHAP values (this may take a few minutes)...")
explainer = shap.DeepExplainer(final_model, background)
shap_values = explainer.shap_values(to_explain)

# shap_values is a list with one array (regression output)
shap_values_arr = shap_values[0]  # shape: (samples, seq_len, n_features)
print("SHAP values shape:", shap_values_arr.shape)

# Aggregate absolute SHAP values over time and samples to get global feature importance
abs_shap = np.abs(shap_values_arr)
feature_importance = abs_shap.mean(axis=(0, 1))  # mean over samples and time

feature_names = df.columns[:-1]  # exclude target
importance_df = pd.DataFrame({
    "feature": feature_names,
    "mean_abs_shap": feature_importance,
}).sort_values("mean_abs_shap", ascending=False)

print("\nGlobal feature importance based on SHAP:")
display(importance_df)

# Optional: SHAP summary plot (uncomment to visualize)
# shap.summary_plot(shap_values_arr, features=to_explain, feature_names=feature_names)


## 8. Text Summary of Results & XAI Insights

The following cell prints a concise, text‑based report describing:

- Final model architecture and best hyperparameters
- Walk‑forward cross‑validation metrics
- Test‑set performance
- SHAP‑derived feature importance ranking

You can copy this section directly into your project report.

In [None]:
def generate_text_report(
    best_params: Dict,
    cv_results: List[Dict],
    test_metrics: Dict[str, float],
    importance_df: pd.DataFrame,
) -> str:
    """Generate a human‑readable summary of experiment results."""
    avg_rmse = best_result["metrics"]["rmse"]
    avg_mae = best_result["metrics"]["mae"]
    avg_mape = best_result["metrics"]["mape"]

    lines = []
    lines.append("MODEL & TRAINING SUMMARY")
    lines.append("- Final model: Single‑layer LSTM followed by a dense hidden layer and linear output.")
    lines.append(
        f"- Best hyperparameters (from walk‑forward CV): LSTM units={best_params['lstm_units']}, "
        f"Dense units={best_params['dense_units']}, Dropout={best_params['dropout']}, "
        f"Learning rate={best_params['lr']}."
    )
    lines.append(
        f"- Average walk‑forward validation performance: RMSE={avg_rmse:.4f}, MAE={avg_mae:.4f}, MAPE={avg_mape:.2f}% ."
    )
    lines.append(
        f"- Held‑out test set performance: RMSE={test_metrics['rmse']:.4f}, MAE={test_metrics['mae']:.4f}, MAPE={test_metrics['mape']:.2f}% ."
    )
    lines.append("")
    lines.append("XAI / FEATURE IMPORTANCE INSIGHTS (SHAP)")
    lines.append("- Global importance ranking (most to least influential features):")

    for i, row in importance_df.iterrows():
        lines.append(
            f"  {row['feature']}: mean |SHAP| = {row['mean_abs_shap']:.6f}"
        )

    lines.append(
        "- Features with larger mean |SHAP| values have stronger influence on the model's "
        "predictions across time."
    )

    report = "\n".join(lines)
    return report


report_text = generate_text_report(best_params, cv_results, test_metrics, importance_df)
print(report_text)


---
### How to Use This Notebook in Your Project

- Treat Sections **2–7** as the full implementation of the pipeline.
- Use Section **8** as the basis for your written report.
- You can further extend this notebook by:
  - Trying different model architectures (e.g., stacked LSTMs, TCNs)
  - Adding early stopping / learning‑rate schedulers
  - Performing more extensive hyperparameter tuning
  - Exporting the trained model with `model.save()` for deployment.
