In [16]:
# tested on MacBook (Retina, 12-inch, 2017) version 11.2.3 

# pip install tensorflow==2.8.0
# pip install pandas==1.3.4
# pip install numpy==1.21.4
# pip install Pillow==8.4.0        (imported as PIL)
# pip install matplotlib==3.5.0
# pip install scikit-learn==1.0.1  (imported as sklearn)
# pip install scikit-image==0.19.2
# re (2.2.1) and json (2.0.9) are part of the Python standard library; no install needed

In [8]:
import sys
import json
import os
import re

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.compat.v1 import ConfigProto, Session
from tensorflow import keras
from tensorflow.keras import applications
from tensorflow.keras import Input, Model 
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import SpecificityAtSensitivity
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import inspect
from sklearn.preprocessing import OneHotEncoder

from tqdm import tqdm


import warnings
warnings.filterwarnings('ignore')

In [1]:
# for using only CPU

# A device count of 0 for 'GPU' is requested so the session only uses the CPU.
config = ConfigProto(device_count = {'GPU': 0})
sess = Session(config = config)

## Data

In [2]:
# Author: ArpiHunanyan
# Created: 29 April,2022, 14:58 PM
# Email: arpi_hunanyan@edu.aua.am



import numpy as np

def getValidation(M = 10954, masked = False):
    """Return the image iterator built by getOutput from the first M rows of the validation label file."""
    return getOutput("../Data/fairface_label_val.csv", M, masked)

def getTrain(M = 86744, masked = False):
    """Return the image iterator built by getOutput from the first M rows of the training label file."""
    return getOutput("../Data/fairface_label_train.csv", M, masked)

def getOutput(path, M, masked):
    """Turn a label CSV into a batched Keras image iterator.

    Args:
        path: CSV file holding the image paths and a "race" column.
        M: number of rows to read from the top of the CSV.
        masked: when true the "file_masked" column supplies the image paths,
            otherwise the "file" column does.

    Returns:
        The iterator produced by ``ImageDataGenerator().flow_from_dataframe``:
        batches of 16 images resized to 224x224 with the seven one-hot race
        columns as raw targets.

    Raises:
        AssertionError: if the rows read do not contain exactly 7 distinct races.
    """
    # OneHotEncoder emits its columns in sorted category order, hence the alphabetical list.
    raceColumns = ['Black', 'East Asian', 'Indian', 'Latino_Hispanic', 'Middle Eastern', 'Southeast Asian', 'White']

    file = "file_masked" if masked else "file"

    dataframe = pd.read_csv(path, usecols = [file, "race"], header = 0, nrows = M)

    assert len(dataframe.race.unique()) == 7, 'InvalidRaceGroupSize'

    # One-hot encode the race label and swap it for the seven indicator columns.
    encoder = OneHotEncoder(handle_unknown = 'ignore')
    oneHot = encoder.fit_transform(dataframe[["race"]]).toarray()
    dataframe = dataframe.join(pd.DataFrame(oneHot)).drop("race", axis = 1)
    dataframe.columns = [file] + raceColumns

    # Image paths in the CSV are resolved relative to '../Data'.
    return ImageDataGenerator().flow_from_dataframe(
        dataframe = dataframe,
        directory = '../Data',
        x_col = file,
        y_col = raceColumns,
        batch_size = 16,
        target_size = (224, 224),
        class_mode = 'raw',
    )



## Model

In [3]:
# Author: ArpiHunanyan
# Created: 16 May, 2022
# Email: arpi_hunanyan@edu.aua.am


