In [1]:
import torch
import torch.nn as nn
import gdown
from google.colab import drive
from google.colab import files
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import xgboost as xgb
import lightgbm as lgb
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
class LSTMModel(nn.Module):
    """
    LSTM-based neural network for time series forecasting.

    Parameters:
    - input_size (int): Number of input features per time step.
    - hidden_size (int, optional): Number of features in the hidden state. Default is 64.
    - num_layers (int, optional): Number of stacked LSTM layers. Default is 2.
    - bidirectional (bool, optional): If True, uses a bidirectional LSTM. Default is False.

    """

    def __init__(self, input_size, hidden_size=64, num_layers=2, bidirectional=False):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=bidirectional)
        direction_factor = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_size * direction_factor, 1)

    def forward(self, x):
      """
      Process:
        1. Pass input through the LSTM layers, which returns outputs for all time steps.
        2. Extract the output corresponding to the last time step of the sequence (out[:, -1, :]).
        3. Pass this final output through a fully connected layer to produce the prediction.
        4. Use squeeze() to remove any singleton dimensions, resulting in a 1D tensor.

      """
      out, _ = self.lstm(x)
      out = out[:, -1, :]
      return self.fc(out).squeeze()

In [4]:
class TimeSeriesDataset(Dataset):
  """
  Dataset wraps input and target sequnces for the DataLoader.
  """
  def __init__(self, X, y):
      self.X = torch.tensor(X, dtype=torch.float32)
      self.y = torch.tensor(y, dtype=torch.float32)

  def __len__(self):
    """
    Returns the number of samples in the dataset.
    """
    return len(self.X)

  def __getitem__(self, idx):
    """
    Returns the input and target sequences for a given index.
    """
    return self.X[idx], self.y[idx]

In [5]:
def evaluate_model(model, dataloader, scaler, target_index):
  """
  Evaluate the performance of the trained LSTM model on the given dataset.

  Parameters:
  1. model : nn.Module
    The trained LSTM model.

  2. dataloader : DataLoader
    The DataLoader containing the evaluation data.

  3. scaler : MinMaxScaler
    The MinMaxScaler used to scale the data.

  4. target_index (int):
    The index of the target column in the dataset.

  Returns None.
  prints RSME MAE R^2 Score and plots the actual vs predicted values.
  """
  model.eval()
  predictions, actuals = [], []
  with torch.no_grad():
      for batch_X, batch_y in dataloader:
          outputs = model(batch_X).unsqueeze(1)
          predictions.append(outputs.numpy())
          actuals.append(batch_y.numpy())

  predictions = np.concatenate(predictions).reshape(-1, 1)
  actuals = np.concatenate(actuals).reshape(-1, 1)

