<a href="https://colab.research.google.com/github/chain28/UNDSP-D-25-00030/blob/main/LSTM_kFold_set.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# README

This notebook provides example Python scripts for training an LSTM model (a GRU layer can be selected instead through the `dlMOD` setting), including k-fold cross-validation to ensure a thorough assessment of the model’s performance. In this demonstration, ground surface settlement is used as the target variable (`TARGET = "SET"`).

It should be noted that the code combines essential functions for LSTM training with additional procedures for gathering and displaying results. As a result, running the notebook may take some time due to the computational load from cross-validation and visualization tasks.

# Mount Google Drive

In [None]:
# Mount Google Drive at /content/drive; every dataset, model and result path
# used below lives under /content/drive/MyDrive. Requires the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Import libraries

In [None]:
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold

import tensorflow as tf

from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import GRU, LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.callbacks import Callback

# Setting

In [None]:
# =====[GENERAL SETTING]=====
TARGET = "SET"      # Prediction target: "SET" (settlement) or "PEN" (penetration rate)
dlMOD  = "LSTM"     # Recurrent layer type used by model_arch: "LSTM" or "GRU"
NOTICE = 0          # Passed to model.fit as `verbose`: 0 or 1

# Fail fast on a mistyped option. Without these checks a wrong TARGET leaves
# PTS/NTS/CTS and RATIO_num_val undefined (NameError much later), and a wrong
# dlMOD builds a network without any recurrent layer.
if TARGET not in ("SET", "PEN"):
  raise ValueError(f"TARGET must be 'SET' or 'PEN', got {TARGET!r}")
if dlMOD not in ("LSTM", "GRU"):
  raise ValueError(f"dlMOD must be 'LSTM' or 'GRU', got {dlMOD!r}")

# =====[FILE PATH AND NAME SETTING]=====
NAME_DATASET = "BL - 25 cases"   # Inserted into the Feature_/Target_ workbook file names
NAME_CASE    = "BL - 25 cases"   # Sub-folder name used for saved models and results

# =====[TIMESTEP SETTING]=====
# NOTE(review): PTS/NTS/CTS presumably stand for previous/next/current timesteps - confirm.
if TARGET == "SET":
  PTS, NTS, CTS = 5, 5, 1
elif TARGET == "PEN":
  PTS, NTS, CTS = 0, 0, 1

SEQ = PTS+NTS+CTS # Total timestep in one sequence

# =====[K-FOLD SETTING]=====
num_FOLD = 5 # Amount of fold for using in k-fold cross-validation process

# =====[DEEP LEARNING SETTING]=====
EPOCHS  = 1000                            # Maximum number of epochs passed to model.fit
PATIENT = 100                             # EarlyStopping patience (epochs), monitored on val_loss
DROPOUT = 0.2                             # Rate of the Dropout layer placed after the recurrent layer
OPTIM   = "adam"                          # Optimizer passed to model.compile
LOSS    = "mse"                           # Loss passed to model.compile
METRIC  = ["mse", "mape"]                 # Metrics passed to model.compile
ATV_FUNC = ["relu", "sigmoid", "linear"]  # Activations: [recurrent layer, hidden Dense layers, output Dense layer]

# Each architecture is [recurrent units, Dense units..., 1]; the trailing 1 is the output layer.
ARCH_LIST = [[32, 32, 16, 1],               #Architecture No.01
              [64, 64, 32, 1],              #Architecture No.02
              [128, 128, 64, 1],            #Architecture No.03
              [32, 32, 16, 8, 1],           #Architecture No.04
              [64, 64, 32, 16, 1],          #Architecture No.05
              [128, 128, 64, 32, 1],        #Architecture No.06
              [32, 32, 16, 8, 4, 1],        #Architecture No.07
              [64, 64, 32, 16, 8, 1],       #Architecture No.08
              [128, 128, 64, 32, 16, 1],    #Architecture No.09
              ]

# =====[DATASET RATIO FOR BEST MODEL TRAINING]=====
# Number of sequences held out for validation when the best architecture is retrained.
if TARGET == "SET":
  RATIO_num_val = 5 # Approx. 20% of 25 dataset for settlement prediction
elif TARGET == "PEN":
  RATIO_num_val = 54 # Approx. 20% of 268 dataset for penetration rate prediction

# =====[VISUAL SETTING]=====
# Show every row/column when a DataFrame is displayed.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# Part-I: k-fold cross-validation

## Define function

