# LSTM Energy Prediction with Quantile Loss for Uncertainty Quantification (Online Adaptation)

- Uses TensorFlow/Keras for LSTM modeling.
- Implements Quantile Loss to estimate prediction intervals.
- Adapts the structure for potential online prediction within a BMS context (predicting one step ahead based on recent history).

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os
import warnings

warnings.filterwarnings("ignore")
%matplotlib inline

## 1. Setup and Configuration

In [None]:
# Set paths to data files (Ensure these files are in the same directory)
EPW_FILE = "Torino_IT-hour.epw"
SIM_FILE = "eplusout.csv"

# Create output directory for plots
os.makedirs("output_lstm", exist_ok=True)

# Parameters
N_LAG = 24  # Use past 24 hours to predict the next hour
SPLIT_RATIO = 0.8
EPOCHS = 50 # Adjust as needed
BATCH_SIZE = 64
QUANTILES = [0.1, 0.5, 0.9] # For 80% prediction interval (0.1 to 0.9) and median (0.5)

## 2. Data Loading and Preprocessing

In [None]:
print("Loading and preprocessing data...")

# Load EPW weather data
try:
    epw = pd.read_csv(EPW_FILE, skiprows=8, header=None)
    epw.columns = ["Year", "Month", "Day", "Hour", "Minute", "DataSource", "DryBulb", "DewPoint", "RH", "AtmosPressure", 
                   "ExtGlobHorRad", "ExtDirNormRad", "ExtDifHorRad", "GlobalHorRad", "DirectNormRad", 
                   "DiffuseHorRad", "InfraSky", "WindDir", "WindSpd", "TotalSkyCover", "OpaqueSkyCover", 
                   "Visibility", "CeilingHeight", "PresWeatherObs", "PresWeatherCodes", "PrecipWater", "AerosolOptDepth",
                   "SnowDepth", "DaysSinceSnow", "Albedo", "LiquidPrecip", "RainRate", "RainDuration", "SnowRate", "SnowDuration"]
    # Select relevant weather features
    weather_df = epw[["DryBulb", "RH", "ExtDirNormRad", "ExtDifHorRad"]].copy()
    weather_df.columns = ["Temperature", "Humidity", "DirectRad", "DiffuseRad"]
except FileNotFoundError:
    print(f"Error: EPW file not found at {EPW_FILE}")
    # Handle error appropriately, e.g., exit or use dummy data
    exit()

# Load simulation output data
try:
    sim_df = pd.read_csv(SIM_FILE, low_memory=False)
    # Select the same target column as before
    target_col = "BLOCCO1:ZONA3:Zone Total Internal Latent Gain Energy [J](TimeStep)"
    if target_col not in sim_df.columns:
        print(f"Error: Target column \t{target_col}\t not found in {SIM_FILE}")
        # Try finding a similar column or exit
        energy_cols = [c for c in sim_df.columns if "Energy" in c and "Latent" in c]
        if energy_cols:
            target_col = energy_cols[0] # Use the first found column as a fallback
            print(f"Using fallback target column: {target_col}")
        else:
            print("No suitable energy column found. Exiting.")
            exit()

    sim_df_hourly = sim_df[[target_col]].copy()
except FileNotFoundError:
    print(f"Error: Simulation file not found at {SIM_FILE}")
    exit()

# Combine weather and simulation data
weather_df_reset = weather_df.reset_index(drop=True)
sim_df_hourly_reset = sim_df_hourly.reset_index(drop=True)
min_length = min(len(weather_df_reset), len(sim_df_hourly_reset))
weather_df_reset = weather_df_reset.iloc[:min_length]
sim_df_hourly_reset = sim_df_hourly_reset.iloc[:min_length]
full_df = pd.concat([weather_df_reset, sim_df_hourly_reset], axis=1).dropna()

print(f"Combined dataset shape: {full_df.shape}")
print("Selected features: [Temperature, Humidity, DirectRad, DiffuseRad]")
print(f"Selected target: {target_col}")
print(full_df.head())

## 3. Feature Scaling and Sequence Creation