class Classifier:
    """Image classifier made of a pre-trained Keras application plus a softmax head.

    The complete Keras model is kept in ``self.model`` and the pre-trained
    backbone in ``self.base_model``.
    """

    def __init__(self, input_shape = (224, 224, 3), classes = 7, modelName = None, createModel = True, path = None):
        """Assemble a new model on a Keras application, or load a saved one.

        Args:
            input_shape: shape of one input image; passed to the backbone and to ``Input``.
            classes: number of units of the softmax output layer.
            modelName: attribute name of a constructor in ``tensorflow.keras.applications``.
            createModel: when true a fresh model is assembled; otherwise the model
                stored at ``path`` is loaded.
            path: location handed to ``load_model``; only used when ``createModel`` is false.

        Raises:
            ValueError: if ``createModel`` is true and ``modelName`` does not name a
                callable member of ``tensorflow.keras.applications``.
        """
        self.input_shape = input_shape
        self.classes = classes
        self.modelName = modelName

        if createModel:
            # Look the constructor up by attribute instead of eval(): the name
            # usually comes from user input and must not be executed as code.
            builder = getattr(applications, str(self.modelName), None)
            if not callable(builder):
                raise ValueError("Unknown Keras application: " + repr(self.modelName))

            # Instantiate the backbone with ImageNet weights and without its classifier top.
            self.base_model = builder(include_top = False,
                                      weights = 'imagenet',
                                      input_shape = self.input_shape)

            # Freeze all layers in the base model.
            self.base_model.trainable = False

            # Stack pooling and the softmax head on top of the backbone output.
            input_tensor = Input(shape = self.input_shape, name = "input_face")
            base_output_tensor = self.base_model(input_tensor, training = False)
            pooled_output_tensor = GlobalAveragePooling2D(name = "pooled_output")(base_output_tensor)
            output_tensor = Dense(self.classes, activation = "softmax", name = "output_class")(pooled_output_tensor)
            self.model = Model(input_tensor, output_tensor, name = self.modelName)

        else:
            # Load the saved model; in models assembled by this class layer 0 is
            # the input layer and layer 1 is the backbone.
            self.model = load_model(path)
            self.base_model = self.model.layers[1]
            self.base_model.trainable = False

    def compile(self, optimizer = None, metrics = None):
        """Compile the model with categorical cross-entropy loss.

        Args:
            optimizer: Keras optimizer; a fresh ``Adam()`` is created when omitted.
            metrics: list of Keras metrics; when omitted accuracy, recall, precision
                and specificity at sensitivity 0.5 are used.
        """
        # Defaults are created per call: objects built in the signature would be
        # shared by every model compiled with the defaults.
        if optimizer is None:
            optimizer = Adam()
        if metrics is None:
            # The explicit name keeps the history key 'specificity_at_sensitivity'
            # even when several instances are created in one session.
            metrics = ['accuracy', 'Recall', 'Precision',
                       SpecificityAtSensitivity(sensitivity = 0.5, name = 'specificity_at_sensitivity')]

        self.model.compile(optimizer = optimizer,
                           loss = 'categorical_crossentropy',
                           metrics = metrics)

    def fit(self, x = None, y = None, batch_size = 16, epochs = 1, validation_data = None, callbacks = None):
        """Train the model and return the Keras ``History`` object of the run."""
        training = self.model.fit(x = x,
                                  y = y,
                                  batch_size = batch_size,
                                  epochs = epochs,
                                  validation_data = validation_data,
                                  callbacks = callbacks)
        return training

    def baseModelParamsCount(self):
        """Return the number of parameters of the pre-trained backbone."""
        return self.base_model.count_params()

    def baseModelLayersCount(self):
        """Return the number of layers of the pre-trained backbone."""
        return len(self.base_model.layers)

    def setTrainable(self, trainable = False):
        """Set the ``trainable`` flag of the whole pre-trained backbone."""
        self.base_model.trainable = trainable

    def unfreazeLastLayers(self, num = 10):
        """Make only the last ``num`` layers of the backbone trainable.

        Lower layers hold general (problem independent) features, higher layers
        hold specific (problem dependent) ones, so only the top is unfrozen.
        ``num <= 0`` leaves the model untouched; a ``num`` larger than the number
        of layers unfreezes every layer. Call ``compile`` afterwards.
        """
        if num <= 0:
            return

        # The backbone itself must be trainable: a Keras model whose own
        # `trainable` flag is False reports no trainable weights, whatever the
        # flags of its sub-layers are.
        self.base_model.trainable = True

        layers = self.base_model.layers
        firstTrainable = max(len(layers) - num, 0)
        for position, layer in enumerate(layers):
            layer.trainable = position >= firstTrainable

    def summary(self):
        """Print the model structure, nested layers and trainable flags included."""
        return self.model.summary(expand_nested = True,
                                  show_trainable = True)

    def save(self, path = None):
        """Save the model to ``path`` and print a confirmation."""
        self.model.save(path)
        print("Model is succesfuly saved in ", path, ".")

    def evaluate(self, data):
        """Evaluate the model on ``data`` and return the metrics as a dictionary."""
        return self.model.evaluate(data, batch_size = 16, verbose = 1, return_dict = True)
     


