<a href="https://colab.research.google.com/github/atick-faisal/Crowd-Emotion/blob/main/src_v4/CE__Transfer_Learning_v4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
import os
import time
import json
import joblib
import datetime
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf

from sklearn.utils import shuffle
from sklearn.metrics import classification_report, confusion_matrix

# Fix TensorFlow's global random seed so repeated runs start from the same state.
tf.random.set_seed(42)

# Last expression of the cell: the notebook echoes the installed TensorFlow version.
tf.__version__

'2.4.1'

In [21]:
# Experiment configuration. The same mapping is later filled with the run's
# results, serialised to JSON and appended to the log file.
CONFIG = dict(
    # --- run identity -----------------------------------------------------
    timestamp=str(datetime.datetime.now()),
    base_model='mobile_net_v2',
    # Shape of the stored spectrogram images fed to the model.
    img_shape=(288, 288, 3),
    # Shape the images are resized to before entering the backbone.
    input_shape=(160, 160, 3),
    # Prefix of the joblib files holding the pre-computed arrays.
    spec_config='LOG_SPEC_W400_H200_HAMM_IMG288',
    # Fold held out for validation; must be one of FOLDS.
    test_fold='Fold 1',
    architecture='',          # placeholder, filled after training
    # --- optimisation -----------------------------------------------------
    batch_size=32,
    # NOTE(review): this value is handed to model.fit() unchanged before it is
    # overwritten with the run's result; 0 presumably means no epoch is
    # trained -- confirm the intended number of epochs.
    epochs=0,
    learning_rate=0.0001,
    monitor='val_loss',       # quantity watched by early stopping
    patience=3,               # early-stopping patience
    class_weight={0: 0.71, 1: 2.32, 2: 0.86},
    # --- placeholders, filled after the run -------------------------------
    training_time=0,
    testing_time=0,
    cm_atick='',
    cr_atick='',
    cm_valentina='',
    cr_valentina='',
)

In [22]:
# Google Drive locations of the two datasets; both use the same sub-directory name.
BASE_DIR_AF = '/content/drive/MyDrive/Research/Crowd Emotion v4/'
BASE_DIR_VAL = '/content/drive/MyDrive/Research/Crowd Emotion Val/'
DATASET_DIR = 'Dataset_AF/'

# Text file that receives one JSON record per run (opened in append mode).
LOG_FILE = '/content/drive/MyDrive/Research/Crowd Emotion Logs/tl_specgrams.txt'

# Fold names; the position of CONFIG['test_fold'] in this list selects the held-out fold.
FOLDS = ['Fold {}'.format(number) for number in range(1, 6)]

# Class names used as tick labels on the confusion-matrix plots.
EMOTIONS = ['Approval', 'Disapproval', 'Neutral']


In [None]:
# Load the pre-computed arrays of the first dataset. Every file name is the
# spectrogram-configuration prefix followed by a one-letter suffix.
path_AF = os.path.join(BASE_DIR_AF, DATASET_DIR)
prefix_AF = path_AF + CONFIG['spec_config']
X_AF = joblib.load(prefix_AF + '_X.joblib')
y_AF = joblib.load(prefix_AF + '_y.joblib')
f_AF = joblib.load(prefix_AF + '_f.joblib')
# NOTE(review): c_AF is read from the same '_f.joblib' file as f_AF and is not
# used in the visible cells -- confirm whether a different file was intended.
c_AF = joblib.load(prefix_AF + '_f.joblib')

# The second dataset is used only for testing, so only X and y are loaded.
path_VAL = os.path.join(BASE_DIR_VAL, DATASET_DIR)
prefix_VAL = path_VAL + CONFIG['spec_config']
test_X = joblib.load(prefix_VAL + '_X.joblib')
test_y = joblib.load(prefix_VAL + '_y.joblib')

# Samples whose fold id equals the list position of the configured test fold
# become the validation split; all remaining samples are used for training.
# NOTE(review): assumes f_AF stores 0-based fold ids matching FOLDS -- confirm.
test_fold_idx = FOLDS.index(CONFIG['test_fold'])
mask = (f_AF == test_fold_idx)
train_mask = ~mask
train_X, train_y = X_AF[train_mask, :], y_AF[train_mask, :]
val_X, val_y = X_AF[mask, :], y_AF[mask, :]

# The full array is no longer needed once the two splits exist.
del X_AF