In [None]:
def imp_trainData():
  """Load the feature/target workbooks for the configured target and shape them into sequences.

  Reads the notebook-level settings NAME_DATASET, TARGET and SEQ.

  Returns:
    X_t: feature values reshaped to (num_SEQ, SEQ, number of feature columns).
    y_t: values of the target workbook with singleton dimensions squeezed out.
    num_SEQ: number of sequences, i.e. feature rows divided by SEQ.

  Raises:
    ValueError: if TARGET is neither "SET" nor "PEN", or if the number of
      feature rows is not a multiple of SEQ.
  """
  suffix_by_target = {"SET": "set", "PEN": "pen"}
  if TARGET not in suffix_by_target:
    raise ValueError(f"TARGET must be 'SET' or 'PEN', got {TARGET!r}")

  data_dir = "/content/drive/MyDrive/Paper/01_Prediction - Dataset"
  suffix = suffix_by_target[TARGET]
  df_f = pd.read_excel(f"{data_dir}/Feature_{NAME_DATASET} - {suffix}.xlsx")
  df_t = pd.read_excel(f"{data_dir}/Target_{NAME_DATASET} - {suffix}.xlsx")

  X = df_f.values
  num_rows, num_features = X.shape

  # Every sequence must be complete; report a partial one explicitly instead
  # of letting reshape fail with a generic size-mismatch message.
  num_SEQ, leftover = divmod(num_rows, SEQ)
  if leftover:
    raise ValueError(
        f"Feature workbook has {num_rows} rows, which is not a multiple of SEQ={SEQ}")

  X_t = X.reshape((num_SEQ, SEQ, num_features))
  y_t = df_t.values.squeeze()
  return X_t, y_t, num_SEQ

def model_arch(X_train, ARCH_NO, sel_ARCH):
  """Build, compile and print a summary of one recurrent regression network.

  The network is: one recurrent layer (LSTM or GRU, chosen by the notebook-level
  dlMOD) -> Dropout(DROPOUT) -> one Dense layer per remaining entry of sel_ARCH.

  Args:
    X_train: 3-D array; only its 2nd and 3rd dimensions are read, to set the
      input shape of the recurrent layer.
    ARCH_NO: architecture index; accepted for call-site symmetry, not used here.
    sel_ARCH: list of layer widths; the first entry is the number of recurrent
      units, the remaining entries are Dense layer widths.

  Returns:
    The compiled Keras model.

  Raises:
    ValueError: if dlMOD is neither "LSTM" nor "GRU".
  """
  recurrent_layers = {"LSTM": LSTM, "GRU": GRU}
  if dlMOD not in recurrent_layers:
    # Previously an unknown dlMOD silently produced a model with no recurrent layer.
    raise ValueError(f"dlMOD must be 'LSTM' or 'GRU', got {dlMOD!r}")

  num_CELL = sel_ARCH[0]
  num_MLP  = sel_ARCH[1:]

  model = tf.keras.Sequential()
  model.add(recurrent_layers[dlMOD](num_CELL, activation=ATV_FUNC[0], input_shape=(X_train.shape[1], X_train.shape[2])))
  model.add(Dropout(DROPOUT))

  for cell in num_MLP:
    # A width of 1 marks the output layer (ATV_FUNC[2]); any other width is a hidden layer (ATV_FUNC[1]).
    activation = ATV_FUNC[2] if cell == 1 else ATV_FUNC[1]
    model.add(Dense(cell, activation=activation))

  model.compile(optimizer=OPTIM, loss=LOSS, metrics=METRIC)
  model.summary()
  return model

def model_callback(ARCH_NO, fold):
  """Create the training callbacks for one architecture/fold pair.

  Args:
    ARCH_NO: zero-based architecture index (written one-based into the file name).
    fold: zero-based fold index (written one-based into the file name).

  Returns:
    A list holding an EarlyStopping callback (val_loss, patience PATIENT) and a
    ModelCheckpoint callback that keeps only the lowest-val_loss model.
  """
  checkpoint_path = f"/content/drive/MyDrive/Paper/02_Prediction - Model/{NAME_CASE}/Callback_{dlMOD} - {TARGET} - ARCH {ARCH_NO+1} Fold {fold+1}.keras"
  early_stop = EarlyStopping(monitor="val_loss", patience=PATIENT)
  checkpoint = ModelCheckpoint(checkpoint_path, monitor="val_loss", save_best_only=True, mode="min")
  return [early_stop, checkpoint]