def kerasModelNames():
    """Return the names of all functions exposed by the Keras ``applications`` module."""
    names = []
    for memberName, _member in inspect.getmembers(applications, inspect.isfunction):
        names.append(memberName)
    return names



## Result Interpretation

In [4]:
# Author: ArpiHunanyan
# Created: 18 May, 2022
# Email: arpi_hunanyan@edu.aua.am


class Selection:
    """Tables and plots comparing the benchmarked pre-trained models.

    The benchmark is read from a CSV into ``self.benchmark_df``; it must hold a
    ``model_name`` column plus the metric / size columns named by the callers.
    """

    # Colour of each highlighted model; every other model is drawn in grey.
    _HIGHLIGHT_COLORS = {"EfficientNetV2B2": '#756bb1',
                         "DenseNet121": '#4B8BBE',
                         "ResNet50": '#238443',
                         "MobileNetV3Large": '#FFE873'}
    _DEFAULT_COLOR = "#BFC2C5"

    def __init__(self, path = 'Results/selection/benchmark_df.csv'):
        """Load the benchmark CSV found at ``path``."""
        self.benchmark_df = pd.read_csv(path)
        # Models whose values are additionally written next to their bar / point.
        self.modelNames = ["EfficientNetV2B2", "DenseNet121", "ResNet50", "MobileNetV3Large"]

    def _colors(self):
        """Return one colour per row of the benchmark, in row order."""
        return [self._HIGHLIGHT_COLORS.get(model, self._DEFAULT_COLOR)
                for model in self.benchmark_df["model_name"]]

    def _column(self, col):
        """Return a benchmark column given its name.

        The legacy form "self.benchmark_df.<name>" (formerly passed to eval) is
        still accepted so existing callers of ``barH`` keep working.
        """
        legacyPrefix = "self.benchmark_df."
        if col.startswith(legacyPrefix):
            col = col[len(legacyPrefix):]
        return self.benchmark_df[col]

    @staticmethod
    def _percentLabel(value):
        """Format a fraction as a whole percentage, rounding instead of truncating."""
        return str(int(round(float(value) * 100))) + "%"

    def table(self, criteria = "accuracy"):
        """Sort the benchmark by ``validation_<criteria>`` (descending, in place) and print the top 5 rows."""
        self.benchmark_df.sort_values(["validation_" + criteria], inplace = True, ascending = False)
        print(self.benchmark_df.head())

    def metricBar(self, criteria = "accuracy"):
        """Horizontal bar plot of the metric column ``criteria`` for every model."""
        self.benchmark_df.sort_values(by = [criteria], inplace = True, ascending = True)
        self.benchmark_df.reset_index(drop = True, inplace = True)
        self.barH(col = criteria, name = criteria.capitalize(),
                  title = " of Pre-Trained Models: 3 epochs with 5 top training layers")

    def parameterBar(self, criteria = "parameters"):
        """Horizontal bar plot of the size column ``num_model_<criteria>`` for every model."""
        column = "num_model_" + criteria
        self.benchmark_df.sort_values(by = [column], inplace = True, ascending = False)
        self.benchmark_df.reset_index(drop = True, inplace = True)
        self.barH(col = column, name = criteria.capitalize(), textVisible = False,
                  title = " of Pre-Trained Models")

    def barH(self, col, name, textVisible = True, title = ""):
        """Draw one horizontal bar per model.

        Args:
            col: name of the benchmark column to plot (the legacy
                "self.benchmark_df.<name>" form is accepted as well).
            name: axis label and first part of the title.
            textVisible: when true the highlighted models get a percentage label.
            title: text appended to ``name`` in the plot title.
        """
        values = self._column(col)

        fig, ax = plt.subplots(figsize = (16, 8))
        bar_plot = ax.barh(self.benchmark_df.model_name, values, height = 0.5, color = self._colors())

        if textVisible:
            # Iterate positionally so labels match their bars whatever the index looks like.
            for rect, model, value in zip(bar_plot, self.benchmark_df["model_name"], values):
                if model in self.modelNames:
                    ax.text(np.round(value, 3) + 0.01, rect.get_y(), self._percentLabel(value),
                            ha = 'center', rotation = 0, size = 7)

        for s in ['top', 'left', 'right']:
            ax.spines[s].set_visible(False)

        ax.xaxis.set_ticks_position('none')
        ax.yaxis.set_ticks_position('none')
        ax.xaxis.set_tick_params(pad = 5)
        ax.yaxis.set_tick_params(pad = 0.5)

        # The on/off flag is passed positionally: its keyword was renamed from
        # `b` to `visible` in matplotlib 3.5.
        ax.xaxis.grid(True, color = 'grey',
                      linestyle = '-.', linewidth = 0.5,
                      alpha = 0.4)

        plt.xlabel(name)
        plt.ylabel('Model')
        # Bars sit at 0 .. n-1, so the upper limit follows the number of models
        # (33.5 for the 34-model benchmark).
        ax.set_ylim(-1, len(self.benchmark_df) - 0.5)
        plt.title(name + title, pad = 10, size = 15)

        plt.show()

    def scaterplot(self, y = "accuracy", x = "params"):
        """Scatter the metric column ``y`` against the size column ``num_model_<x>``."""
        xValues = self.benchmark_df["num_model_" + x]
        yValues = self.benchmark_df[y]

        fig, ax = plt.subplots(figsize = (16, 9))
        ax.scatter(xValues, yValues, color = self._colors())

        # Positional iteration keeps every annotation on its own point even
        # after the frame has been sorted without resetting the index.
        for model, xValue, yValue in zip(self.benchmark_df["model_name"], xValues, yValues):
            if model in self.modelNames:
                ax.annotate(model, (xValue, yValue + 0.005), ha = 'center', rotation = 0, size = 6)

        ax.xaxis.set_ticks_position('none')
        ax.yaxis.set_ticks_position('none')
        ax.xaxis.set_tick_params(pad = 5)
        ax.yaxis.set_tick_params(pad = 0.55)

        for s in ['top', 'right']:
            ax.spines[s].set_visible(False)

        ax.grid(True, color = 'grey',
                linestyle = '-.', linewidth = 0.3,
                alpha = 0.5)

        plt.xlabel(x.capitalize(), size = 10)
        plt.ylabel(y.capitalize(), size = 10)
        plt.title(y.capitalize() + ' vs Model\'s ' + x.capitalize() + ': 3 epochs with 5 top training layers', pad = 10, size = 15)
        plt.show()




