# Model Training
In this notebook, we trained 4 different EfficientNet models on the faces extracted


In [1]:
%matplotlib inline
from IPython.core.display import set_matplotlib_formats
import matplotlib
import matplotlib.pyplot as plt
import os.path
import shutil
import pandas as pd
import numpy as np
import os
import json
import gc
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn import metrics
from tensorflow.keras import backend as k
from tensorflow.keras.callbacks import Callback
import tensorflow as tf
from common.model_utils import build_model, input_size
from common.landmark_model_utils import build_model_landmarks

# import packages and modules for graph plotting
from pandas.plotting import table

# setup output image format (Chrome works best)
set_matplotlib_formats("svg")

class ClearMemory(Callback):
    def on_epoch_end(self, epoch, logs=None):
        gc.collect()
        k.clear_session()

  set_matplotlib_formats("svg")


In [2]:
if tf.test.is_gpu_available():
    print("GPU is available and in use.")
else:
    print("No GPU found, using CPU.")

Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
GPU is available and in use.


2024-01-16 15:48:48.127926: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2 Pro
2024-01-16 15:48:48.127941: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 32.00 GB
2024-01-16 15:48:48.127944: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 10.67 GB
2024-01-16 15:48:48.127977: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:303] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-01-16 15:48:48.127993: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:269] Created TensorFlow device (/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


## Create necessary project folders


In [6]:
# Input list of folder names
models_folder = "Landmark Models"
history_folder = "Landmark History"
result_folder = "Result"
tuning_folder = "Tuning"
data_folder = "Training Landmarks Data"
folder_names = [models_folder, history_folder, result_folder, tuning_folder]

for folder_name in folder_names:
    folder_path = os.path.join(".", folder_name)
    try:
        os.mkdir(folder_path)
        print(f"Folder '{folder_name}' created at {folder_path}")
    except FileExistsError:
        print(f"Folder '{folder_name}' already exists at {folder_path}")

Folder 'Landmark Models' already exists at ./Landmark Models
Folder 'Landmark History' already exists at ./Landmark History
Folder 'Result' already exists at ./Result
Folder 'Tuning' already exists at ./Tuning


In [4]:
# # Split Data
# # ├── Test
# # │   ├── manipulated
# # │   └── original
# # ├── Training
# # │   ├── manipulated
# # │   └── original
# # └── Validation
# #     ├── manipulated
# #     └── original

# # Check if the dataset directory is already present to avoid redundant read and writes
# isExist = os.path.exists("./" + data_folder + "/")

# if not isExist:
#     # Creates the appropriate directory structures for training, validation and test sets.
#     try:
#         shutil.rmtree("./" + data_folder + "/")
#     except:
#         pass  # Split Data didn't exist

#     os.mkdir("./Split Data")
#     cdf = {
#         "Training": 0.7,
#         "Validation": 0.85,
#         "Test": 1,
#     }  # OBS! Has to be increment percentages of 5 to make batch size fit
#     for dir in list(cdf.keys()):
#         os.mkdir("./" + data_folder + "/{}".format(dir))
#         os.mkdir("./" + data_folder + "/{}/manipulated".format(dir))
#         os.mkdir("./" + data_folder + "/{}/original".format(dir))
# else:
#     print("Dataset directory already exists!")

Dataset directory already exists!


## Models we want to train


In [7]:
model_list = [
    "EfficientNetB0",
    "EfficientNetB1",
    "EfficientNetB2",
    "EfficientNetB3",
]

models = {}
landmarks_shape = (68, 2)

for model_type in model_list:
    size = input_size(model_type)
    models[model_type] = build_model_landmarks(model_type= model_type, input_shape=(size, size, 3), landmarks_shape=landmarks_shape)

ValueError: Exception encountered when calling layer "reshape_1" (type Reshape).

total size of new array must be unchanged, input_shape = [62856], output_shape = [62720]

Call arguments received by layer "reshape_1" (type Reshape):
  • inputs=tf.Tensor(shape=(None, 62856), dtype=float32)

### Training the models

In [None]:
from tensorflow.keras.applications import efficientnet as efn

# Specify the path to the train and validation data
trainpath = os.path.join(".", data_folder, "Training")
valpath = os.path.join(".", data_folder, "Validation")

# constant fields
BATCH_SIZE = 16
EPOCHS = 20

models_fit_history = {}
models_result = {}
models_failed = []

# Train the models
for model_type in model_list:
    error_flag = False
    imgsize = input_size(model_type)

    # Check if the model is already trained to avoid redundant processing
    if not os.path.exists(os.path.join(".", models_folder, model_type + ".hdf5")):
        # Load data and preprocess it
        train_datagen = ImageDataGenerator(
            preprocessing_function=efn.preprocess_input
        )

        train_gen = train_datagen.flow_from_directory(
            trainpath,
            target_size=(imgsize, imgsize),
            batch_size=BATCH_SIZE,
            class_mode="categorical",
        )
        val_gen = train_datagen.flow_from_directory(
            valpath,
            target_size=(imgsize, imgsize),
            batch_size=BATCH_SIZE,
            class_mode="categorical",
        )

        # load model
        model = models[model_type]
        model.summary()

        # callbacks for early stopping and saving the best model so far based on validation loss for each epoch
        cb_early_stopper = EarlyStopping(monitor="val_loss", patience=5)
        cb_checkpointer = ModelCheckpoint(
            filepath=os.path.join(".", models_folder, model_type + ".hdf5"),
            monitor="val_loss",
            save_best_only=True,
            mode="auto",
        )

        try:
            # train model
            fit_history = model.fit(
                train_gen,
                epochs=EPOCHS,
                validation_data=val_gen,
                callbacks=[
                    cb_checkpointer,
                    cb_early_stopper,
                    ClearMemory(),
                ],  # ClearMemory() is a custom callback to clear memory after each epoch to avoid memory leak
            )

            models_fit_history[model_type] = fit_history
        except Exception as e:
            models_failed.append(model_type)
            error_flag = True
            print("Encountered error while training:", e)

        history_df = pd.DataFrame(models_fit_history[model_type].history)
        os.path.join(".", history_folder, model_type + ".hdf5")
        history_df.to_csv(
            os.path.join(".", history_folder, model_type + "_history.csv"), index=False
        )

    # Load the model from disk if it is already trained
    else:
        imgsize = input_size(model_type)
        model = build_model(model_type, (imgsize, imgsize, 3))
        model_path = os.path.join(".", models_folder, model_type + ".hdf5")
        model.load_weights(model_path)
        print("Loaded model {} from disk".format(model_type))

    # Skip testing if there was an error in training the model
    if error_flag:
        continue

    gc.collect()
    k.clear_session()

if len(models_failed) == 0:
    print("Models that failed training:")
    for fail in models_failed:
        print("\t" + fail)
else:
    print("All models have been initialised!")

In [None]:
# Specify the path to the test data
test_path = os.path.join(".", data_folder, "Test")

# Test the models
for model_type in model_list:
    print("Starting prediction for", model_type)
    test_img_gen = ImageDataGenerator(
        preprocessing_function=efn.preprocess_input
    )

    imgsize = input_size(model_type)

    test_generator = test_img_gen.flow_from_directory(
        directory=test_path,
        target_size=(imgsize, imgsize),
        batch_size=BATCH_SIZE,
        class_mode=None,
        shuffle=False,
        seed=123,
    )

    model = build_model(model_type,(imgsize, imgsize, 3))
    model_path = os.path.join(".", models_folder, model_type + ".hdf5")
    model.load_weights(model_path)
    print("Loaded model {} from disk".format(model_type))

    predicted = model.predict(test_generator, steps=len(test_generator), verbose=1)
    predicted_class_indices = np.argmax(predicted, axis=1)
    y = test_generator.classes

    Accuracy = metrics.accuracy_score(y, predicted_class_indices)
    print("Model Accuracy: ", Accuracy)

    Precision = metrics.precision_score(y, predicted_class_indices, average="binary")
    print("Model Precision: ", Precision)

    Recall = metrics.recall_score(y, predicted_class_indices, average="binary")
    print("Model Recall: ", Recall)

    Auc = metrics.roc_auc_score(y, predicted_class_indices)
    print("AUC: ", Auc, "\n")

    models_result[model_type] = {
        "Accuracy": Accuracy,
        "Precision": Precision,
        "Recall": Recall,
        "AUC": Auc,
    }

    filenames = [os.path.split(i)[1] for i in test_generator.filenames]
    actualLabel = [os.path.split(i)[0] for i in test_generator.filenames]

    for i in range(len(actualLabel)):
        if actualLabel[i] == "manipulated":
            actualLabel[i] = "0"
        else:
            actualLabel[i] = "1"
    results_df = pd.DataFrame(
        {
            "id": pd.Series(filenames),
            "actual label": pd.Series(actualLabel),
            "pred label": pd.Series(predicted_class_indices),
        }
    )
    os.makedirs(result_folder, exist_ok=True)
    results_df.to_csv(os.path.join(".", result_folder, model_type + "_out.csv"))

with open(os.path.join(".", result_folder, "model_results.json"), "w") as outfile:
    json.dump(models_result, outfile)

model_prediction_results = pd.DataFrame.from_dict(models_result, orient="index")
model_prediction_results.to_csv(
    os.path.join(".", result_folder, "prediction_results.csv")
)

In [None]:
# creating a 2 dimensional dataframe out of the given data
results_df = pd.read_csv(os.path.join(".", result_folder, "prediction_results.csv"))
results_df = results_df.rename(columns={"Unnamed: 0": "Model"})


# Set the model names as the index for better labeling on the x-axis
results_df.set_index("Model", inplace=True)

results_df_transposed = results_df.transpose()

# Create a bar plot for the DataFrame
ax = results_df_transposed.plot(kind="bar", figsize=(12, 8))
ax.set_ylabel("Scores")
# ax.set_xlabel('Metrics')
ax.set_title("Model Evaluation Metrics")

# Set the y-axis lower limit to 0.90
ax.set_ylim(0.70, 1.0)
ax.legend(loc="upper right", bbox_to_anchor=(1.25, 1), ncol=1)

# Show the plot
plt.grid()

# Bold the fonts in the graph
matplotlib.rcParams["font.weight"] = "bold"
matplotlib.rcParams["axes.labelweight"] = "bold"
matplotlib.rcParams["axes.titleweight"] = "bold"
ax.tick_params(axis="y", which="major", labelsize=11, width=2, length=6)

# plt.xticks(rotation=45)
ax.set_xticklabels([])
plt.tight_layout()
plt.subplots_adjust(bottom=0.3)
table(ax, results_df.round(4), loc="bottom", cellLoc="center")
plt.show()