In [None]:
class TransferLearning():
    """Transfer-learning classifier: frozen pre-trained backbone plus a dense head.

    The backbone is created with ImageNet weights and ``trainable = False``.
    The model built on top of it resizes its inputs, applies the backbone's
    own preprocessing function and ends in a dense layer that outputs logits.
    Only ``'mobile_net_v2'`` is supported as ``config['base_model']``.
    """

    def __init__(self, config):
        """Read the hyper-parameters from *config* and create the frozen backbone.

        Args:
            config: Mapping providing the keys ``base_model``, ``img_shape``,
                ``input_shape``, ``learning_rate``, ``batch_size``,
                ``epochs``, ``monitor``, ``patience`` and ``class_weight``.

        Raises:
            KeyError: If one of the keys above is missing.
            ValueError: If ``config['base_model']`` names an unsupported backbone.
        """
        base_model_name = config['base_model']
        self.img_shape = config['img_shape']
        # Earlier revisions stored the image shape under this name only; keep
        # it as an alias so existing attribute access continues to work.
        self.img_size = self.img_shape
        self.input_shape = config['input_shape']
        self.learning_rate = config['learning_rate']
        self.batch_size = config['batch_size']
        self.epochs = config['epochs']
        self.monitor = config['monitor']
        self.patience = config['patience']
        self.class_weight = config['class_weight']

        # Always defined, so an instance is never left half-initialised.
        self.model = None
        self.callbacks = None

        if base_model_name != 'mobile_net_v2':
            raise ValueError(
                "Unsupported base_model {!r}; expected 'mobile_net_v2'".format(
                    base_model_name
                )
            )

        self.base_model = tf.keras.applications.MobileNetV2(
            input_shape=self.input_shape,
            include_top=False,
            weights='imagenet'
        )
        # Freeze the backbone: only the layers added on top of it are trained.
        self.base_model.trainable = False
        self.preprocess = tf.keras.applications.mobilenet_v2.preprocess_input

    def __init_model(self, num_classes):
        """Build and compile the Keras model, storing it in ``self.model``.

        Args:
            num_classes: Number of units of the final dense (logit) layer.
        """
        inputs = tf.keras.Input(shape=self.img_shape)
        # Bring the stored images to the spatial size the backbone was built for.
        x = tf.keras.layers.experimental.preprocessing.Resizing(
            height=self.input_shape[0],
            width=self.input_shape[1]
        )(inputs)
        x = self.preprocess(x)
        # training=False keeps the backbone in inference mode.
        x = self.base_model(x, training=False)
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        x = tf.keras.layers.Dropout(0.2)(x)
        # No activation here: the loss below is configured with from_logits=True.
        outputs = tf.keras.layers.Dense(num_classes)(x)
        self.model = tf.keras.Model(inputs, outputs)
        self.model.compile(
            # `learning_rate` replaces the deprecated `lr` alias.
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=['accuracy']
        )

    def __init_callbacks(self):
        """Create the training callbacks (early stopping only)."""
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor=self.monitor,
            patience=self.patience,
            verbose=1
        )
        self.callbacks = [early_stopping]

    def fit(self, train_X, train_y, val_X, val_y):
        """Build a fresh model and train it.

        The number of output classes is the number of distinct values found
        in *train_y*.

        Args:
            train_X: Training inputs.
            train_y: Training labels.
            val_X: Validation inputs.
            val_y: Validation labels.

        Returns:
            A ``(model, history)`` tuple: the trained Keras model and the
            object returned by its ``fit`` call.
        """
        num_classes = np.unique(train_y).shape[0]
        self.__init_model(num_classes)
        self.__init_callbacks()
        history = self.model.fit(
            x=train_X,
            y=train_y,
            batch_size=self.batch_size,
            epochs=self.epochs,
            verbose=1,
            validation_data=(val_X, val_y),
            shuffle=True,
            callbacks=self.callbacks,
            class_weight=self.class_weight
        )

        return self.model, history

    def evaluate(self, test_X, test_y):
        """Return a text classification report for *test_X* / *test_y*.

        Args:
            test_X: Inputs to predict on.
            test_y: True labels; flattened with ``ravel()`` before comparison.

        Returns:
            The string produced by ``classification_report``.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if self.model is None:
            raise RuntimeError('fit() must be called before evaluate()')
        # The model emits logits; append a softmax to obtain class probabilities.
        prob_model = tf.keras.Sequential([self.model, tf.keras.layers.Softmax()])
        y_pred_hot = prob_model.predict(test_X)
        y_pred = np.argmax(y_pred_hot, axis=1)
        return classification_report(test_y.ravel(), y_pred)

In [None]:
%%time
# Wall-clock start of the training phase (model construction included).
start_time = time.time()

# Build the transfer-learning wrapper from CONFIG and train it on the
# training split, validating on the held-out fold.
tl = TransferLearning(CONFIG)
model, history = tl.fit(train_X, train_y, val_X, val_y)

# Elapsed seconds; written into CONFIG['training_time'] further down.
training_time = time.time() - start_time


In [None]:
%%time

# ---------------- Testing on Valentina's data -------------------

# Wall-clock start of the evaluation on the second dataset.
start_time = time.time()

# Loss and accuracy of the trained model on the second dataset, using the
# configured batch size.
loss, accuracy = model.evaluate(test_X, test_y, batch_size=CONFIG['batch_size'])

print("Loss: ", loss)
print("Accuracy: ", accuracy)

# Elapsed seconds (the two prints included); written into
# CONFIG['testing_time'] further down.
testing_time = time.time() - start_time

In [None]:
# Training and validation loss per epoch, one labelled line per curve.
metrics = history.history
for curve in ('loss', 'val_loss'):
    plt.plot(history.epoch, metrics[curve], label=curve)
plt.legend()
plt.show()

In [None]:
# ------------------ Testing on Atick's data -----------------

# True labels of the validation split, flattened to one dimension.
y_true = val_y.ravel()
# Predict on the validation array (the previously referenced `val_ds` is never
# defined); the class with the highest score is the prediction.
y_pred = np.argmax(model.predict(val_X), axis=1)

result_atick = classification_report(y_true, y_pred)
print(result_atick)

In [None]:
# Confusion matrix normalised over the true labels (normalize='true'), scaled
# to percent; the integer cast drops the fractional part.
confusion_mtx_atick = confusion_matrix(y_true, y_pred, normalize='true')
confusion_mtx_atick = (confusion_mtx_atick * 100).astype('int')

plt.figure(figsize=(10, 8))
ax = sns.heatmap(
    confusion_mtx_atick,
    annot=True,
    fmt='g',
    xticklabels=EMOTIONS,
    yticklabels=EMOTIONS,
)
ax.set_xlabel('Prediction')
ax.set_ylabel('Label')
plt.show()

In [None]:
# ------------------ Testing on Valentina's data -----------------

# True labels of the second dataset, flattened to one dimension.
y_true = test_y.ravel()
# Predict on the test array (the previously referenced `test_ds` is never
# defined); the class with the highest score is the prediction.
y_pred = np.argmax(model.predict(test_X), axis=1)

result_valentina = classification_report(y_true, y_pred)
print(result_valentina)

In [None]:
# Confusion matrix normalised over the true labels (normalize='true'), scaled
# to percent; the integer cast drops the fractional part.
confusion_mtx_valentina = confusion_matrix(y_true, y_pred, normalize='true')
confusion_mtx_valentina = (confusion_mtx_valentina * 100).astype('int')

plt.figure(figsize=(10, 8))
ax = sns.heatmap(
    confusion_mtx_valentina,
    annot=True,
    fmt='g',
    xticklabels=EMOTIONS,
    yticklabels=EMOTIONS,
)
ax.set_xlabel('Prediction')
ax.set_ylabel('Label')
plt.show()

In [None]:
# Replace the CONFIG placeholders with the outcome of this run.

# Capture the model summary line by line instead of printing it.
summary = []
model.summary(print_fn=summary.append)
CONFIG['architecture'] = summary

# Number of epochs actually run. `history.epoch` holds 0-based epoch indices,
# so its length is the count; unlike max(), len() also copes with an empty
# history (no epoch trained).
CONFIG['epochs'] = len(history.epoch)
CONFIG['training_time'] = training_time
CONFIG['testing_time'] = testing_time

# Confusion matrices are stored as text and the reports as lists of lines so
# that the whole record can be serialised to JSON.
CONFIG['cm_atick'] = np.array2string(confusion_mtx_atick)
result_list_atick = result_atick.split('\n')
CONFIG['cr_atick'] = result_list_atick
CONFIG['cm_valentina'] = np.array2string(confusion_mtx_valentina)
result_list_valentina = result_valentina.split('\n')
CONFIG['cr_valentina'] = result_list_valentina

In [None]:
# Serialise the completed run record as indented JSON and show it in the
# notebook; the same string is appended to the log file afterwards.
config = json.dumps(CONFIG, indent=4)
print(config)

In [None]:
# Append this run's JSON record to the log, separated from its neighbours by
# blank lines. The context manager closes the file even if the write fails.
with open(LOG_FILE, 'a') as f:
    f.write('\n' + config + '\n')