class Execution:
    """Reads one stored result file (training history or evaluation) and reports it."""

    def __init__(self, path = "../Results/5.ModelMobileNetV3Large_cont/Tuning"):
        """Load the result dictionary stored on the first line of the file at ``path``.

        The line is parsed as JSON, which is how the demo cells write it and
        which also accepts NaN / null values. A line that is not valid JSON is
        parsed as a Python literal instead. Nothing from the file is executed.
        """
        import ast

        with open(path, 'r') as results:
            firstLine = results.read().splitlines()[0]

        try:
            self.results = json.loads(firstLine)
        except ValueError:
            # Not JSON (e.g. a dict written with str()): read it as a plain literal.
            self.results = ast.literal_eval(firstLine)

    def plotResaluts(self, critaria = "accuracy"):
        """Plot the per-epoch training and validation curves of metric ``critaria``."""
        plt.plot(self.results[critaria], label = 'training')
        plt.plot(self.results["val_" + critaria], label = 'validation') # shows overfitting

        # The on/off flag is passed positionally: its keyword was renamed from
        # `b` to `visible` in matplotlib 3.5.
        plt.grid(True, color = 'grey', linewidth = 0.5, linestyle = '-.', alpha = 0.2)
        plt.xlabel("Epoch", size = 15)
        plt.ylabel(critaria.capitalize(), size = 15)

        plt.legend()
        plt.title(label = "Training " + critaria.capitalize() + " vs Validation " + critaria.capitalize(), pad = 30, size = 30)
        plt.show()

    def lastValues(self, matric):
        """Print the final value, rounded to 3 decimals, of every metric named in ``matric``.

        Metrics missing from the results (or stored as an empty list) are skipped.
        A list-valued metric contributes its last element; any other value is
        printed as is.
        """
        for m in matric:
            value = self.results.get(m)
            if value is None:
                continue

            if isinstance(value, list):
                if not value:
                    continue
                value = value[-1]

            print(m, " : ", np.round(value, 3))


    




    

## Execution

In [None]:
# # Author: ArpiHunanyan
# # Created: 14 May,2022, 9:00 PM
# # Email: arpi_hunanyan@edu.aua.am


# Set up___________________________________________________________________________________________________________________________________________________
# Interactive set-up: every answer is read from standard input.
print()
print("Demo.py started execution...")

