In [None]:
import os
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

# import keras
from keras.models import load_model, Model
from keras.models import Model
from keras.layers import Input, Conv2D, BatchNormalization, Activation, Add, ZeroPadding2D, MaxPooling2D, AveragePooling2D, Dense, Flatten
from keras.initializers import glorot_uniform
from keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint
from keras.callbacks import Callback

from PIL import Image
import numpy as np
import cv2 as cv

from timeit import default_timer as timer


# Labels

In [None]:

# Read the label names, one per line. The `with` block closes the file even
# when reading fails.
with open("dataset/labels.data", "r") as labels_file:
    label_names = [line.strip() for line in labels_file.read().splitlines()]

# Label name -> class index. Blank lines (typically produced by a trailing
# newline) are dropped: an empty name is not a real class, and an empty string
# is a substring of every file name, so it would match any image.
LABELS = {name: idx for idx, name in enumerate(n for n in label_names if n)}

# Inverse lookup: class index -> label name.
REVERSED_LABELS = {idx: name for name, idx in LABELS.items()}


# Dataset loading functions

In [None]:

# Haar-cascade frontal-face detector shipped with OpenCV; built once and
# reused by every call to save_bounding_box.
face_classifier = cv.CascadeClassifier(cv.data.haarcascades + "haarcascade_frontalface_default.xml")

def save_bounding_box(img:Image.Image, path:str, shape:tuple):
    """Detect the faces in `img`, write each resized crop to disk and return them.

    Crops are saved next to `path`, with the face index inserted in front of
    the extension (``photo.jpg`` -> ``photo0.jpg``, ``photo1.jpg``, ...).

    Args:
        img: source picture. It is converted to RGB first, so grayscale,
            palette and RGBA images are accepted as well.
        path: file path used as the naming template for the saved crops.
        shape: target size handed to ``cv.resize``.

    Returns:
        A list of RGB ``PIL.Image`` crops, one per detected face, in detection
        order. The list is empty when no face is found.
    """
    # The colour conversions below require a 3-channel RGB array.
    img_mat = cv.cvtColor(np.array(img.convert("RGB")), cv.COLOR_RGB2BGR)
    gray_img = cv.cvtColor(img_mat, cv.COLOR_BGR2GRAY)

    faces = face_classifier.detectMultiScale(gray_img, scaleFactor=1.1, minNeighbors=5, minSize=(40, 40))

    # splitext only inspects the last path component, so a dot in a directory
    # name (e.g. "./dataset/...") or a missing extension cannot corrupt the
    # output file name.
    stem, extension = os.path.splitext(path)

    faces_to_return = []
    for index, (x, y, w, h) in enumerate(faces):

        to_save = cv.resize(img_mat[y:y+h, x:x+w], shape)

        cv.imwrite(stem + str(index) + extension, to_save)

        # Hand the crop back in PIL's RGB channel order.
        faces_to_return.append(Image.fromarray(cv.cvtColor(to_save, cv.COLOR_BGR2RGB)))

    return faces_to_return




def load(dir:str, shape:tuple=(224,224)) -> tuple:
    """Load every labelled face image stored under `dir`.

    `dir` is expected to contain one sub-folder per person. Each image file in
    such a folder is handled as follows:

    * If its size already equals `shape`, it is added to the dataset as is.
    * Otherwise the faces it contains are cropped and saved next to it by
      ``save_bounding_box`` and added to the dataset. The original picture is
      then moved to ``<dir>/old_images/<person>/`` so that it is not processed
      again on the next run.

    Images that cannot be opened, or for which no entry of ``LABELS`` matches,
    are skipped with a printed message so that the returned images and labels
    always stay aligned.

    Args:
        dir: root folder of the dataset split.
        shape: expected image size, compared with ``PIL.Image.size``.

    Returns:
        ``(images, labels)``: the stacked image array and the labels encoded by
        ``to_categorical``.

    Raises:
        ValueError: if no labelled image could be loaded from `dir`.
    """

    def label_from_folder(folder):
        # An exact (case-insensitive) match wins over a substring match, so a
        # folder "Ann" is never attributed to a label such as "Joanna".
        folder_lc = folder.lower()
        for key in LABELS.keys():
            if key and key.lower() == folder_lc:
                return LABELS[key]
        for key in LABELS.keys():
            if key and folder_lc in key.lower():
                return LABELS[key]
        return None

    def label_from_filename(filename):
        # Empty keys are ignored: '' is a substring of every file name.
        for key in LABELS.keys():
            if key and key in filename:
                return LABELS[key]
        return None

    shape = tuple(shape)  # PIL sizes are tuples; a list would never compare equal

    data = []
    labels = []
    archive_root = os.path.join(dir, "old_images")

    for person_folder in os.listdir(dir): # For each person's folder

        person_path = os.path.join(dir, person_folder)

        # The archive of already-processed pictures and stray files are not person folders
        if person_folder == "old_images" or not os.path.isdir(person_path):
            continue

        for filename in os.listdir(person_path): # For each face image in a person's folder

            image_path = os.path.join(person_path, filename)

            if '.' not in filename or not os.path.isfile(image_path):
                continue

            # Read the pixels and close the file straight away: an open handle
            # would prevent the picture from being moved on Windows.
            try:
                with Image.open(image_path) as file:
                    needs_cropping = file.size != shape
                    image = file.convert("RGB")
            except OSError as error:
                print("Skipping unreadable image " + image_path + ": " + str(error))
                continue

            # A picture that still has to be cropped is labelled from its file
            # name and an already cropped one from its folder. Each falls back
            # to the other rule instead of leaving the sample unlabelled.
            if needs_cropping:
                label = label_from_filename(filename)
                if label is None:
                    label = label_from_folder(person_folder)
            else:
                label = label_from_folder(person_folder)
                if label is None:
                    label = label_from_filename(filename)

            if label is None:
                print("Skipping " + image_path + ": no entry of LABELS matches it")
                continue

            if not needs_cropping:
                data.append(np.asarray(image))
                labels.append(label)
                continue

            # The image doesn't meet the shape standards (probably not cropped yet):
            # extract the face(s) and save the cropped version(s) in the person's folder.
            faces = save_bounding_box(image, image_path, shape)

            for face in faces:
                data.append(np.asarray(face))
                labels.append(label)

            # Archive the original picture. One sub-folder per person keeps
            # identically named files of different people from overwriting each other.
            archive_dir = os.path.join(archive_root, person_folder)
            os.makedirs(archive_dir, exist_ok=True)
            os.replace(image_path, os.path.join(archive_dir, filename))

    if not data:
        raise ValueError("No labelled images found in " + repr(dir))

    return np.array(data), to_categorical(labels)