In [None]:
# Extract features and target
features = full_df[["Temperature", "Humidity", "DirectRad", "DiffuseRad"]].values
target = full_df[target_col].values

# Normalize data
scaler_X = MinMaxScaler()
scaler_Y = MinMaxScaler()
X_scaled = scaler_X.fit_transform(features)
Y_scaled = scaler_Y.fit_transform(target.reshape(-1, 1))

# Create time sequences
X_seq, Y_seq = [], []
for i in range(N_LAG, len(X_scaled)):
    X_seq.append(X_scaled[i - N_LAG:i])
    Y_seq.append(Y_scaled[i])

X_seq = np.array(X_seq)
Y_seq = np.array(Y_seq)

print(f"Shape of sequence data: X={X_seq.shape}, Y={Y_seq.shape}")

# Split data into training and testing sets
split_idx = int(SPLIT_RATIO * len(X_seq))
X_train, X_test = X_seq[:split_idx], X_seq[split_idx:]
Y_train, Y_test = Y_seq[:split_idx], Y_seq[split_idx:]

print(f"Training data shape: X={X_train.shape}, Y={Y_train.shape}")
print(f"Testing data shape: X={X_test.shape}, Y={Y_test.shape}")

## 4. LSTM Model with Quantile Loss

In [None]:
# Define Quantile Loss function
def quantile_loss(q, y_true, y_pred):
    e = y_true - y_pred
    return keras.backend.mean(keras.backend.maximum(q * e, (q - 1) * e), axis=-1)

# Build the LSTM model
def build_lstm_quantile_model(n_timesteps, n_features, n_quantiles):
    model = keras.Sequential()
    model.add(layers.LSTM(50, activation="relu", input_shape=(n_timesteps, n_features), return_sequences=True))
    model.add(layers.LSTM(50, activation="relu"))
    model.add(layers.Dense(50, activation="relu"))
    model.add(layers.Dense(n_quantiles)) # Output layer predicts all quantiles
    return model

# Instantiate the model
n_features = X_train.shape[2]
n_quantiles = len(QUANTILES)
lstm_model = build_lstm_quantile_model(N_LAG, n_features, n_quantiles)

# Compile the model with quantile losses for each output
losses = [lambda y_true, y_pred: quantile_loss(q, y_true, y_pred) for q in QUANTILES]
lstm_model.compile(optimizer="adam", loss=losses)

lstm_model.summary()

## 5. Train the LSTM Model

In [None]:
print("Training LSTM model...")

# Since the model predicts all quantiles, but the loss needs to compare against the single true value,
# we need to provide the true value replicated for each quantile output during training.
# However, Keras handles this implicitly when a list of losses is provided for a single output tensor.
# We just need to ensure Y_train is the correct shape (samples, 1).

history = lstm_model.fit(
    X_train,
    Y_train, 
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_split=0.2, # Use part of training data for validation
    verbose=1, # Set to 1 or 2 to see progress
    callbacks=[keras.callbacks.EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)] # Early stopping
)

print("Training complete.")

# Plot training history
plt.figure(figsize=(10, 6))
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.title("LSTM Model Training History")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.savefig("output_lstm/training_history.png")
# plt.show() # Avoid showing plot in non-interactive execution

## 6. Evaluate the Model and Visualize Predictions

In [None]:
print("Evaluating model on test data...")

# Predict quantiles on the test set
Y_pred_quantiles_scaled = lstm_model.predict(X_test)

# Inverse transform the predictions and the true values
Y_pred_quantiles = scaler_Y.inverse_transform(Y_pred_quantiles_scaled)
Y_test_actual = scaler_Y.inverse_transform(Y_test)

# Extract lower, median, and upper predictions
# Assuming QUANTILES = [0.1, 0.5, 0.9]
Y_pred_lower = Y_pred_quantiles[:, 0]
Y_pred_median = Y_pred_quantiles[:, 1]
Y_pred_upper = Y_pred_quantiles[:, 2]

# Calculate metrics using the median prediction
mae = mean_absolute_error(Y_test_actual, Y_pred_median)
rmse = np.sqrt(mean_squared_error(Y_test_actual, Y_pred_median))