print()
print('Do you want to generate new model?')
print("Options : True, False")
generateNewModel = input() == "True"

if generateNewModel:
    print()
    print('Enter modelName')
    print("Options: InceptionV3, InceptionResNetV2, NASNetMobile, Xception, MobileNet, ResNet152V2, MobileNetV2, DenseNet169, DenseNet201, VGG19, ResNet101V2, DenseNet121,ResNet50V2, VGG16, EfficientNetB3, EfficientNetB4, EfficientNetB2, EfficientNetB0, EfficientNetB1, ResNet50, EfficientNetB6, MobileNetV3Small, EfficientNetB5, MobileNetV3Large, EfficientNetB7, ResNet152, ResNet101, EfficientNetV2B1, EfficientNetV2S, EfficientNetV2M, EfficientNetV2B2, EfficientNetV2B0, EfficientNetV2B3, EfficientNetV2L")
    modelName = input()

else:
    print()
    print('Enter  modelName')
    print("Options:  ResNet50, MobileNetV3Large")
    modelName = input()

    # The prompt is shown once (it used to be printed twice).
    print('Enter the path of the existic model:')
    path = ["../Model/3.MobileNetV3Large/Training",
            "../Model/3.MobileNetV3Large/Tuning",
            "../Model/4.ResNet50/Training",
            "../Model/4.ResNet50/Tuning",
            "../Model/5.MobileNetV3Large_cont/Tuning",
            "../Model/6.MobileNetV3LargeMasked/Tuning"]

    print("Options:")

    # Only list the saved models that belong to the chosen architecture.
    for candidate in path:
        if modelName in candidate:
            print(candidate)
    path_running_model = input()

print()
print('Do you want to inclued masked images ?')
print("Options : True, False")
trainMask = input() == "True"
valMask = trainMask

# Image counts are used as row counts when reading the label CSVs, so they are integers.
print()
print('How many images to inclued in training data set(maximum 86744)?')
trainM = int(input())

print()
print('How many images to inclued in validation data set(maximum 10954)?')
valM = int(input())

print()
print("Training Configuration")

print("How many of the top layes should be in training mode? ")
unfreezeedLayersTraining = int(input())
print("Enter the learning rate ")

learningRateTrainig = float(input())
print("How many epochs?")
epochsTraining = int(input())

print()
print("Fine-Tuning Configuration")

print("Enter the learning rate? ")
learningRateFineTuning = float(input())

print("How many epochs?")
epochsFineTuning = int(input())
print()

# New path genaration _________________________________________________________________________________________________________________________________

MaskFlag = "Masked" if trainMask else ""

path_model = "../Model/" + "new"+ modelName + MaskFlag
path_results = "../Results/" + "new" + modelName + MaskFlag

# Never overwrite an earlier run: stop and ask the user to clean up first.
for existingPath in (path_model, path_results):
    if os.path.exists(existingPath):
        print()
        print("Clean ", existingPath)
        print()
        sys.exit(0)

# makedirs also creates ../Model and ../Results when they do not exist yet.
os.makedirs(path_model)
os.makedirs(path_results)

# Data preparation
print()
print("Data Preperation...")
print()

train_data = getTrain( M = trainM, masked = trainMask )
validation_data = getValidation( M = valM, masked = valMask )

# Model generation: build a new model or continue from a saved one
if generateNewModel:
    model = Classifier( modelName = modelName, createModel = True )
else:
    model = Classifier( modelName = modelName, createModel = False, path = path_running_model )

# ----------------------------------------------------------------------------------: Training

print()
print("Training the top layer/s (for classifaction)  ")
print()

model.unfreazeLastLayers(unfreezeedLayersTraining)
model.compile(optimizer = keras.optimizers.Adam(learningRateTrainig))

print()

trainingFirst = model.fit(train_data, epochs = epochsTraining, validation_data = validation_data)

# The per-epoch history is stored as one line of JSON.
with open(path_results + "/Training", 'w') as convert_file:
    convert_file.write(json.dumps(trainingFirst.history))
print()
print("History is suuccsesfuly saved.")
print()

os.mkdir(path_model + "/Training")
model.save(path_model + "/Training")

# ----------------------------------------------------------------------------------: Tuning

## Fine - Tuning
print()
print("Fine - Tuning ")
print()