In [None]:
def preprocess_and_create_sequences(data: pd.DataFrame, target_column: str, sequence_length: int, forecast_days: int):
    """
    To preprocess the input dataframe and create sequences for time series modeling.

    Parameters:
    1. data (pd.DataFrame): dataframe containing all the features and target columns.
    2. target_column (str): name of column to be forecasted
    3. sequence_length (int): number of past time steps in each input sequence
    4. forecast_days (int): number of future days to forecast (output sequence length)

    Returns:
    1. xs (np.ndarray): array of input sequences of shape (num_sequences, sequence_length, num_features)
    2. ys (np.ndarray): array of target sequences of shape (num_sequences, forecast_days)
    3. scaler (MinMaxScaler): MinMaxScaler object used to scale the data

    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data)

    xs, ys = [], []
    for i in range(len(scaled_data) - sequence_length - forecast_days + 1):
        x = scaled_data[i:i+sequence_length, :]
        y = scaled_data[i+sequence_length:i+sequence_length+forecast_days, data.columns.get_loc(target_column)]
        xs.append(x)
        ys.append(y)

    return np.array(xs), np.array(ys), scaler

In [None]:
def train_model(model, dataloader, criterion, optimizer, epochs=50):
  """
  Train the LSTM model on the given dataset.

  Parameters:
  1. model : nn.Module
    The LSTM model to be trained.

  2. dataloader : DataLoader
    The DataLoader containing the training data.

  3. criterion : nn.Module
    The loss function to be used during training.

  4. optimizer : torch.optim.Optimizer
    The optimizer to be used for updating the model's parameters.

  5. epochs (int, optional):
    The number of training epochs. Default is 50.

  Returns None.
  """
  model.train()
  for epoch in range(epochs):
      epoch_loss = 0
      for batch_X, batch_y in dataloader:
          optimizer.zero_grad()
          outputs = model(batch_X)
          loss = criterion(outputs, batch_y.squeeze())
          loss.backward()
          optimizer.step()
          epoch_loss += loss.item()
      print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss/len(dataloader):.4f}")

In [None]:
def train_lstm_model(
    data: pd.DataFrame,
    target_column: str,
    sequence_length: int = 30,
    forecast_days: int = 1,
    hidden_size: int = 64,
    num_layers: int = 2,
    bidirectional: bool = False,
    batch_size: int = 32,
    learning_rate: float = 0.001,
    epochs: int = 50
):
    # Step 1: Preprocess
    X, y, scaler = preprocess_and_create_sequences(data, target_column, sequence_length, forecast_days)
    X, y = X.to(device), y.to(device)  # ADD THIS CHANGE

    # Step 2: Dataset & Dataloader
    dataset = TimeSeriesDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Step 3: Model Initialization
    input_size = X.shape[2]
    model = LSTMModel(input_size, hidden_size, num_layers, bidirectional).to(device)
    # model = model.to(device)  # ADD THIS CHANGE

    # Step 4: Loss & Optimizer
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Step 5: Train
    train_model(model, dataloader, criterion, optimizer, epochs)

    return model, scaler

In [None]:
def train_regression_model(df, target_column,
                           use_xgb=True, use_lgb=False, use_rf=False,
                           feature_columns=None, split=0.8, estimators=50,
                           random_state=42):
    """
    Trains XGBoost, LightGBM, and/or Random Forest on the given DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The full dataset containing features and target.
    target_column : str
        The name of the target column.
    use_xgb : bool
        Whether to train an XGBoost model.
    use_lgb : bool
        Whether to train a LightGBM model.
    use_rf : bool
        Whether to train a Random Forest model.
    feature_columns : list of str, optional
        Which columns to use as features. Defaults to all columns except target.
    split : float
        Fraction of data to use for training (time-based split).
    estimators : int
        Number of estimators/trees for boosting or random forest.
    random_state : int
        Random seed for reproducibility.

    Returns
    -------
    trained_models : dict
        Dictionary with model names as keys and fitted model instances as values.
    scaler : StandardScaler
        The fitted scaler object (for scaling test data the same way).
    """

    if split <= 0 or split >= 1:
        raise ValueError("Split ratio should be between 0 and 1.")

    if sum([use_xgb, use_lgb, use_rf]) == 0:
        raise ValueError("At least one model (XGBoost, LightGBM, Random Forest) must be selected.")

    if feature_columns is None:
        feature_columns = [col for col in df.columns if col != target_column]

    X = df[feature_columns]
    y = df[target_column]

    # Time-based train/test split
    split_index = int(len(df) * split)
    X_train, X_test = X.iloc[:split_index], X.iloc[split_index:]
    y_train, y_test = y.iloc[:split_index], y.iloc[split_index:]

    # Standardize features
    scaler = StandardScaler()
    X_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index)
    X_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns, index=X_test.index)

    trained_models = {}

    if use_xgb:
        xgb_model = xgb.XGBRegressor(objective='reg:squarederror',
                                     n_estimators=estimators,
                                     random_state=random_state)
        xgb_model.fit(X_train_scaled, y_train)
        trained_models['xgboost'] = xgb_model

    if use_lgb:
        lgb_model = lgb.LGBMRegressor(objective='regression',
                                      n_estimators=estimators,
                                      random_state=random_state)
        lgb_model.fit(X_train_scaled, y_train)
        trained_models['lightgbm'] = lgb_model

    if use_rf:
        rf_model = RandomForestRegressor(n_estimators=estimators,
                                         random_state=random_state,
                                         n_jobs=-1)
        rf_model.fit(X_train_scaled, y_train)
        trained_models['random_forest'] = rf_model

    return trained_models, scaler


In [None]:
def evaluate(model, X_test, y_test):
    y_pred = model.predict(X_test)

    if len(y_test) == 0 or np.all(y_test == y_test[0]):
        return y_pred, {'MAE': np.nan, 'RMSE': np.nan, 'R^2': np.nan}

    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)

    return y_pred, {'MAE': mae, 'RMSE': rmse, 'R^2': r2}


In [None]:
def plot_evaluate_boost(
    data_df,
    target_col,
    trained_models,
    scaler,
    index_ranges,
    context_len,
    forecast_horizon=1,
    step_size=1,
    plot=True
):
    """
    Evaluates and optionally plots forecasts of pre-trained models over specified date ranges using a sliding window approach.

    Parameters
    ----------
    data_df : pd.DataFrame
        The full dataset containing features and the target variable.
        The index of this DataFrame **must be a pd.DatetimeIndex** representing the dates.
    target_col : str
        The name of the target variable column in data_df.
    trained_models : dict
        A dictionary of trained models, where keys are model names (str) and values are model objects.
        **These models must be trained beforehand using the `train_boost_model` function.**
    scaler : sklearn.preprocessing.StandardScaler
        The scaler fitted on the training data used to transform features before prediction.
    index_ranges : list of tuples
        List of tuples specifying date ranges to evaluate, e.g. [(start_date1, end_date1), (start_date2, end_date2)],
        where each date is a pd.Timestamp or a string convertible to datetime.
        The function converts these date ranges internally to integer index slices.
    context_len : int
        The number of time steps (days) to use as the historical context window for making forecasts.
    forecast_horizon : int, optional, default=1
        The number of future time steps (days) to predict.
    step_size : int, optional, default=1
        Step size to move the sliding window forward between evaluations.
    plot : bool, optional, default=True
        If True, plots actual vs predicted values with shaded regions indicating context and forecast periods.

    Returns
    -------
    results : dict
        A dictionary with keys as tuples (start_idx, end_idx, model_name) and values as dictionaries
        containing lists of evaluation metrics ('MAE', 'RMSE', 'R^2') over the sliding windows.

    Notes
    -----
    - This function expects `data_df` to have a DatetimeIndex and uses date ranges (converted to indices)
      to slice the data for evaluation.
    - The models provided in `trained_models` must already be fitted; this function does not perform any training.
    - The scaler must be consistent with the data the models were trained on.
    - The sliding window uses the context length as the training window and then evaluates the forecast horizon.
    - The plotting includes informative titles with date ranges, context length, and forecast horizon.
    """

    # Convert date ranges to integer index ranges
    index_all = data_df.index  # DatetimeIndex
    index_ranges_int = []
    for start_date, end_date in index_ranges:
        start_date = pd.to_datetime(start_date)
        end_date = pd.to_datetime(end_date)

        if start_date not in index_all or end_date not in index_all:
            raise ValueError(f"Start or end date ({start_date}, {end_date}) not found in data_df index.")

        start_idx = index_all.get_loc(start_date)
        end_idx = index_all.get_loc(end_date) + 1  # inclusive range, so +1 for Python slicing

        index_ranges_int.append((start_idx, end_idx))

    X_all = data_df.drop(columns=[target_col]).values
    y_all = data_df[target_col].values

    results = {}

    for (start_idx, end_idx) in index_ranges_int:
        print(f"\nEvaluating on rows {start_idx} to {end_idx} (dates {index_all[start_idx].date()} to {index_all[end_idx-1].date()}):")

        for model_name, model in trained_models.items():
            print(f"\nModel: {model_name}")
            metrics_all = {'MAE': [], 'RMSE': [], 'R^2': []}

            for i in range(start_idx, end_idx - context_len - forecast_horizon + 1, step_size):
                train_start = i
                train_end = i + context_len
                test_start = train_end
                test_end = test_start + forecast_horizon

                X_test = X_all[test_start:test_end]
                y_test = y_all[test_start:test_end]

                X_test_scaled = scaler.transform(X_test)
                y_pred, metrics = evaluate(model, X_test_scaled, y_test)

                for key in metrics:
                    metrics_all[key].append(metrics[key])

                if plot:
                    plt.figure(figsize=(12, 5))

                    # Convert sample indices to dates
                    forecast_dates = index_all[test_start:test_end]
                    context_dates = index_all[train_start:train_end]

                    # Plot actual vs predicted
                    plt.plot(forecast_dates, y_test, label='Actual', marker='o')
                    plt.plot(forecast_dates, y_pred, label=f'{model_name} Prediction', marker='x')

                    # Highlight context and forecast regions
                    plt.axvspan(context_dates[0], context_dates[-1], color='lightblue', alpha=0.3, label='Context Range')
                    plt.axvspan(forecast_dates[0], forecast_dates[-1], color='lightgreen', alpha=0.3, label='Forecast Horizon')

                    # Title and labels
                    plt.title(
                        f'{model_name} Forecast\n'
                        f'Context Length: {context_len} | Forecast Horizon: {forecast_horizon}\n'
                        f'Train: {context_dates[0].date()} → {context_dates[-1].date()} | '
                        f'Forecast: {forecast_dates[0].date()} → {forecast_dates[-1].date()}'
                    )
                    plt.xlabel('Date')
                    plt.ylabel(target_col)
                    plt.legend()
                    plt.grid(True)
                    plt.tight_layout()
                    plt.show()

            print("Average Metrics:")
            for key in metrics_all:
                print(f"{key}: {np.mean(metrics_all[key]):.4f}")

            results[(start_idx, end_idx, model_name)] = metrics_all

    return results


In [None]:
def evaluate_and_plot_nn_forecast(
    model,
    test_loader,
    scaler,
    test_dates,
    plot_start_date,
    plot_end_date,
    target_name="Target",
    plot=True
):
    """
    Evaluate a trained PyTorch neural network model on test data and plot forecast results over a specified date range.

    Parameters
    ----------
    model : torch.nn.Module
        Trained PyTorch model.
    test_loader : torch.utils.data.DataLoader
        DataLoader yielding batches of (features, targets) for the test set.
    scaler : sklearn.preprocessing scaler
        Fitted scaler used to inverse transform the target values.
    test_dates : pd.DatetimeIndex
        The datetime index corresponding to all samples in the test set (must match order in test_loader).
    plot_start_date : pd.Timestamp or str
        Start date of the range to plot.
    plot_end_date : pd.Timestamp or str
        End date of the range to plot.
    target_name : str, optional
        Name of the target variable for labeling (default "Target").
    plot : bool, optional
        Whether to plot the actual vs predicted data for the date range (default True).

    Returns
    -------
    dict
        Dictionary with 'MAE', 'RMSE', and 'R2' metrics computed over the selected date range.

    ----
    NOTES:
    -MAKE SURE TO INPUT A TRAINED NN Model
    """

    model.eval()
    predictions = []
    actuals = []

    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            output = model(batch_X)
            predictions.append(output.squeeze().numpy())
            actuals.append(batch_y.numpy())

    # Flatten all batches
    predictions = np.concatenate(predictions)
    actuals = np.concatenate(actuals)

    # Inverse scale
    pred_original = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
    actual_original = scaler.inverse_transform(actuals.reshape(-1, 1)).flatten()

    # Select indices in test_dates within the plotting range
    if isinstance(plot_start_date, str):
        plot_start_date = pd.to_datetime(plot_start_date)
    if isinstance(plot_end_date, str):
        plot_end_date = pd.to_datetime(plot_end_date)

    mask = (test_dates >= plot_start_date) & (test_dates <= plot_end_date)
    indices = np.where(mask)[0]

    if len(indices) == 0:
        raise ValueError("No data points found in the specified date range.")

    y_true = actual_original[indices]
    y_pred = pred_original[indices]
    dates = test_dates[indices]

    # Metrics
    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred, squared=False)
    r2 = r2_score(y_true, y_pred)

    if plot:
        plt.figure(figsize=(12, 6))
        plt.plot(dates, y_true, label="Actual", marker='o', color='black')
        plt.plot(dates, y_pred, label="Prediction", marker='x', color='blue')
        plt.title(f"{target_name} Forecast from {plot_start_date.date()} to {plot_end_date.date()}")
        plt.xlabel("Date")
        plt.ylabel(target_name)
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    print(f"Evaluation Metrics for {plot_start_date.date()} to {plot_end_date.date()}:")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"R2 Score: {r2:.4f}")

    return {"MAE": mae, "RMSE": rmse, "R2": r2}


In [None]:
# FOR PLOTTING BOOSTING MODELS
# -> TRAIN MODEL using train_boost_model
# -> call plot_evaluate_boost using trained model and appropriate arguments

# FOR PLOTTING NN MODELS
# -> TRAIN MODEL using train_lstm_model
# -> call evaluate_and_plot_nn_forecast using approriate arguments