In [1]:
import keras
import os
import pandas as pd, numpy as np
import cv2, imutils
from keras.models import Sequential
from keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical
from hyperopt.pyll.base import scope 
from hyperopt import hp, fmin, tpe, STATUS_OK, Trials
from keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
from sklearn import preprocessing
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import shutil

In [2]:
cwd = os.getcwd()
pre_csv = os.path.abspath(os.path.join(os.sep, cwd, '..', 'neural_network', 'augmentation_data.csv'))
df = pd.read_csv(pre_csv)

In [3]:
def getFullImagePath(full_path):
    sub_image_path = full_path
    split_sub_image_path = sub_image_path.split("/")
    image_path = ""
    if(len(split_sub_image_path) == 2):
        image_path = os.path.abspath(os.path.join(os.sep, cwd, "..", "neural_network", "augmentation", split_sub_image_path[0], split_sub_image_path[1]))
    else:
        image_path = os.path.abspath(os.path.join(os.sep, cwd, "..", "neural_network", "augmentation", sub_image_path))
    return image_path

In [4]:
count = [0 for x in range(8)]
raw_images =  []
labels = []
for i, row in df.iterrows():
    image_path = getFullImagePath(row.image)

    image = cv2.imread(image_path)
    image = cv2.resize(image, (112, 112), interpolation = cv2.INTER_AREA)
    pixels = image.flatten()
    raw_images.append(pixels)
    
    label = row.emotion
    labels.append(label)
    
    count[row.emotion] += 1
    if i > 0 and i % 10000 == 0: print('[INFO] processed {}/{}'.format(i, len(df)))
    
print(count)

[INFO] processed 10000/77205


In [None]:
space = {
    'rate'       : hp.uniform('rate', 0.01, 0.5),
    'dropout'    : hp.uniform('dropout', 0.01, 0.5),
    'units1'      : scope.int(hp.quniform('units1', 10, 100, 5)),
    'units2'      : scope.int(hp.quniform('units2', 10, 100, 5)),
    'units3'      : scope.int(hp.quniform('units3', 10, 100, 5)),
    'units4'      : scope.int(hp.quniform('units4', 10, 100, 5)),
    'batch_size' : scope.int(hp.quniform('batch_size', 100, 250, 25)),
    'layers'     : scope.int(hp.quniform('layers', 3, 4, 1)),
    'optimizer'  : hp.choice('optimizer', ['adam', 'adadelta', 'sgd', 'RMSprop']),
    'epochs'     : scope.int(hp.quniform('epochs', 100, 300, 10)),
    'activation' : hp.choice('activation', ['relu']), # ['relu', 'sigmoid', 'tanh', 'elu']
}

In [None]:
def f_nn(params):
    print("params", params)

    # Keras LSTM model
    model = Sequential()

    # params['layers'] => layer of model (do not count input layer)

    if params['layers'] == 1:
        model.add(Dense(params['units1'], activation=params['activation'], input_shape=(X_train.shape[1],)))
        model.add(keras.layers.BatchNormalization())
        #model.add(Dropout(rate=params['rate']))
    else:
        # First layer specifies input_shape and returns sequences
        model.add(Dense(params['units1'], activation=params['activation'], input_shape=(X_train.shape[1],)))
        model.add(keras.layers.BatchNormalization())
        #model.add(Dropout(rate=params['rate']))

        # Middle layers return sequences
        for i in range(params['layers']-2):
            model.add(Dense(params['units' + str(i + 2)], activation=params['activation']))
            model.add(keras.layers.BatchNormalization())
            #model.add(Dropout(rate=params['rate']))

        # Last layer doesn't return anything                                            
        model.add(Dense(params['units' + str(params['layers'])], activation=params['activation']))
        model.add(keras.layers.BatchNormalization())
        model.add(Dropout(rate=params['rate']))

    model.add(Dense(8, activation='softmax'))
    model.compile(optimizer=params['optimizer'], loss='mean_squared_error', metrics=['accuracy'])

    es = EarlyStopping(monitor='val_loss',mode='min', verbose=1,patience=15)
    result =  model.fit(X_train, y_train, validation_data=(X_val, y_val,), batch_size=params['batch_size'], epochs=int(X_train.shape[0]/best_params['batch_size']), verbose=0, callbacks=[es])

    # Get the lowest validation loss of the training epochs
    validation_loss = np.amin(result.history['val_loss']) 
    print('Best validation loss of epoch:', validation_loss)

    return {'loss': validation_loss, 
            'status': STATUS_OK, 
            'model': model, 
            'params': params}

