In [None]:
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Standalone Model Testing Script\n",
    "\n",
    "This notebook allows you to test a pre-trained PyTorch model (FNN, GRU, or LSTM) from the `vestim` project on a given dataset. \n",
    "\n",
    "**Instructions:**\n",
    "1. Fill in the parameters in the 'Configuration Parameters' cell below.\n",
    "2. Run the cells sequentially."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch\n",
    "import torch.nn as nn\n",
    "from torch.utils.data import DataLoader, TensorDataset\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import joblib # For loading scalers\n",
    "from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score\n",
    "import matplotlib.pyplot as plt\n",
    "import os\n",
    "from datetime import datetime"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Model Class Definitions\n",
    "\n",
    "These model definitions are copied from the `vestim` project so that this notebook is self-contained. They must match the definitions used during training: the saved weights are loaded with `load_state_dict`, which requires the same layer names and parameter shapes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class FNNModel(nn.Module):\n",
    "    \"\"\"Feed-forward network: Linear -> ReLU (-> Dropout) per hidden layer, then a Linear head.\n",
    "\n",
    "    All layers live in ``self.network`` (an ``nn.Sequential``) in the order they are\n",
    "    appended below, so state_dict keys look like ``network.N.weight`` / ``network.N.bias``.\n",
    "    A Dropout layer is only inserted when ``dropout_prob > 0``, which shifts the\n",
    "    indices of the layers that follow it.\n",
    "\n",
    "    Args:\n",
    "        input_size: Width of the (flattened) input vector.\n",
    "        output_size: Number of values predicted per sample.\n",
    "        hidden_layer_sizes: Iterable with the width of each hidden layer, in order.\n",
    "        dropout_prob: Dropout probability applied after each ReLU; 0 disables dropout.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, input_size, output_size, hidden_layer_sizes, dropout_prob=0.0):\n",
    "        super().__init__()\n",
    "        layers = []\n",
    "        current_dim = input_size\n",
    "        for hidden_dim in hidden_layer_sizes:\n",
    "            layers.append(nn.Linear(current_dim, hidden_dim))\n",
    "            layers.append(nn.ReLU())\n",
    "            if dropout_prob > 0:\n",
    "                layers.append(nn.Dropout(dropout_prob))\n",
    "            current_dim = hidden_dim\n",
    "        layers.append(nn.Linear(current_dim, output_size))\n",
    "        self.network = nn.Sequential(*layers)\n",
    "\n",
    "    def forward(self, x):\n",
    "        \"\"\"Run the network on ``x``; a 3-D input is flattened to (batch, -1) first.\"\"\"\n",
    "        if x.ndim == 3:\n",
    "            # reshape (unlike view) also accepts non-contiguous input such as a\n",
    "            # transposed or sliced tensor, and only copies when it has to.\n",
    "            x = x.reshape(x.size(0), -1)\n",
    "        return self.network(x)\n",
    "\n",
    "class GRUModel(nn.Module):\n",
    "    \"\"\"GRU followed by a Linear head applied to the output of the last time step.\n",
    "\n",
    "    Args:\n",
    "        input_size: Number of features per time step.\n",
    "        hidden_units: Hidden size of every GRU layer.\n",
    "        num_layers: Number of stacked GRU layers.\n",
    "        output_size: Number of values predicted per sequence.\n",
    "        dropout_prob: Dropout between GRU layers; only passed on when num_layers > 1.\n",
    "        device: Stored as ``self.device`` for backward compatibility. The default\n",
    "            hidden state is now created on the input's own device instead.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, input_size, hidden_units, num_layers, output_size=1, dropout_prob=0.0, device='cpu'):\n",
    "        super().__init__()\n",
    "        self.hidden_units = hidden_units\n",
    "        self.num_layers = num_layers\n",
    "        self.device = device\n",
    "\n",
    "        self.gru = nn.GRU(\n",
    "            input_size=input_size,\n",
    "            hidden_size=hidden_units,\n",
    "            num_layers=num_layers,\n",
    "            batch_first=True,\n",
    "            dropout=dropout_prob if num_layers > 1 else 0\n",
    "        )\n",
    "        self.fc = nn.Linear(hidden_units, output_size)\n",
    "\n",
    "    def forward(self, x, h_0=None):\n",
    "        \"\"\"Return the prediction for each sequence in ``x`` (batch, seq_len, features).\n",
    "\n",
    "        ``h_0`` is the optional initial hidden state of shape\n",
    "        (num_layers, batch, hidden_units); zeros are used when it is omitted.\n",
    "        \"\"\"\n",
    "        if h_0 is None:\n",
    "            # new_zeros copies device and dtype from x, so the default state stays\n",
    "            # valid after model.to(...) / model.double() regardless of self.device.\n",
    "            h_0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_units)\n",
    "\n",
    "        out, _ = self.gru(x, h_0)\n",
    "        # Only the last time step feeds the regression head.\n",
    "        return self.fc(out[:, -1, :])\n",
    "\n",
    "class LSTMModel(nn.Module):\n",
    "    \"\"\"LSTM followed by a Linear head applied to the output of the last time step.\n",
    "\n",
    "    Args:\n",
    "        input_size: Number of features per time step.\n",
    "        hidden_units: Hidden size of every LSTM layer.\n",
    "        num_layers: Number of stacked LSTM layers.\n",
    "        device: Stored as ``self.device`` for backward compatibility. The default\n",
    "            states are now created on the input's own device instead.\n",
    "        output_size: Number of values predicted per sequence.\n",
    "        dropout_prob: Dropout between LSTM layers; only passed on when num_layers > 1.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, input_size, hidden_units, num_layers, device, output_size=1, dropout_prob=0.0):\n",
    "        super().__init__()\n",
    "        self.hidden_units = hidden_units\n",
    "        self.num_layers = num_layers\n",
    "        self.device = device\n",
    "\n",
    "        self.lstm = nn.LSTM(\n",
    "            input_size=input_size,\n",
    "            hidden_size=hidden_units,\n",
    "            num_layers=num_layers,\n",
    "            batch_first=True,\n",
    "            dropout=dropout_prob if num_layers > 1 else 0\n",
    "        )\n",
    "        self.fc = nn.Linear(hidden_units, output_size)\n",
    "\n",
    "    def forward(self, x, h_s=None, h_c=None):\n",
    "        \"\"\"Return the prediction for each sequence in ``x`` (batch, seq_len, features).\n",
    "\n",
    "        ``h_s`` / ``h_c`` are the optional initial hidden and cell states, each of\n",
    "        shape (num_layers, batch, hidden_units). Whichever one is omitted is\n",
    "        replaced by zeros; a state that was passed in is always kept.\n",
    "        \"\"\"\n",
    "        # new_zeros copies device and dtype from x, so the defaults stay valid after\n",
    "        # model.to(...) / model.double() regardless of self.device.\n",
    "        if h_s is None:\n",
    "            h_s = x.new_zeros(self.num_layers, x.size(0), self.hidden_units)\n",
    "        if h_c is None:\n",
    "            h_c = x.new_zeros(self.num_layers, x.size(0), self.hidden_units)\n",
    "\n",
    "        out, _ = self.lstm(x, (h_s, h_c))\n",
    "        # Only the last time step feeds the regression head.\n",
    "        return self.fc(out[:, -1, :])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Configuration Parameters\n",
    "\n",
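    "**IMPORTANT:** Update these parameters with your specific paths, column names, and model architecture details. Note that this notebook does not scale the input data itself: the columns of the test CSV are fed to the model as they are, and when a target scaler is configured it is only used to convert predictions and actuals back to their original units."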
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Configuration Parameters ---\n",
    "# Every name defined here is a notebook global read by the helper functions\n",
    "# and by the main testing cell further down.\n",
    "\n",
    "# Data Parameters\n",
    "TEST_DATA_PATH = \"path/to/your/test_data.csv\"  # Path to your single test CSV file\n",
    "MODEL_PATH = \"path/to/your/model.pt\"          # Path to your saved .pt or .pth file; its content is passed to load_state_dict\n",
    "SCALER_TARGET_PATH = None # e.g., \"path/to/your/scaler_target.joblib\" or None if no target scaling; used to inverse-transform predictions and actuals\n",
    "SCALER_FEATURES_PATH = None # e.g., \"path/to/your/scaler_features.joblib\" or None if no feature scaling; loaded by the main cell but not applied to the data\n",
    "OUTPUT_DIR = \".\" # Directory to save predictions CSV and plot (default: current directory)\n",
    "\n",
    "# Feature and Target Columns\n",
    "FEATURE_COLUMNS = ['feature1', 'feature2', 'feature3'] # List of feature column names used for training; its length is the model input dimension\n",
    "TARGET_COLUMN = 'target_value'                       # Target column name used for training\n",
    "\n",
    "# If features were scaled, provide the list of columns that were scaled (must match scaler_features).\n",
    "# NOTE: the main cell only records this list when SCALER_FEATURES_PATH is set; no\n",
    "# feature scaling or inverse transform is performed with it in this notebook.\n",
    "FEATURE_COLUMNS_TO_SCALE = None # e.g., ['feature1', 'feature2'] or None.\n",
    "                                # If None and SCALER_FEATURES_PATH is set, all FEATURE_COLUMNS are assumed.\n",
    "\n",
    "# Model Parameters\n",
    "MODEL_TYPE = 'LSTM' # Choose from 'LSTM', 'GRU', or 'FNN'\n",
    "LOOKBACK = 20       # Sequence length / lookback window used during training; each sample is LOOKBACK rows, the target is the row right after\n",
    "\n",
    "# Common Model Architecture\n",
    "# input_dim is derived from len(FEATURE_COLUMNS) in the main cell\n",
    "# (for 'FNN' the model input size becomes len(FEATURE_COLUMNS) * LOOKBACK).\n",
    "OUTPUT_DIM = 1      # Typically 1 for regression tasks\n",
    "\n",
    "# FNN Specific Architecture (only fill if MODEL_TYPE is 'FNN')\n",
    "FNN_HIDDEN_DIMS = [128, 64] # List of hidden layer sizes for FNN\n",
    "FNN_DROPOUT_RATE = 0.0      # Dropout rate for FNN\n",
    "\n",
    "# RNN (LSTM/GRU) Specific Architecture (only fill if MODEL_TYPE is 'LSTM' or 'GRU')\n",
    "# Hidden size per layer, number of stacked layers, and dropout between layers\n",
    "# (the model classes only pass the dropout on when there is more than one layer).\n",
    "RNN_HIDDEN_UNITS = 64\n",
    "RNN_NUM_LAYERS = 2\n",
    "RNN_DROPOUT_PROB = 0.0\n",
    "\n",
    "# DataLoader Parameters\n",
    "BATCH_SIZE = 32     # Batch size for testing (can be 1 for sequential processing); batches are never shuffled\n",
    "\n",
    "# Device Configuration: use the GPU when CUDA is available, otherwise the CPU\n",
    "DEVICE = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
    "\n",
    "print(f\"Using device: {DEVICE}\")\n",
    "# --- End of Configuration ---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Helper Functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_sequences(data_df, feature_cols, target_col, lookback):\n",
    "    \"\"\"Cut ``data_df`` into overlapping windows of ``lookback`` consecutive rows.\n",
    "\n",
    "    Window ``i`` holds feature rows ``i`` to ``i + lookback - 1`` and is paired with\n",
    "    the target value of row ``i + lookback`` (the row right after the window).\n",
    "\n",
    "    Returns:\n",
    "        Tuple ``(X, y)`` of NumPy arrays. When ``feature_cols`` is a list of column\n",
    "        names, X is (n_windows, lookback, n_features) and y is (n_windows,).\n",
    "        Both arrays are empty when the frame has no more than ``lookback`` rows.\n",
    "    \"\"\"\n",
    "    feature_matrix = data_df[feature_cols].values\n",
    "    target_vector = data_df[target_col].values\n",
    "    window_starts = range(len(data_df) - lookback)\n",
    "\n",
    "    windows = [feature_matrix[start:start + lookback, :] for start in window_starts]\n",
    "    labels = [target_vector[start + lookback] for start in window_starts]\n",
    "\n",
    "    return np.array(windows), np.array(labels)\n",
    "\n",
    "def load_test_data(file_path, feature_cols, target_col, lookback, batch_size):\n",
    "    \"\"\"Read the test CSV and wrap its sliding windows in a non-shuffling DataLoader.\n",
    "\n",
    "    Windows are built with ``create_sequences``; features and targets are converted\n",
    "    to float32 tensors, with the targets shaped [samples, 1].\n",
    "\n",
    "    Returns:\n",
    "        A ``DataLoader`` over (window, target) pairs in file order, or ``None``\n",
    "        (after printing the reason) when the file cannot be read, a required\n",
    "        column is missing, or no window could be built.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        frame = pd.read_csv(file_path)\n",
    "    except FileNotFoundError:\n",
    "        print(f\"Error: Test data file not found at {file_path}\")\n",
    "        return None\n",
    "    except Exception as err:\n",
    "        print(f\"Error reading test data file: {err}\")\n",
    "        return None\n",
    "\n",
    "    # Every feature column and the target column must be present in the file.\n",
    "    required_cols = feature_cols + [target_col]\n",
    "    missing_cols = [name for name in required_cols if name not in frame.columns]\n",
    "    if missing_cols:\n",
    "        print(f\"Error: Missing columns in test data: {missing_cols}\")\n",
    "        return None\n",
    "\n",
    "    windows, labels = create_sequences(frame, feature_cols, target_col, lookback)\n",
    "    if windows.size == 0 or labels.size == 0:\n",
    "        print(f\"Error: No sequences created. Check lookback ({lookback}) vs data length ({len(frame)}).\")\n",
    "        return None\n",
    "\n",
    "    window_tensor = torch.tensor(windows, dtype=torch.float32)\n",
    "    # unsqueeze(1) turns the label vector into a [samples, 1] column.\n",
    "    label_tensor = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)\n",
    "\n",
    "    return DataLoader(TensorDataset(window_tensor, label_tensor), batch_size=batch_size, shuffle=False)\n",
    "\n",
    "def load_model_from_path(model_type, model_path, input_dim, output_dim, device, \n",
    "                           fnn_hidden_dims, fnn_dropout, \n",
    "                           rnn_hidden_units, rnn_num_layers, rnn_dropout_prob, lookback_for_fnn_input):\n",
    "    \"\"\"Build the requested architecture, load its saved weights and switch to eval mode.\n",
    "\n",
    "    Args:\n",
    "        model_type: One of 'FNN', 'LSTM' or 'GRU'.\n",
    "        model_path: File handed to ``torch.load``; its content is passed to ``load_state_dict``.\n",
    "        input_dim: Number of feature columns.\n",
    "        output_dim: Number of values predicted per sample.\n",
    "        device: Device used for ``map_location`` and for the final ``model.to``.\n",
    "        fnn_hidden_dims, fnn_dropout: FNN architecture (only used for 'FNN').\n",
    "        rnn_hidden_units, rnn_num_layers, rnn_dropout_prob: RNN architecture\n",
    "            (only used for 'LSTM' and 'GRU').\n",
    "        lookback_for_fnn_input: Window length; the FNN input size is\n",
    "            ``input_dim * lookback_for_fnn_input`` because FNNModel flattens each window.\n",
    "\n",
    "    Returns:\n",
    "        The model on ``device`` in eval mode, or ``None`` (after printing the\n",
    "        reason) for an unknown type or when the weights cannot be loaded.\n",
    "    \"\"\"\n",
    "    if model_type not in ('FNN', 'LSTM', 'GRU'):\n",
    "        print(f\"Error: Unknown model type '{model_type}'\")\n",
    "        return None\n",
    "\n",
    "    if model_type == 'FNN':\n",
    "        # Windows arrive as (samples, lookback, features) and are flattened in\n",
    "        # FNNModel.forward, hence the first layer sees features * lookback inputs.\n",
    "        net = FNNModel(input_size=input_dim * lookback_for_fnn_input,\n",
    "                       output_size=output_dim,\n",
    "                       hidden_layer_sizes=fnn_hidden_dims,\n",
    "                       dropout_prob=fnn_dropout)\n",
    "    else:\n",
    "        # Both recurrent classes accept the same keyword arguments.\n",
    "        rnn_class = LSTMModel if model_type == 'LSTM' else GRUModel\n",
    "        net = rnn_class(input_size=input_dim,\n",
    "                        hidden_units=rnn_hidden_units,\n",
    "                        num_layers=rnn_num_layers,\n",
    "                        output_size=output_dim,\n",
    "                        dropout_prob=rnn_dropout_prob,\n",
    "                        device=device)\n",
    "\n",
    "    try:\n",
    "        # NOTE(security): torch.load may unpickle arbitrary objects; only load\n",
    "        # model files that come from a trusted source.\n",
    "        saved_weights = torch.load(model_path, map_location=device)\n",
    "        net.load_state_dict(saved_weights)\n",
    "        print(f\"Model loaded successfully from {model_path}\")\n",
    "    except FileNotFoundError:\n",
    "        print(f\"Error: Model file not found at {model_path}\")\n",
    "        return None\n",
    "    except Exception as err:\n",
    "        print(f\"Error loading model state_dict: {err}\")\n",
    "        return None\n",
    "\n",
    "    net.to(device)\n",
    "    net.eval()\n",
    "    return net\n",
    "\n",
    "def calculate_mape(y_true, y_pred):\n",
    "    \"\"\"Mean absolute percentage error (in percent) over the samples whose true value is non-zero.\n",
    "\n",
    "    Samples with a true value of exactly 0 are skipped; NaN is returned when no\n",
    "    sample is left, which avoids a division by zero.\n",
    "    \"\"\"\n",
    "    actual = np.array(y_true)\n",
    "    predicted = np.array(y_pred)\n",
    "    usable = actual != 0\n",
    "    if not np.any(usable):\n",
    "        return np.nan\n",
    "    relative_error = np.abs((actual[usable] - predicted[usable]) / actual[usable])\n",
    "    return np.mean(relative_error) * 100"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Main Testing Logic"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Main pipeline: load data and model, run inference, report metrics, save outputs.\n",
    "# A notebook cell cannot `return`, so the stages are nested: each one only runs\n",
    "# when the previous loader produced something.\n",
    "\n",
    "# 1. Load Data\n",
    "print(\"Loading test data...\")\n",
    "test_loader = load_test_data(TEST_DATA_PATH, FEATURE_COLUMNS, TARGET_COLUMN, LOOKBACK, BATCH_SIZE)\n",
    "\n",
    "if test_loader is None:\n",
    "    print(\"Halting execution due to data loading error.\")\n",
    "else:\n",
    "    print(\"Test data loaded successfully.\")\n",
    "\n",
    "    # 2. Load Model\n",
    "    print(\"\\nLoading model...\")\n",
    "    input_dim = len(FEATURE_COLUMNS)\n",
    "    model = load_model_from_path(MODEL_TYPE, MODEL_PATH, input_dim, OUTPUT_DIM, DEVICE,\n",
    "                                 FNN_HIDDEN_DIMS, FNN_DROPOUT_RATE,\n",
    "                                 RNN_HIDDEN_UNITS, RNN_NUM_LAYERS, RNN_DROPOUT_PROB, LOOKBACK)\n",
    "\n",
    "    if model is None:\n",
    "        print(\"Halting execution due to model loading error.\")\n",
    "    else:\n",
    "        # 3. Load Scalers (if provided)\n",
    "        # Best effort: a scaler that fails to load is reported and then skipped.\n",
    "        # NOTE(security): joblib.load unpickles the file; only use trusted scaler files.\n",
    "        scaler_target = None\n",
    "        scaler_features = None\n",
    "        if SCALER_TARGET_PATH:\n",
    "            try:\n",
    "                scaler_target = joblib.load(SCALER_TARGET_PATH)\n",
    "                print(f\"Target scaler loaded from {SCALER_TARGET_PATH}\")\n",
    "            except Exception as e:\n",
    "                print(f\"Warning: Could not load target scaler from {SCALER_TARGET_PATH}: {e}\")\n",
    "        if SCALER_FEATURES_PATH:\n",
    "            try:\n",
    "                scaler_features = joblib.load(SCALER_FEATURES_PATH)\n",
    "                print(f\"Features scaler loaded from {SCALER_FEATURES_PATH}\")\n",
    "                if FEATURE_COLUMNS_TO_SCALE is None:\n",
    "                    print(\"FEATURE_COLUMNS_TO_SCALE is None, assuming all FEATURE_COLUMNS were scaled by scaler_features.\")\n",
    "                    _feature_cols_for_scaling = FEATURE_COLUMNS\n",
    "                else:\n",
    "                    _feature_cols_for_scaling = FEATURE_COLUMNS_TO_SCALE\n",
    "                # Nothing below uses scaler_features; say so instead of failing silently.\n",
    "                print(\"Note: the features scaler is not applied by this notebook; feature columns are fed to the model exactly as read from the test CSV.\")\n",
    "            except Exception as e:\n",
    "                print(f\"Warning: Could not load features scaler from {SCALER_FEATURES_PATH}: {e}\")\n",
    "\n",
    "        # 4. Perform Inference\n",
    "        print(\"\\nPerforming inference...\")\n",
    "        batch_predictions = []\n",
    "        batch_actuals = []\n",
    "\n",
    "        with torch.no_grad():\n",
    "            for X_batch, y_batch in test_loader:\n",
    "                # All supported models take the (batch, seq_len, features) window\n",
    "                # directly (FNNModel flattens it itself); unknown model types were\n",
    "                # already rejected by load_model_from_path.\n",
    "                predictions = model(X_batch.to(DEVICE))\n",
    "                batch_predictions.append(predictions.cpu().numpy())\n",
    "                # Targets are only compared on the host, so they never leave the CPU.\n",
    "                batch_actuals.append(y_batch.numpy())\n",
    "\n",
    "        all_predictions = np.concatenate(batch_predictions).flatten()\n",
    "        all_actuals = np.concatenate(batch_actuals).flatten()\n",
    "\n",
    "        # 5. Inverse Transform (if scalers were used)\n",
    "        predictions_final = all_predictions.copy()\n",
    "        actuals_final = all_actuals.copy()\n",
    "\n",
    "        if scaler_target:\n",
    "            print(\"\\nInverse transforming predictions and actuals using target scaler...\")\n",
    "            # Scaler expects 2D array [n_samples, n_features=1]\n",
    "            predictions_final = scaler_target.inverse_transform(predictions_final.reshape(-1, 1)).flatten()\n",
    "            actuals_final = scaler_target.inverse_transform(actuals_final.reshape(-1, 1)).flatten()\n",
    "            # Note: this assumes the target was scaled independently of the features\n",
    "            # and that the target column in the test CSV is already in scaled units.\n",
    "\n",
    "        # 6. Calculate Metrics\n",
    "        print(\"\\nCalculating metrics...\")\n",
    "        mae = mean_absolute_error(actuals_final, predictions_final)\n",
    "        mse = mean_squared_error(actuals_final, predictions_final)\n",
    "        rmse = np.sqrt(mse)\n",
    "        r2 = r2_score(actuals_final, predictions_final)\n",
    "        mape = calculate_mape(actuals_final, predictions_final)\n",
    "\n",
    "        print(\"\\n--- Test Metrics ---\")\n",
    "        print(f\"MAE:  {mae:.4f}\")\n",
    "        print(f\"MSE:  {mse:.4f}\")\n",
    "        print(f\"RMSE: {rmse:.4f}\")\n",
    "        print(f\"R²:   {r2:.4f}\")\n",
    "        print(f\"MAPE: {mape:.2f}%\")\n",
    "\n",
    "        # 7. Save Predictions\n",
    "        print(\"\\nSaving predictions...\")\n",
    "        if OUTPUT_DIR:\n",
    "            # Create the output directory up front so that saving the CSV and the\n",
    "            # plot does not fail just because the directory does not exist yet.\n",
    "            try:\n",
    "                os.makedirs(OUTPUT_DIR, exist_ok=True)\n",
    "            except OSError as e:\n",
    "                print(f\"Warning: Could not create output directory {OUTPUT_DIR}: {e}\")\n",
    "        timestamp = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
    "        model_name_part = os.path.splitext(os.path.basename(MODEL_PATH))[0]\n",
    "        predictions_df = pd.DataFrame({\n",
    "            'Actual': actuals_final,\n",
    "            'Predicted': predictions_final\n",
    "        })\n",
    "        predictions_filename = os.path.join(OUTPUT_DIR, f\"predictions_{model_name_part}_{timestamp}.csv\")\n",
    "        try:\n",
    "            predictions_df.to_csv(predictions_filename, index=False)\n",
    "            print(f\"Predictions saved to {predictions_filename}\")\n",
    "        except Exception as e:\n",
    "            print(f\"Error saving predictions: {e}\")\n",
    "\n",
    "        # 8. Plot Results (optional)\n",
    "        print(\"\\nPlotting results...\")\n",
    "        fig, ax = plt.subplots(figsize=(12, 6))\n",
    "        ax.plot(actuals_final, label='Actual Values', color='blue', alpha=0.7)\n",
    "        ax.plot(predictions_final, label='Predicted Values', color='red', linestyle='--', alpha=0.7)\n",
    "        ax.set_title(f'Actual vs. Predicted Values ({model_name_part})')\n",
    "        ax.set_xlabel('Sample Index')\n",
    "        ax.set_ylabel('Target Value')\n",
    "        ax.legend()\n",
    "        ax.grid(True)\n",
    "        plot_filename = os.path.join(OUTPUT_DIR, f\"plot_{model_name_part}_{timestamp}.png\")\n",
    "        try:\n",
    "            fig.savefig(plot_filename)\n",
    "            print(f\"Plot saved to {plot_filename}\")\n",
    "        except Exception as e:\n",
    "            print(f\"Error saving plot: {e}\")\n",
    "        plt.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.x.x"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}