def model_fitting(model, callback, X_train, y_train, X_val, y_val):
  """Fit `model` and plot its training and validation loss curves.

  Training runs for at most EPOCHS epochs with verbosity NOTICE, validating on
  (X_val, y_val) and using the supplied callbacks.

  Returns:
    The object returned by model.fit (its `.history` dict is plotted here).
  """
  history = model.fit(
      X_train,
      y_train,
      validation_data=(X_val, y_val),
      epochs=EPOCHS,
      callbacks=callback,
      verbose=NOTICE,
  )

  # (history key, legend label, line colour) for each curve, drawn in this order
  curves = (
      ("loss", "Training Loss", "b"),
      ("val_loss", "Validation Loss", "darkorange"),
  )
  for key, label, color in curves:
    plt.plot(history.history[key], label=label, color=color)

  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend()
  plt.show()

  return history

def model_load(ARCH_NO, fold):
  """Load the checkpoint file saved for the given zero-based architecture and fold indices."""
  checkpoint_path = f"/content/drive/MyDrive/Paper/02_Prediction - Model/{NAME_CASE}/Callback_{dlMOD} - {TARGET} - ARCH {ARCH_NO+1} Fold {fold+1}.keras"
  return load_model(checkpoint_path)

def model_evaluation(model, X_train, y_train, X_val, y_val, ARCH_NO, fold):
  """Evaluate `model` on the training and validation sets and save the result to Excel.

  The workbook has the columns MSE_loss, MSE_metric, MAPE_metric and two rows
  written without an index: row 0 = training set, row 1 = validation set.
  best_eval reads this file by position, so the row and column order must not
  change.

  Args:
    model: compiled Keras model whose evaluate() returns loss, MSE and MAPE.
    X_train, y_train: training features and targets.
    X_val, y_val: validation features and targets.
    ARCH_NO: zero-based architecture index (written one-based into the file name).
    fold: zero-based fold index (written one-based into the file name).
  """
  # >> Use MSE and MAPE for evaluation in this case
  records = []
  for X, y in ((X_train, y_train), (X_val, y_val)):
    MSE_loss, MSE_metric, MAPE_metric = model.evaluate(X, y)
    records.append({"MSE_loss": MSE_loss, "MSE_metric": MSE_metric, "MAPE_metric": MAPE_metric})

  # Built once from the records. The former index rename to
  # "trainDataset"/"valDataset" never matched any label and the index is not
  # written to the file, so it was dropped.
  df_eval = pd.DataFrame(records, columns=["MSE_loss", "MSE_metric", "MAPE_metric"])

  df_eval.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Loss Evaluation - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - Fold {fold+1}.xlsx", index=False)

## Run

In [None]:
# Train every architecture in ARCH_LIST with k-fold cross-validation and save,
# per architecture: the fold indices, the per-epoch loss/MAPE curves, and
# (per fold) the evaluation of the best checkpoint.
X_t, y_t, num_SEQ = imp_trainData()

fold_names = [f"Fold{i+1}" for i in range(num_FOLD)]
result_dir = f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}"

for ARCH_NO, sel_ARCH in enumerate(ARCH_LIST, start=0):
  # Same random_state for every architecture, so all architectures see identical folds
  kf = KFold(n_splits=num_FOLD, shuffle=True, random_state=42)

  # One single-column frame per fold; joined side by side once all folds are done
  train_index_cols, val_index_cols = [], []
  train_loss_cols, val_loss_cols = [], []
  train_metric_cols, val_metric_cols = [], []

  for fold, (train_ind, val_ind) in enumerate(kf.split(X_t), start=0):

    # Split dataset
    X_train, X_val = X_t[train_ind], X_t[val_ind]
    y_train, y_val = y_t[train_ind], y_t[val_ind]

    # Index record
    train_index_cols.append(pd.DataFrame(train_ind))
    val_index_cols.append(pd.DataFrame(val_ind))

    # Many models are built in this loop; release Keras global state left by the previous one
    tf.keras.backend.clear_session()

    # Training
    print(f" Architecture No.{ARCH_NO+1:02d} - Fold{fold+1}")
    model = model_arch(X_train, ARCH_NO, sel_ARCH)
    callback = model_callback(ARCH_NO, fold)
    history = model_fitting(model, callback, X_train, y_train, X_val, y_val)

    # Loss and metric record
    train_loss_cols.append(pd.DataFrame(history.history["loss"]))
    val_loss_cols.append(pd.DataFrame(history.history["val_loss"]))
    train_metric_cols.append(pd.DataFrame(history.history["mape"]))
    val_metric_cols.append(pd.DataFrame(history.history["val_mape"]))

    # Evaluate the best checkpoint of this fold (not the last-epoch weights)
    model_trained = model_load(ARCH_NO, fold)
    model_evaluation(model_trained, X_train, y_train, X_val, y_val, ARCH_NO, fold)

  # Folds can differ in length (uneven splits, early stopping); concat pads the shorter columns with NaN
  df_train        = pd.concat(train_index_cols, axis=1)
  df_val          = pd.concat(val_index_cols, axis=1)
  df_train_loss   = pd.concat(train_loss_cols, axis=1)
  df_val_loss     = pd.concat(val_loss_cols, axis=1)
  df_train_metric = pd.concat(train_metric_cols, axis=1)
  df_val_metric   = pd.concat(val_metric_cols, axis=1)

  for frame in (df_train, df_val, df_train_loss, df_val_loss, df_train_metric, df_val_metric):
    frame.columns = fold_names

  df_train.to_excel(f"{result_dir}/Index_kFold - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - train.xlsx")
  df_val.to_excel(f"{result_dir}/Index_kFold - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - val.xlsx")

  df_train_loss.to_excel(f"{result_dir}/Loss Record - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - train.xlsx")
  df_val_loss.to_excel(f"{result_dir}/Loss Record - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - val.xlsx")
  df_train_metric.to_excel(f"{result_dir}/Metric Record - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - train.xlsx")
  df_val_metric.to_excel(f"{result_dir}/Metric Record - {dlMOD} - {TARGET} - ARCH {ARCH_NO+1} - val.xlsx")

