<a href="https://colab.research.google.com/github/souhirbenamor/EPF/blob/main/2025_LSTM_Bridging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

LSTM code

In [3]:

pip install matplotlib


Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping

# ----------------------------------------------------------------
# 0) (Optional) Fully Deterministic Seeding
# ----------------------------------------------------------------
seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)

# ----------------------------------------------------------------
# 1) Parameters & paths
# ----------------------------------------------------------------
file_path    = 'EPF_2017_2022.xlsx'
output_path  = 'forecast_results_LSTM_ESM.xlsx'
date_col     = 'Date'
features     = [
     'MCP'
    # 'Demand Day-ahead DE',
    #   'Wind and PV Day ahead (MWh/h)',
    #   'Gas',
    #   'Coal',
    #   'CO2'
    ]      # add other feature column names here if desired
target       = 'Price'
n_steps      = 24           # look-back window (hours)
l2_reg       = 1e-3         # L2 regularization coefficient
drop_rate    = 0.2          # dropout rate in LSTM

# ----------------------------------------------------------------
# 2) Load & preprocess with gap filling
# ----------------------------------------------------------------
df = pd.read_excel(file_path)
df[date_col] = pd.to_datetime(df[date_col])
df.set_index(date_col, inplace=True)

# ensure a complete hourly index over the whole span
df = df.sort_index().resample('H').asfreq()

# fill any gaps by time-based interpolation (alternatives: ffill().bfill())
df[features + [target]] = (
    df[features + [target]]
      .interpolate(method='time', limit_direction='both')
)

# drop any remaining NaNs (e.g. at the very start/end)
df = df.dropna(subset=features + [target])

# ----------------------------------------------------------------
# 3) Calendar‐year split: first 4 years → train; last 2 → test
# ----------------------------------------------------------------
start_year = df.index.year.min()
split_date = pd.Timestamp(year=start_year+4, month=1, day=1)
train_df   = df.loc[: split_date - pd.Timedelta(seconds=1)]
test_df    = df.loc[split_date :]

print(f"Train: {train_df.index.min().date()} → {train_df.index.max().date()} ({len(train_df)})")
print(f"Test:  {test_df.index.min().date()} → {test_df.index.max().date()} ({len(test_df)})")

# ----------------------------------------------------------------
# 4) Sequence generator
# ----------------------------------------------------------------
def create_sequences(data, feat_cols, tgt_col, lookback):
    arr = data[feat_cols + [tgt_col]].values
    X, y = [], []
    for i in range(len(arr) - lookback):
        X.append(arr[i : i + lookback, :len(feat_cols)])
        y.append(arr[i + lookback, -1])
    return np.array(X), np.array(y)

# ----------------------------------------------------------------
# 5) Scale & build sequences for initial training
# ----------------------------------------------------------------
scaler_X = StandardScaler().fit(train_df[features])
scaler_y = StandardScaler().fit(train_df[[target]])

X_tr_raw, y_tr_raw = create_sequences(train_df, features, target, n_steps)
X_te_raw, y_te_raw = create_sequences(test_df,  features, target, n_steps)

ns, nf = n_steps, len(features)
X_train = scaler_X.transform(X_tr_raw.reshape(-1, nf)).reshape(-1, ns, nf)
X_test  = scaler_X.transform(X_te_raw.reshape(-1, nf)).reshape(-1, ns, nf)
y_train = scaler_y.transform(y_tr_raw.reshape(-1,1)).flatten()
y_test  = scaler_y.transform(y_te_raw.reshape(-1,1)).flatten()

print("Shapes →",
      "X_train", X_train.shape,
      "y_train", y_train.shape,
      "X_test",  X_test.shape,
      "y_test",  y_test.shape)

# ----------------------------------------------------------------
# 6) Build & train initial LSTM
# ----------------------------------------------------------------
model = Sequential([
    LSTM(
        50,
        activation='relu',
        input_shape=(ns, nf),
        kernel_regularizer=l2(l2_reg),
        dropout=drop_rate,
        recurrent_dropout=drop_rate
    ),
    Dense(1, kernel_regularizer=l2(l2_reg))
])
model.compile(optimizer='adam', loss='mse')
es = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

model.fit(
    X_train, y_train,
    validation_split=0.2,
    epochs=50,
    batch_size=32,
    callbacks=[es],
    verbose=1
)

# ----------------------------------------------------------------
# 7) Freeze Dense layer for rolling fine-tuning
# ----------------------------------------------------------------
model.layers[-1].trainable = False
model.compile(optimizer='adam', loss='mse')

# ----------------------------------------------------------------
# 8) Rolling‐window forecasting + fine‐tuning, tracking actual timestamps
# ----------------------------------------------------------------
preds, actuals, dates = [], [], []
test_start = test_df.index.min().normalize()
test_end   = test_df.index.max().normalize()

for day in pd.date_range(test_start, test_end, freq='D'):
    for h in range(24):
        t0 = day + pd.Timedelta(hours=h)
        # get the last n_steps hours of features
        hist = df.loc[t0 - pd.Timedelta(hours=n_steps) : t0 - pd.Timedelta(hours=1), features]
        if len(hist) < n_steps:
            # if there's still any gap, skip
            continue

        # prepare input and predict
        x_in     = scaler_X.transform(hist.values).reshape(1, ns, nf)
        y_s_pred = model.predict(x_in, verbose=0).flatten()[0]
        y_pred   = scaler_y.inverse_transform([[y_s_pred]])[0,0]

        preds.append(y_pred)
        actuals.append(test_df[target].get(t0, np.nan))
        dates.append(t0)

        # fine-tune on the true value if available
        if t0 in test_df.index:
            y_true_s = scaler_y.transform([[test_df.at[t0, target]]]).flatten()
            model.fit(x_in, y_true_s, epochs=1, batch_size=1, verbose=0)

# ----------------------------------------------------------------
# 9) Compile results & save
# ----------------------------------------------------------------
results = pd.DataFrame({
    'Date':     dates,
    'Actual':   actuals,
    'Forecast': preds
})
results.to_excel(output_path, index=False)
print(f"Rolling‐window results saved to {output_path}")

# ----------------------------------------------------------------
# 10) Report any missing hours (should be zero)
# ----------------------------------------------------------------
full_range = pd.date_range(test_df.index.min(), test_df.index.max(), freq='H')
missing = full_range.difference(pd.DatetimeIndex(dates))
print(f"Total test hours  : {len(full_range)}")
print(f"Forecasted hours  : {len(dates)}")
print(f"Missing hours     : {len(missing)}  → {len(missing)/24:.2f} days")
if len(missing):
    print(missing)

# ----------------------------------------------------------------
# 11) Compute metrics & plot
# ----------------------------------------------------------------
valid = ~results['Actual'].isna()
mae  = mean_absolute_error(results.loc[valid, 'Actual'], results.loc[valid, 'Forecast'])
rmse = np.sqrt(mean_squared_error(results.loc[valid, 'Actual'], results.loc[valid, 'Forecast']))
print(f"MAE: {mae:.3f}   RMSE: {rmse:.3f}")

plt.figure(figsize=(12,5))
plt.plot(results['Date'], results['Actual'],   label='Actual')
plt.plot(results['Date'], results['Forecast'], label='Forecast')
plt.title('LSTM Rolling‐Window Forecast')
plt.xlabel('Date')
plt.ylabel(target)
plt.legend()
plt.tight_layout()
plt.show()