# Unfreeze the whole backbone and recompile with the fine-tuning learning rate.
model.setTrainable(True)
model.compile(optimizer = keras.optimizers.Adam(learningRateFineTuning))

print()

trainingSecond = model.fit(train_data, epochs = epochsFineTuning, validation_data = validation_data)

with open(path_results + "/Tuning", 'w') as convert_file:
    convert_file.write(json.dumps(trainingSecond.history))

print()
print("History is suuccsesfuly saved in " + path_results + "/Tuning")
print()

print()
os.mkdir(path_model + "/Tuning")
model.save(path_model + "/Tuning")
print()



In [None]:
# # Author: ArpiHunanyan
# # Created: 14 May,2022, 9:00 PM
# # Email: arpi_hunanyan@edu.aua.am

# Set up___________________________________________________________________________________________________________________________________________________

print()
print("Demo_Evaluation.py started execution...")
print()


print('Enter  modelName')
print("Options:  ResNet50, MobileNetV3Large")
modelName = input()


print('Enter the path of the existic model:')
# The first entry used to read ".. Model/..." which is not a usable path.
path = ["../Model/3.MobileNetV3Large/Training",
        "../Model/3.MobileNetV3Large/Tuning",
        "../Model/4.ResNet50/Training",
        "../Model/4.ResNet50/Tuning",
        "../Model/5.MobileNetV3Large_cont/Tuning",
        "../Model/6.MobileNetV3LargeMasked/Tuning"]

print("Options:")

# Only list the saved models that belong to the chosen architecture.
for candidate in path:
    if modelName in candidate:
        print(candidate)
path_running_model = input()

print()
print('Do you want to inclued masked images ?')
print("Options : True, False")
valMask = input() == "True"


# The image count is used as a row count when reading the label CSV, so it is an integer.
print()
print('How many images to inclued in validation data set(maximum 10954)?')
valM = int(input())


# New result path generation
MaskFlag = "Masked" if valMask else ""
path_results = "../Results/" + "new" + modelName + MaskFlag + "Evaluation"


# Never overwrite an earlier run: stop and ask the user to clean up first.
if os.path.exists(path_results):
    print()
    print("Clean ", path_results)
    print()
    sys.exit(0)

# makedirs also creates ../Results when it does not exist yet.
os.makedirs(path_results)


# Data preparation
print()
print("Data Preperation...")
print()

validation_data = getValidation(M = valM, masked = valMask)

# Model generation: load the saved model chosen above
model = Classifier( modelName = modelName, createModel = False, path = path_running_model )

# ----------------------------------------------------------------------------------: Evaluation

print()
print("Evaluations")
print()


results = model.evaluate(validation_data)


# The metric dictionary is stored as one line of JSON.
with open(path_results + "/Evaluation", 'w') as convert_file:
    convert_file.write(json.dumps(results))

print()
print("History is suuccsesfuly saved in " + path_results + "/Evaluation")
print()


# Read the stored file back and show its values.
exicution = Execution(path = path_results + "/Evaluation")
exicution.lastValues(matric = ['loss', 'accuracy', 'recall', 'precision', 'specificity_at_sensitivity'])

In [None]:
# Author: ArpiHunanyan
# Created: 18 May, 2022
# Email: arpi_hunanyan@edu.aua.am

# Set up___________________________________________________________________________________________________________________________________________________

print()
print("Demo_Result.py started execution...")
print()

print()
print("Do you want to compare all Keras Application result(True)? Or analize spesific model result(False)?")
print()
isSelection = input() == "True"

if isSelection:

    selection = Selection()
    print()
    print("Choose: Bar Plot for model's stucure(0), Bar Plot for metrics(1), Scaterplot(3)")
    print()
    plotIndex = input()

    if plotIndex == "0":
        print()
        print("Options: parameters, layers")
        print()
        selection.parameterBar(input())

    elif plotIndex == "1":
        print()
        print("Options: accuracy, recall, specificity")
        print()
        selection.metricBar(input())

    elif plotIndex == "3":
        print()
        print("Options for x: parameters, layers")
        x = input()
        print()
        print("Options for y: accuracy, recall,  specificity")
        y = input()
        selection.scaterplot(x = x, y = y)

    else:
        print("InputMismuch!")
        sys.exit(0)