# Part-II: Best model selection

## Define function

In [None]:
def best_eval():
  """Average the validation errors over all folds and pick the best architecture.

  For every architecture in ARCH_LIST and every fold 1..num_FOLD, reads the
  matching "Loss Evaluation" workbook and takes its second row (validation
  set): first column as MSE, last column as MAPE. The per-architecture
  averages are saved to "k-fold Evaluation - ... - All ARCH.xlsx" and the
  minima are printed.

  Returns:
    (minInd_MSE, minInd_MAPE): zero-based positions in ARCH_LIST of the
    architectures with the lowest average MSE and the lowest average MAPE.
  """
  result_dir = f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}"
  arch_averages = []

  for ARCH_NO in range(1, len(ARCH_LIST) + 1):
    fold_errors = []

    for fold in range(1, num_FOLD + 1):
      df_new = pd.read_excel(f"{result_dir}/Loss Evaluation - {dlMOD} - {TARGET} - ARCH {ARCH_NO} - Fold {fold}.xlsx", index_col=None)
      # Use validation dataset error to evaluate [MSE, MAPE]
      fold_errors.append((df_new.iat[1, 0], df_new.iat[1, -1]))

    df = pd.DataFrame(fold_errors)
    arch_averages.append({"Average MSE": df.iloc[:, 0].mean(), "Average MAPE": df.iloc[:, 1].mean()})

  # Row i (default 0..n-1 index) corresponds to ARCH_LIST[i]
  df_kfold_eval = pd.DataFrame(arch_averages, columns=["Average MSE", "Average MAPE"])
  df_kfold_eval.to_excel(f"{result_dir}/k-fold Evaluation - {dlMOD} - {TARGET} - All ARCH.xlsx")

  min_MSE     = df_kfold_eval.iloc[:,0].min()
  min_MAPE    = df_kfold_eval.iloc[:,1].min()
  minInd_MSE  = df_kfold_eval.iloc[:,0].idxmin()
  minInd_MAPE = df_kfold_eval.iloc[:,1].idxmin()

  print(f"Minimum MSE index: {minInd_MSE} = ARCH No.{minInd_MSE+1} // MSE={min_MSE}")
  print(f"Minimum MAPE index: {minInd_MAPE}  = ARCH No.{minInd_MAPE+1} // MAPE={min_MAPE}")
  return minInd_MSE, minInd_MAPE

def best_model_arch(minInd_MSE, X_train=None):
  """Build the architecture at position `minInd_MSE` of ARCH_LIST via model_arch.

  Args:
    minInd_MSE: zero-based index into ARCH_LIST.
    X_train: optional training features used by model_arch to derive the input
      shape. When omitted, the notebook-level variable `X_train` is used, which
      is what this function always relied on before the parameter existed.

  Returns:
    The compiled model returned by model_arch.

  Raises:
    NameError: if X_train is omitted and no notebook-level `X_train` exists.
  """
  if X_train is None:
    try:
      X_train = globals()["X_train"]
    except KeyError:
      raise NameError(
          "X_train is not defined; pass it explicitly or create the train split first") from None

  sel_ARCH = ARCH_LIST[minInd_MSE]
  return model_arch(X_train, minInd_MSE, sel_ARCH)

