In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import os

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import data as tf_data
from tensorflow import keras
from keras import Model, Input
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.applications import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.layers import Dense, Flatten, Resizing, Dropout, Lambda


def VisualizeLossCurve(h):
    """Plot the training and validation loss curves of a fit history.

    Args:
        h: Object exposing a ``history`` mapping that contains the "loss"
            and "val_loss" sequences (e.g. the value returned by
            ``model.fit``). A missing key raises ``KeyError``.
    """
    # (history key, legend label, line colour) for each curve, in draw order.
    curves = (
        ("loss", "Training", "red"),
        ("val_loss", "Validation", "blue"),
    )

    plt.figure()
    for key, legend_name, line_color in curves:
        plt.plot(h.history[key], label=legend_name, color=line_color)
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid()
    plt.show()

# **1. VGG16 + Random Initialization**

In [None]:
# Control Parameters
BATCH_SIZE = 32
EPOCHS = 200
idx_tc = 0  # index of test cases [0, 1]; 0 for baseline and 1 for Proposed

# Cross-validation over n_iter folds: each fold has its own .npz archive with
# x_train / y_train / x_test / y_test arrays. A fresh, randomly initialised
# VGG16 regressor is trained per fold and its [loss (MSE), MAE] is recorded on
# the training and test sets.
n_iter = 82
train_results = []
test_results = []

# File-name template of the per-fold archive for each test case.
npz_templates = {
    0: "fgnet_lopo%02d.npz",     # Baseline Performance
    1: "fgnet_lopo_da%02d.npz",  # Proposed Data Augmentation
}
if idx_tc not in npz_templates:
    # Fail fast instead of hitting a NameError (or reusing stale data) in the loop.
    raise ValueError("idx_tc must be 0 (baseline) or 1 (proposed), got %r" % (idx_tc,))
npz_dir = os.path.join(os.getcwd(), "FGNET", "npz")

for j in range(n_iter):
    print("Fold: %2d out of %d" % (j + 1, n_iter), end=', ')

    # Drop the Keras global state left by the previous fold's model so memory
    # does not keep growing while n_iter VGG16 networks are built in one process.
    keras.backend.clear_session()

    # The context manager closes the archive; the arrays are materialised on
    # access, so they stay valid after the file is closed.
    with np.load(os.path.join(npz_dir, npz_templates[idx_tc] % (j + 1))) as data:
        x_train, y_train = data["x_train"].astype("float32"), data["y_train"]
        x_test, y_test = data["x_test"].astype("float32"), data["y_test"]

    # Hold out 25% of the fold's training data for early stopping.
    x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.25, random_state=42)

    # Add the single channel axis expected by the network input.
    x_train = x_train.reshape(len(x_train), 48, 48, 1)
    x_val = x_val.reshape(len(x_val), 48, 48, 1)
    x_test = x_test.reshape(len(x_test), 48, 48, 1)

    train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    val_ds = tf.data.Dataset.from_tensor_slices((x_val, y_val))
    test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))

    # cache() goes before prefetch(): prefetch should be the last stage so it
    # overlaps delivery of the cached batches with model execution.
    train_ds = train_ds.batch(BATCH_SIZE).cache().prefetch(tf_data.AUTOTUNE)
    val_ds = val_ds.batch(BATCH_SIZE).cache().prefetch(tf_data.AUTOTUNE)
    test_ds = test_ds.batch(BATCH_SIZE).cache().prefetch(tf_data.AUTOTUNE)

    # VGG16 Architecture
    inputs = Input(shape=(48, 48, 1))  # Input Layer
    x = Resizing(height=224, width=224)(inputs)
    x = Lambda(tf.image.grayscale_to_rgb, output_shape=(224, 224, 3))(x)
    x = preprocess_input(x)

    # weights=None -> random initialisation; the whole backbone is trained.
    base_model = VGG16(include_top=False, weights=None, input_shape=(224, 224, 3))

    x = base_model(x)
    x = Flatten()(x)
    x = Dense(units=256, activation="relu")(x)
    x = Dropout(rate=0.5)(x)
    outputs = Dense(units=1)(x)  # Output Layer (single regression value)

    model = Model(inputs, outputs)
    # model.summary()

    model.compile(optimizer=keras.optimizers.Adam(),
                  loss=keras.losses.MeanSquaredError(),
                  metrics=["mae"])

    # Stop once val_loss has not improved for 5 epochs and roll back to the
    # best epoch's weights, so the evaluation below scores the best model
    # rather than the one from 5 epochs later. This replaces the previous
    # ModelCheckpoint, which was constructed but never passed to fit().
    es = keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)

    history = model.fit(train_ds, epochs=EPOCHS, verbose=0, callbacks=[es], validation_data=val_ds)
    # VisualizeLossCurve(history)

    # evaluate() returns [loss (MSE), mae] in the order given to compile().
    scores = model.evaluate(train_ds, verbose=0)
    train_results.append(scores)
    print("Training Set: MAE=%.4f, MSE=%.4f" % (scores[1], scores[0]), end=' / ')

    scores = model.evaluate(test_ds, verbose=0)
    test_results.append(scores)
    print("Test Set: MAE=%.4f, MSE=%.4f" % (scores[1], scores[0]))

    # Kept for interactive inspection; only the last fold's predictions survive the loop.
    y_pred = model.predict(test_ds, verbose=0)