else:
    print()
    print('Enter the path of the existing results:')
    pathOptions = ["../Results/1.ResNet121/Training",
                   "../Results/1.ResNet121/Tuning",
                   "../Results/2.EfficientNetV2B2/Training",
                   "../Results/3.MobileNetV3Large/Training",
                   "../Results/3.MobileNetV3Large/Tuning",
                   "../Results/4.ResNet50/Training",
                   "../Results/4.ResNet50/Tuning",
                   "../Results/5.MobileNetV3Large_cont/Tuning",
                   "../Results/6.MobileNetV3LargeEvaluationMask/Evaluation",
                   "../Results/7.MobileNetV3LargeMasked/Tuning"]

    for option in pathOptions:
        print(option)

    path = input()
    execution = Execution( path = path )

    validation = ['val_loss', 'val_accuracy', 'val_recall', 'val_precision', 'val_specificity_at_sensitivity'] # for trainig
    train = ['loss', 'accuracy', 'recall', 'precision', 'specificity_at_sensitivity']  # for trainig and evaluation

    # Evaluation files hold a single value per metric: print them and stop.
    if "Evaluation" in path:
        execution.lastValues(matric = train)
        sys.exit(0)

    print()
    print("Choose: Plot(0), Values(1)")
    print()
    output = input()

    if output == "0":
        print()
        print("Options: loss, accuracy")
        print()
        execution.plotResaluts(input())

    elif output == "1":
        print()
        print("Options: train, validation")
        print()
        # Resolve the answer through a lookup table; user input is never evaluated as code.
        metricSets = {"train": train, "validation": validation}
        chosenMetrics = metricSets.get(input())
        if chosenMetrics is None:
            print("InputMismuch!")
            sys.exit(0)
        execution.lastValues(matric = chosenMetrics)

    else:
        print("InputMismuch!")
        sys.exit(0)











## Best Keras PreTrained Model

In [None]:
# Author: ArpiHunanyan
# Created: 1 May,2022, 8:00 PM
# Email: arpi_hunanyan@edu.aua.am




# model names
# Names of every model constructor offered by Keras applications
model_names = kerasModelNames()

# Number of training examples and labels
batch_size = 16
input_shape=(224, 224,3)
num_train = 86744
num_validation = 10954
num_classes = 7
# Ceiling division: the last, partially filled batch counts as an iteration too.
num_iterations = -(-num_train // batch_size)

# Load the training and validation data (sizes come from the constants above)
train_data = getTrain(num_train)
validation_data = getValidation(num_validation)

# Print important info
print(f'Num train images: {num_train} \
        \nNum validation images: {num_validation} \
        \nNum classes: {num_classes} \
        \nNum iterations per epoch: {num_iterations}')


# Loop over each model available in Keras
model_benchmarks = {'model_name': [], 'num_model_params': [], 'validation_accuracy': [] , 'val_recall': [], 'val_specificity_at_sensitivity': []}

num = 1
for model_name in tqdm(model_names):

    #  "NASNetLarge" requires input images with size (331,331)
    if 'NASNetLarge' in model_name:
        continue

    # Drop the Keras state left by the previous model. The Keras documentation
    # recommends this when many models are created in a loop: it releases
    # memory and resets the counters behind auto-generated layer/metric names.
    keras.backend.clear_session()

    print("------------------------------------------------")
    print("N: ", num)
    print("Model Name: ", model_name)
    num = num + 1
    print("------------------------------------------------")

    clf_model = Classifier( modelName = model_name )
    # Request training for the 5 top layers of the backbone
    clf_model.unfreazeLastLayers(5)
    # Every model gets its own optimizer so no optimizer state is carried over.
    clf_model.compile(optimizer = Adam())
    history = clf_model.fit(train_data, epochs = 3, validation_data = validation_data)

    # Keep the values reached in the last epoch
    model_benchmarks['model_name'].append(model_name)
    model_benchmarks['num_model_params'].append(clf_model.baseModelParamsCount())
    model_benchmarks['validation_accuracy'].append(history.history['val_accuracy'][-1])
    model_benchmarks['val_recall'].append(history.history['val_recall'][-1])
    model_benchmarks['val_specificity_at_sensitivity'].append(history.history['val_specificity_at_sensitivity'][-1])


# One row per benchmarked model, sorted ascending by the validation metrics
benchmark_df = pd.DataFrame(model_benchmarks)
benchmark_df.sort_values(['validation_accuracy', 'val_recall', 'val_specificity_at_sensitivity'], inplace = True)
benchmark_df.to_csv('benchmark_df.csv', index = False)