print(f"Test MAE (Median Prediction): {mae:.2f}")
print(f"Test RMSE (Median Prediction): {rmse:.2f}")

# Visualize predictions with uncertainty bounds
plt.figure(figsize=(15, 7))
plt.plot(Y_test_actual, label="True Energy", color="black")
plt.plot(Y_pred_median, label="Predicted Median (q=0.5)", color="purple")
plt.fill_between(range(len(Y_test_actual)), Y_pred_lower, Y_pred_upper, color="purple", alpha=0.2, label="80% Prediction Interval (q=0.1 to 0.9)")
plt.title("LSTM Energy Prediction with Uncertainty (Quantile Regression)")
plt.xlabel("Time Step (Hour)")
plt.ylabel("Energy (J)")
plt.legend()
plt.grid(True)
plt.savefig("output_lstm/prediction_vs_actual_quantile.png")
# plt.show() # Avoid showing plot in non-interactive execution

# Visualize first 100 hours for clarity
plt.figure(figsize=(15, 7))
plt.plot(Y_test_actual[:100], label="True Energy", color="black")
plt.plot(Y_pred_median[:100], label="Predicted Median (q=0.5)", color="purple")
plt.fill_between(range(100), Y_pred_lower[:100], Y_pred_upper[:100], color="purple", alpha=0.2, label="80% Prediction Interval (q=0.1 to 0.9)")
plt.title("LSTM Energy Prediction - First 100 Hours")
plt.xlabel("Time Step (Hour)")
plt.ylabel("Energy (J)")
plt.legend()
plt.grid(True)
plt.savefig("output_lstm/prediction_vs_actual_quantile_100h.png")
# plt.show() # Avoid showing plot in non-interactive execution

## 7. Adaptation for Online Prediction (Conceptual)

In [None]:
# Save the trained model and scalers for online use
lstm_model.save("lstm_energy_predictor.keras")
import joblib
joblib.dump(scaler_X, "scaler_X.joblib")
joblib.dump(scaler_Y, "scaler_Y.joblib")

print("Model and scalers saved.")

# --- Function for Online Prediction ---

def predict_next_step(model, scaler_x, scaler_y, recent_data_sequence):
    """
    Predicts the next energy step based on a sequence of recent data.
    
    Args:
        model (keras.Model): The trained LSTM model.
        scaler_x (MinMaxScaler): The fitted scaler for input features.
        scaler_y (MinMaxScaler): The fitted scaler for the target variable.
        recent_data_sequence (np.array): Array of recent feature data 
                                        with shape (N_LAG, n_features).
                                        Data should be in original scale.
                                        Example: [[temp1, hum1, rad1, dif1], ... , [tempN, humN, radN, difN]]
                                        where N = N_LAG.
    
    Returns:
        dict: A dictionary containing the predicted quantiles in original scale.
              e.g., {"lower": float, "median": float, "upper": float}
    """
    if recent_data_sequence.shape != (N_LAG, n_features):
        raise ValueError(f"Input sequence must have shape ({N_LAG}, {n_features})")
    
    # Scale the input sequence
    input_scaled = scaler_x.transform(recent_data_sequence)
    
    # Reshape for LSTM input (1 sample, N_LAG steps, n_features)
    input_reshaped = input_scaled.reshape(1, N_LAG, n_features)
    
    # Predict scaled quantiles
    pred_scaled = model.predict(input_reshaped)[0] # Get the first (only) prediction
    
    # Inverse transform to original scale
    pred_original = scaler_y.inverse_transform(pred_scaled.reshape(1, -1))[0]
    
    # Return as a dictionary (assuming QUANTILES = [0.1, 0.5, 0.9])
    return {
        "lower": pred_original[0],
        "median": pred_original[1],
        "upper": pred_original[2]
    }

# Example usage (conceptual - requires actual recent data)
# recent_data = np.random.rand(N_LAG, n_features) * 50 # Replace with actual data
# prediction = predict_next_step(lstm_model, scaler_X, scaler_Y, recent_data)
# print(f"Example Prediction: {prediction}")