In [None]:
X_train, X_test, y_train, y_test = train_test_split(np.array(raw_images), np.array(labels), test_size=0.1, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

In [None]:
le = preprocessing.LabelEncoder()
le.fit(y_train)

y_train = le.transform(y_train)
y_test = le.transform(y_test)
y_val = le.transform(y_val)

y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
y_val = to_categorical(y_val)

In [None]:
# hyperparameter
trials = Trials()
best = fmin(f_nn, 
            space, 
            algo=tpe.suggest,
            max_evals=50,
            trials=trials)
print(best)

In [None]:
best_model = trials.results[np.argmin([r['loss'] for r in trials.results])]['model']
best_params = trials.results[np.argmin([r['loss'] for r in trials.results])]['params']
worst_model = trials.results[np.argmax([r['loss'] for r in trials.results])]['model']
worst_params = trials.results[np.argmax([r['loss'] for r in trials.results])]['params']

print(best_model)
print(best_params)

In [None]:
def create_model_best_param(params, output):
    model = Sequential()

    if params['layers'] == 1:
        model.add(Dense(params['units1'], activation=params['activation'], input_shape=(X_train.shape[1],)))
        #model.add(Dropout(rate=params['rate']))
    else:
        # First layer specifies input_shape and returns sequences
        model.add(Dense(params['units1'], activation=params['activation_1'], input_shape=(X_train.shape[1],)))
        model.add(keras.layers.BatchNormalization())
        #model.add(Dropout(rate=params['rate']))

        # Middle layers return sequences
        for i in range(params['layers']-2):
            model.add(Dense(params['units' + str(i + 2)], activation=params['activation']))
            model.add(keras.layers.BatchNormalization())
            #model.add(Dropout(rate=params['rate']))

        # Last layer doesn't return anything                                            
        model.add(Dense(params['units' + str(params['layers'])], activation=params['activation']))
        model.add(keras.layers.BatchNormalization())
        model.add(Dropout(rate=params['rate']))

    model.add(Dense(output, activation='softmax'))
    model.compile(optimizer=params['optimizer'], loss='mean_squared_error', metrics=['accuracy'])
    return model

In [None]:
# without augmentation
# best_params = {'activation': 'relu', 'batch_size': 125, 'dropout': 0.022269352793562694, 'epochs': 190, 'layers': 4, 'optimizer': 'adadelta', 'rate': 0.341066733379372, 'units1': 70, 'units2': 55, 'units3': 40, 'units4': 25}

# with augmentation
# best_params = {'activation': 'relu', 'batch_size': 225, 'dropout': 0.43494195542218583, 'epochs': 210, 'layers': 3, 'optimizer': 'adadelta', 'rate': 0.3152384932786021, 'units1': 100, 'units2': 100, 'units3': 45, 'units4': 80}

# flook model 0.6695
best_params = {'activation_1': 'elu','activation': 'relu', 'batch_size': 225, 'dropout': 0.01854633556762575, 'epochs': 300, 'layers': 4, 'optimizer': 'adagrad', 'rate': 0.012361452297880514, 'units1': 100, 'units2': 70, 'units3': 100, 'units4': 100}

model = create_model_best_param(best_params, 8)
print(model.summary())

In [None]:
history = model.fit(X_train, y_train, validation_data=(X_val, y_val,), batch_size=best_params['batch_size'], epochs=int(X_train.shape[0]/best_params['batch_size']), verbose=2)

In [None]:
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and Validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
epochs = range(1, len(acc) + 1)
plt.plot(epochs, acc, 'y', label='Training accuracy')
plt.plot(epochs, val_acc, 'r', label='Validation accuracy')
plt.title('Training and Validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
# test model
dictionary = {0: 'ANGER', 1: 'CONTEMPT', 2: 'DISGUST', 3: 'FEAR', 4: 'HAPPINESS',  5: 'NEUTRAL', 6: 'SADNESS', 7: 'SURPRISE'}
target_name = [dictionary[i] for i in range(len(dictionary))]

value = model.predict(X_test)
y_pred = np.argmax(value, axis=1)
y_true = np.argmax(y_test, axis=1)
print(classification_report(y_true,y_pred, target_names=target_name, digits=4))

print(y_test.shape, y_true.shape)

dictionary = ['ANGER', 'CONTEMPT', 'DISGUST', 'FEAR', 'HAPPINESS',  'NEUTRAL', 'SADNESS', 'SURPRISE']
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=dictionary)
fig, ax = plt.subplots(figsize=(10,10))
disp.plot(ax=ax)
plt.show()

In [None]:
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import RocCurveDisplay
from sklearn.metrics import auc

dictionary = ['ANGER', 'CONTEMPT', 'DISGUST', 'FEAR', 'HAPPINESS',  'NEUTRAL', 'SADNESS', 'SURPRISE']

for idx in range(len(dictionary)):
    print(dictionary[idx])

    emo_feature = np.copy(X_train)
    emo_target = list(label[idx] for label in y_train)
    emo_target = np.array(emo_target)

    X = emo_feature
    y = emo_target

    history = []
    tprs = []
    aucs = []
    mean_fpr = np.linspace(0, 1, 100)
    fig, ax = plt.subplots()

    # plot ROC curves
    cv = StratifiedKFold(n_splits=5, shuffle=True)
    for i, (train, val) in enumerate(cv.split(X, y)):
        X_train_kf, X_val_kf = X[train], X[val]
        y_train_kf, y_val_kf = y[train], y[val]

        le = preprocessing.LabelEncoder()
        y_train_kf = to_categorical(y_train_kf)
        y_val_kf = to_categorical(y_val_kf)

        model = create_model_best_param(best_params, 2)
        model.fit(X_train_kf,
                    y_train_kf,
                    validation_data=(X_val_kf, y_val_kf,),
                    batch_size=best_params['batch_size'],
                    epochs=best_params["epochs"],
                    verbose=2)

        # predict
        y_pred = model.predict(X_val_kf).ravel()
        y_val_kf = y_val_kf.ravel()

        print('====================Fold ', i , '====================')

        # plot ROC curve
        viz = RocCurveDisplay.from_predictions(y_val_kf, y_pred, ax=ax, name="ROC fold {}".format(i), alpha=0.3, lw=1,)
        interp_tpr = np.interp(mean_fpr, viz.fpr, viz.tpr)
        interp_tpr[0] = 0.0
        tprs.append(interp_tpr)
        aucs.append(viz.roc_auc)

    # middle line
    ax.plot([0, 1], [0, 1], 'k--')

    # mean line
    mean_tpr = np.mean(tprs, axis=0)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    std_auc = np.std(aucs)
    ax.plot(
        mean_fpr,
        mean_tpr,
        color="b",
        label=r"Mean ROC (AUC = %0.2f $\pm$ %0.2f)" % (mean_auc, std_auc),
        lw=2,
        alpha=0.8,
    )

    # std
    std_tpr = np.std(tprs, axis=0)
    tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
    tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
    ax.fill_between(
        mean_fpr,
        tprs_lower,
        tprs_upper,
        color="grey",
        alpha=0.2,
        label=r"$\pm$ 1 std. dev.",
    )

    ax.set(xlim=[-0.05, 1.05],
            ylim=[-0.05, 1.05],
            title="Receiver operating characteristic")
    ax.legend(loc="lower right")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.savefig(cwd + '/../graph/' + dictionary[idx] + '/ann_relu.jpg')
    plt.show()

In [None]:
name = 'model_relu_best'
path = cwd + "/model/" + name
if os.path.exists(path):
    shutil.rmtree(path)
os.makedirs(path)

model.save(path)
# model = keras.models.load_model(path)