def load_data(path:str="./dataset") -> tuple:
    """Load the training and test splits located under `path`.

    The splits are read, in this order, from the ``train_data`` and
    ``test_data`` sub-folders of `path` by ``load``.

    Returns:
        ``(train_data, test_data)``, each being whatever ``load`` returns for
        the corresponding sub-folder.
    """
    train_data, test_data = (
        load(os.path.join(path, split_folder))
        for split_folder in ("train_data", "test_data")
    )

    return train_data, test_data




class TimingCallback(Callback):
    """Keras callback that records how long each epoch takes.

    After training, ``self.logs`` holds one duration per completed epoch, in
    the order the epochs ran, measured with ``timeit.default_timer``.
    """

    def __init__(self, logs=None):
        # `logs` is accepted for backward compatibility only; it was never read.
        super().__init__()
        self.logs = []
        self.starttime = None

    def on_epoch_begin(self, epoch, logs=None):
        self.starttime = timer()

    def on_epoch_end(self, epoch, logs=None):
        # Nothing to measure if the matching on_epoch_begin never ran.
        if self.starttime is None:
            return
        self.logs.append(timer() - self.starttime)


# Model implementation, loading and saving functions

In [None]:

#Implementation of convolution block
def convolutional_block(X, f, filters, stage, block, s):
    """Apply three Conv2D -> BatchNormalization -> ReLU stages to `X`.

    Args:
        X: input tensor.
        f: side length of the square kernel used by the middle convolution.
        filters: sequence of three filter counts, one per convolution.
        stage, block, s: accepted but not used by the body.

    Returns:
        The output tensor of the last ReLU.

    NOTE(review): unlike ``identity_block`` this block has no shortcut branch,
    and every convolution uses a stride of 1 regardless of `s`. Confirm this is
    intended before changing it, since it alters the layer structure that
    saved weights depend on.
    """
    F1, F2, F3 = filters

    # (number of filters, kernel size, padding) of each convolution, in order
    conv_specs = (
        (F1, (1, 1), 'valid'),
        (F2, (f, f), 'same'),
        (F3, (1, 1), 'valid'),
    )

    for n_filters, kernel, pad in conv_specs:
        X = Conv2D(filters=n_filters, kernel_size=kernel, strides=(1, 1), padding=pad, kernel_initializer=glorot_uniform(seed=0))(X)
        X = BatchNormalization(axis=3)(X)
        X = Activation('relu')(X)

    return X



#Implementation of Identity Block

def identity_block(X, f, filters, stage, block):
    """Residual block whose shortcut is the unmodified input.

    The main path is made of three Conv2D + BatchNormalization pairs, with a
    ReLU after the first two. Its output is added to the input and a final
    ReLU is applied to the sum.

    Args:
        X: input tensor.
        f: side length of the square kernel used by the middle convolution.
        filters: sequence of three filter counts, one per convolution.
        stage: value converted with ``str`` and used in the layer names.
        block: string used in the layer names.

    Returns:
        The output tensor of the final ReLU.
    """
    branch_tag = str(stage) + block + '_branch'
    conv_prefix = 'res' + branch_tag
    bn_prefix = 'bn' + branch_tag
    F1, F2, F3 = filters

    shortcut = X

    # (layer-name suffix, number of filters, kernel size, padding, followed by ReLU?)
    main_path = (
        ('2a', F1, (1, 1), 'valid', True),
        ('2b', F2, (f, f), 'same', True),
        ('2c', F3, (1, 1), 'valid', False),
    )

    for suffix, n_filters, kernel, pad, with_relu in main_path:
        X = Conv2D(filters=n_filters, kernel_size=kernel, strides=(1, 1), padding=pad, name=conv_prefix + suffix, kernel_initializer=glorot_uniform(seed=0))(X)
        X = BatchNormalization(axis=3, name=bn_prefix + suffix)(X)
        if with_relu:
            X = Activation('relu')(X)

    # Skip connection
    merged = Add()([X, shortcut])

    return Activation('relu')(merged)