# Column 0 is the loss (MSE), column 1 is the MAE; average over folds.
train_mean = np.array(train_results).mean(axis=0)
test_mean = np.array(test_results).mean(axis=0)

print("\nAverage Performance")
print("Training Set: MAE=%.4f, MSE=%.4f" % (train_mean[1], train_mean[0]))
print("Test Set: MAE=%.4f, MSE=%.4f" % (test_mean[1], test_mean[0]))

# **2. VGG16 + Pre-training on ImageNet**

In [None]:
# Control Parameters
BATCH_SIZE = 32
EPOCHS = 200
idx_tc = 0  # index of test cases [0, 1]; 0 for baseline and 1 for Proposed

# Cross-validation over n_iter folds: each fold has its own .npz archive with
# x_train / y_train / x_test / y_test arrays. A regression head is trained per
# fold on top of a frozen ImageNet-pretrained VGG16 backbone, and its
# [loss (MSE), MAE] is recorded on the training and test sets.
n_iter = 82
train_results = []
test_results = []

# File-name template of the per-fold archive for each test case.
npz_templates = {
    0: "fgnet_lopo%02d.npz",     # Baseline Performance
    1: "fgnet_lopo_da%02d.npz",  # Proposed Data Augmentation
}
if idx_tc not in npz_templates:
    # Fail fast instead of hitting a NameError (or reusing stale data) in the loop.
    raise ValueError("idx_tc must be 0 (baseline) or 1 (proposed), got %r" % (idx_tc,))
npz_dir = os.path.join(os.getcwd(), "FGNET", "npz")

for j in range(n_iter):
    print("Fold: %2d out of %d" % (j + 1, n_iter), end=', ')

    # Drop the Keras global state left by the previous fold's model so memory
    # does not keep growing while n_iter VGG16 networks are built in one process.
    keras.backend.clear_session()

    # The context manager closes the archive; the arrays are materialised on
    # access, so they stay valid after the file is closed.
    with np.load(os.path.join(npz_dir, npz_templates[idx_tc] % (j + 1))) as data:
        x_train, y_train = data["x_train"].astype("float32"), data["y_train"]
        x_test, y_test = data["x_test"].astype("float32"), data["y_test"]

    # Hold out 25% of the fold's training data for early stopping.
    x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.25, random_state=42)

    # Add the single channel axis expected by the network input.
    x_train = x_train.reshape(len(x_train), 48, 48, 1)
    x_val = x_val.reshape(len(x_val), 48, 48, 1)
    x_test = x_test.reshape(len(x_test), 48, 48, 1)

    train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    val_ds = tf.data.Dataset.from_tensor_slices((x_val, y_val))
    test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))

    # cache() goes before prefetch(): prefetch should be the last stage so it
    # overlaps delivery of the cached batches with model execution.
    train_ds = train_ds.batch(BATCH_SIZE).cache().prefetch(tf_data.AUTOTUNE)
    val_ds = val_ds.batch(BATCH_SIZE).cache().prefetch(tf_data.AUTOTUNE)
    test_ds = test_ds.batch(BATCH_SIZE).cache().prefetch(tf_data.AUTOTUNE)

    # VGG16 Architecture
    inputs = Input(shape=(48, 48, 1))  # Input Layer
    x = Resizing(height=224, width=224)(inputs)
    x = Lambda(tf.image.grayscale_to_rgb, output_shape=(224, 224, 3))(x)
    x = preprocess_input(x)

    # ImageNet weights, frozen: only the dense head below is trained.
    base_model = VGG16(include_top=False, weights="imagenet", input_shape=(224, 224, 3))
    base_model.trainable = False

    x = base_model(x, training=False)
    x = Flatten()(x)
    x = Dense(units=256, activation="relu")(x)
    x = Dropout(rate=0.5)(x)
    outputs = Dense(units=1)(x)  # Output Layer (single regression value)

    model = Model(inputs, outputs)
    # model.summary()

    model.compile(optimizer=keras.optimizers.Adam(),
                  loss=keras.losses.MeanSquaredError(),
                  metrics=["mae"])

    # Stop once val_loss has not improved for 5 epochs and roll back to the
    # best epoch's weights, so the evaluation below scores the best model
    # rather than the one from 5 epochs later. This replaces the previous
    # ModelCheckpoint, which was constructed but never passed to fit().
    es = keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)

    history = model.fit(train_ds, epochs=EPOCHS, verbose=0, callbacks=[es], validation_data=val_ds)
    # VisualizeLossCurve(history)

    # evaluate() returns [loss (MSE), mae] in the order given to compile().
    scores = model.evaluate(train_ds, verbose=0)
    train_results.append(scores)
    print("Training Set: MAE=%.4f, MSE=%.4f" % (scores[1], scores[0]), end=' / ')

    scores = model.evaluate(test_ds, verbose=0)
    test_results.append(scores)
    print("Test Set: MAE=%.4f, MSE=%.4f" % (scores[1], scores[0]))

    # Kept for interactive inspection; only the last fold's predictions survive the loop.
    y_pred = model.predict(test_ds, verbose=0)


# Column 0 is the loss (MSE), column 1 is the MAE; average over folds.
train_mean = np.array(train_results).mean(axis=0)
test_mean = np.array(test_results).mean(axis=0)

print("\nAverage Performance")
print("Training Set: MAE=%.4f, MSE=%.4f" % (train_mean[1], train_mean[0]))
print("Test Set: MAE=%.4f, MSE=%.4f" % (test_mean[1], test_mean[0]))