def best_model_callback():
  """Create the callbacks used when retraining the selected best architecture.

  Returns:
    A list holding an EarlyStopping callback (val_loss, patience PATIENT) and a
    ModelCheckpoint callback that keeps only the lowest-val_loss model in the
    "... - Best.keras" file.
  """
  checkpoint_path = f"/content/drive/MyDrive/Paper/02_Prediction - Model/{NAME_CASE}/Callback_{dlMOD} - {TARGET} - Best.keras"
  early_stop = EarlyStopping(monitor="val_loss", patience=PATIENT)
  checkpoint = ModelCheckpoint(checkpoint_path, monitor="val_loss", save_best_only=True, mode="min")
  return [early_stop, checkpoint]

def best_model_load():
  """Load the "... - Best.keras" checkpoint for the current NAME_CASE, dlMOD and TARGET."""
  checkpoint_path = f"/content/drive/MyDrive/Paper/02_Prediction - Model/{NAME_CASE}/Callback_{dlMOD} - {TARGET} - Best.keras"
  return load_model(checkpoint_path)

def best_model_evaluation(model, X_train, y_train, X_val, y_val, ARCH_NO, fold):
  """Evaluate the best model on the training and validation sets and save the result to Excel.

  The workbook has the columns MSE_loss, MSE_metric, MAPE_metric and two rows
  written without an index: row 0 = training set, row 1 = validation set.

  Args:
    model: compiled Keras model whose evaluate() returns loss, MSE and MAPE.
    X_train, y_train: training features and targets.
    X_val, y_val: validation features and targets.
    ARCH_NO: not used by this function; kept so the signature stays unchanged.
    fold: not used by this function; kept so the signature stays unchanged.
  """
  # >> Use MSE and MAPE for evaluation in this case
  records = []
  for X, y in ((X_train, y_train), (X_val, y_val)):
    MSE_loss, MSE_metric, MAPE_metric = model.evaluate(X, y)
    records.append({"MSE_loss": MSE_loss, "MSE_metric": MSE_metric, "MAPE_metric": MAPE_metric})

  # Built once from the records. The former index rename to
  # "trainDataset"/"valDataset" never matched any label and the index is not
  # written to the file, so it was dropped.
  df_eval = pd.DataFrame(records, columns=["MSE_loss", "MSE_metric", "MAPE_metric"])

  df_eval.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Loss Evaluation - {dlMOD} - {TARGET} - Best.xlsx", index=False)

## Run

In [None]:
# =====[Import and pre-process dataset]=====
X_t, y_t, num_SEQ = imp_trainData()

# Draw the hold-out sequences from a seeded generator so that Restart & Run All
# reproduces the same train/validation split (42 matches the KFold random_state).
VAL_SPLIT_SEED = 42
randInd = sorted(random.Random(VAL_SPLIT_SEED).sample(range(X_t.shape[0]), RATIO_num_val))
print(randInd)

X_val = X_t[randInd]
y_val = y_t[randInd]

# Everything that was not drawn for validation is used for training
X_train = np.delete(X_t, randInd, axis=0)
y_train = np.delete(y_t, randInd, axis=0)

df_valInd = pd.DataFrame(randInd)
df_valInd.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Index_validation - {dlMOD} - {TARGET} - Best.xlsx", index=False)

# =====[Find the best model]=====
minInd_MSE, minInd_MAPE = best_eval()

# =====[Training]=====
# The architecture is selected by lowest average MSE; minInd_MAPE is reported only.
model = best_model_arch(minInd_MSE)
callback = best_model_callback()
history = model_fitting(model, callback, X_train, y_train, X_val, y_val)

# =====[Loss record]=====
# Not filled in this cell; still reset to empty frames as the original cell did.
df_train, df_val = pd.DataFrame(), pd.DataFrame()

# Single training run, so each record is a one-column frame (one row per epoch)
df_train_loss   = pd.DataFrame(history.history["loss"])
df_val_loss     = pd.DataFrame(history.history["val_loss"])
df_train_metric = pd.DataFrame(history.history["mape"])
df_val_metric   = pd.DataFrame(history.history["val_mape"])

df_train_loss.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Loss Record - {dlMOD} - {TARGET} - Best - train.xlsx")
df_val_loss.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Loss Record - {dlMOD} - {TARGET} - Best - val.xlsx")
df_train_metric.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Metric Record - {dlMOD} - {TARGET} - Best - train.xlsx")
df_val_metric.to_excel(f"/content/drive/MyDrive/Paper/03_Prediction - Result/{NAME_CASE}/Metric Record - {dlMOD} - {TARGET} - Best - val.xlsx")