#Implementation of ResNet-50
def ResNet50(input_shape=(224, 224, 3)):
    """Build the convolutional backbone of the network.

    Layout: zero padding, a 7x7 convolution with stride 2, batch normalization,
    ReLU and max pooling, followed by four stages (numbered 2 to 5). Each stage
    is one ``convolutional_block`` followed by several ``identity_block`` calls,
    and the backbone ends with a 2x2 average pooling.

    Args:
        input_shape: shape handed to the ``Input`` layer.

    Returns:
        A ``Model`` named ``'ResNet50'`` mapping the input tensor to the output
        of the average pooling.
    """
    X_input = Input(input_shape)

    # Stem
    X = ZeroPadding2D((3, 3))(X_input)
    X = Conv2D(64, (7, 7), strides=(2, 2), name='conv1', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name='bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)

    # (stage number, filter counts, `s` given to the convolutional block,
    #  names of the identity blocks that follow it)
    stage_plan = (
        (2, [16, 16, 64], 1, 'bc'),
        (3, [32, 32, 128], 2, 'bcd'),
        (4, [64, 64, 256], 2, 'bcdef'),
        (5, [128, 128, 512], 2, 'bc'),
    )

    for stage_id, stage_filters, stride, identity_names in stage_plan:
        X = convolutional_block(X, f=3, filters=stage_filters, stage=stage_id, block='a', s=stride)
        for block_name in identity_names:
            X = identity_block(X, 3, stage_filters, stage=stage_id, block=block_name)

    X = AveragePooling2D(pool_size=(2, 2), padding='same')(X)

    return Model(inputs=X_input, outputs=X, name='ResNet50')



    
def create_model(num_classes:int=3, input_shape:tuple=(224, 224, 3)) -> Model:
    """Build the full classifier: the ``ResNet50`` backbone plus a dense head.

    The head flattens the backbone output and applies two ReLU dense layers
    (512 and 256 units) followed by a softmax layer with `num_classes` units.

    Args:
        num_classes: number of output classes. The default (3) reproduces the
            architecture this function always built.
        input_shape: shape of the input images, forwarded to ``ResNet50``.

    Returns:
        The uncompiled ``Model``.

    Raises:
        ValueError: if `num_classes` is smaller than 1.
    """
    if num_classes < 1:
        raise ValueError("num_classes must be at least 1, got " + repr(num_classes))

    base_model = ResNet50(input_shape=input_shape)
    x = base_model.output
    x = Flatten()(x)
    x = Dense(512, activation='relu', name='fc1',kernel_initializer=glorot_uniform(seed=0))(x)
    x = Dense(256, activation='relu', name='fc2',kernel_initializer=glorot_uniform(seed=0))(x)
    x = Dense(num_classes, activation='softmax', name='fc3',kernel_initializer=glorot_uniform(seed=0))(x)

    # No backbone layer is frozen here; nothing in this function changes `trainable`.
    model = Model(inputs=base_model.input, outputs=x)

    return model


def save_model(model:Model, path:str="base_model_og.keras"):
    """Save `model` to `path` by calling ``model.save``.

    Args:
        model: the model to persist.
        path: destination file. Defaults to the file name this function has
            always written to.
    """
    model.save(path)

def load_weights(path:str="base_model_og.keras"):
    """Load the model stored at `path` and return it.

    Despite its name, this function returns what ``load_model`` returns (a
    whole model), not a list of weight arrays. Call ``get_weights()`` on the
    result when the weights themselves are needed.

    Args:
        path: file to read. Defaults to the file name this function has always
            read from.
    """
    return load_model(path)


# Training phase

In [None]:

print('\nLoading the model ...')
model = create_model()

print("\nLoading the model's weights ...")
# load_weights() returns a whole model, whereas set_weights() expects the list
# of weight arrays, hence the get_weights() call.
model.set_weights(load_weights().get_weights())

print('\nLoading the dataset ...')
(X_train, y_train), (X_test, y_test) = load_data()

print('\nCompiling the model ...')
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

print('\nTraining the model ...')

# Keep a checkpoint every time the training loss reaches a new minimum.
filepath = "model-{epoch:.2f}-loss-{loss:.2f}.keras"
checkpoint = ModelCheckpoint(filepath, monitor="loss", verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]

# NOTE(review): the test split doubles as validation data here.
# `callbacks` takes the list of callbacks itself, not a list wrapped in another list.
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=30, shuffle=True, callbacks=callbacks_list)

print("\nEnd of training.")

print("\nSaving...")
save_model(model)

print('\nModel saved.')

