In [None]:
## if it's your first run please install the following packages
#!pip install opencv-python
#!pip install matplotlib
#!pip install tensorflow
#!pip install pandas
#!pip install sklearn

## to use the gpu with tensorflow please install
#!pip install tensorflow-gpu

In [None]:
import os
import cv2 
import random
import numpy as np
import matplotlib.pyplot as plt 
import tensorflow as tf 
import pandas as pd
import sklearn
import datetime
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, TensorBoard, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras import optimizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import mixed_precision
from keras.utils import np_utils
from keras.preprocessing.image import ImageDataGenerator 
from keras.models import Model
from keras.layers import Input, Dense, Flatten, Dropout, BatchNormalization
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from sklearn.metrics import accuracy_score

# to run on the cpu unmark the next line
## os.environ['CUDA_VISIBLE_DEVICES'] = '-1'


## functions

In [None]:
def create_training_data(data_Directory, classes):
    datas_final=[]
    labels_final=[]
    datas_valid=[]
    labels_valid=[]
    for category in classes:
        datas=[]
        labels=[]
        path = os.path.join(data_Directory, category)
        class_num = classes.index(category)
        for img in os.listdir(path):
            try:
                image = cv2.imread(os.path.join(path, img))
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                new_array = cv2.resize(gray, (img_size, img_size))
                datas.append(new_array)
                labels.append(class_num)
            except Exception as e:
                pass
        x_train, x_valid, y_train, y_valid = train_test_split(datas, labels, test_size=0.33, random_state=42)
        datas_final.extend(x_train)
        labels_final.extend(y_train)
        datas_valid.extend(x_valid)
        labels_valid.extend(y_valid)    
    return (datas_final, labels_final, datas_valid, labels_valid)

def create_testing_data(data_Directory, classes):
    datas_final=[]
    labels_final=[]
    for category in classes:
        datas=[]
        labels=[]
        path = os.path.join(data_Directory, category)
        class_num = classes.index(category)
        for img in os.listdir(path):
            try:
                image = cv2.imread(os.path.join(path, img))
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                new_array = cv2.resize(gray, (img_size, img_size))
                datas.append(new_array)
                labels.append(class_num)
            except Exception as e:
                pass
        datas_final.extend(datas)
        labels_final.extend(labels)
    return (datas_final, labels_final)

def random_list(datas,labels):
    temp = list(zip(datas, labels))
    random.shuffle(temp)
    datas_temp, lables_temp = zip(*temp)
    return(datas_temp, lables_temp)

def create_model(num_classes,input_shape=(48,48,1)):
    # first input model
    visible = Input(shape=input_shape, name='input')

    #the 1-st block
    conv1_1 = Conv2D(64, kernel_size=3, activation='relu', padding='same', name = 'conv1_1')(visible)
    conv1_1 = BatchNormalization()(conv1_1)
    conv1_2 = Conv2D(64, kernel_size=3, activation='relu', padding='same', name = 'conv1_2')(conv1_1)
    conv1_2 = BatchNormalization()(conv1_2)
    pool1_1 = MaxPooling2D(pool_size=(2,2), name = 'pool1_1')(conv1_2)
    drop1_1 = Dropout(0.3, name = 'drop1_1')(pool1_1)

    #the 2-nd block
    conv2_1 = Conv2D(128, kernel_size=3, activation='relu', padding='same', name = 'conv2_1')(drop1_1)
    conv2_1 = BatchNormalization()(conv2_1)
    conv2_2 = Conv2D(128, kernel_size=3, activation='relu', padding='same', name = 'conv2_2')(conv2_1)
    conv2_2 = BatchNormalization()(conv2_2)
    conv2_3 = Conv2D(128, kernel_size=3, activation='relu', padding='same', name = 'conv2_3')(conv2_2)
    conv2_2 = BatchNormalization()(conv2_3)
    pool2_1 = MaxPooling2D(pool_size=(2,2), name = 'pool2_1')(conv2_3)
    drop2_1 = Dropout(0.3, name = 'drop2_1')(pool2_1)

    #the 3-rd block
    conv3_1 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv3_1')(drop2_1)
    conv3_1 = BatchNormalization()(conv3_1)
    conv3_2 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv3_2')(conv3_1)
    conv3_2 = BatchNormalization()(conv3_2)
    conv3_3 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv3_3')(conv3_2)
    conv3_3 = BatchNormalization()(conv3_3)
    conv3_4 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv3_4')(conv3_3)
    conv3_4 = BatchNormalization()(conv3_4)
    pool3_1 = MaxPooling2D(pool_size=(2,2), name = 'pool3_1')(conv3_4)
    drop3_1 = Dropout(0.3, name = 'drop3_1')(pool3_1)

    #the 4-th block
    conv4_1 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv4_1')(drop3_1)
    conv4_1 = BatchNormalization()(conv4_1)
    conv4_2 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv4_2')(conv4_1)
    conv4_2 = BatchNormalization()(conv4_2)
    conv4_3 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv4_3')(conv4_2)
    conv4_3 = BatchNormalization()(conv4_3)
    conv4_4 = Conv2D(256, kernel_size=3, activation='relu', padding='same', name = 'conv4_4')(conv4_3)
    conv4_4 = BatchNormalization()(conv4_4)
    pool4_1 = MaxPooling2D(pool_size=(2,2), name = 'pool4_1')(conv4_4)
    drop4_1 = Dropout(0.3, name = 'drop4_1')(pool4_1)

    #the 5-th block
    conv5_1 = Conv2D(512, kernel_size=3, activation='relu', padding='same', name = 'conv5_1')(drop4_1)
    conv5_1 = BatchNormalization()(conv5_1)
    conv5_2 = Conv2D(512, kernel_size=3, activation='relu', padding='same', name = 'conv5_2')(conv5_1)
    conv5_2 = BatchNormalization()(conv5_2)
    conv5_3 = Conv2D(512, kernel_size=3, activation='relu', padding='same', name = 'conv5_3')(conv5_2)
    conv5_3 = BatchNormalization()(conv5_3)
    conv5_4 = Conv2D(512, kernel_size=3, activation='relu', padding='same', name = 'conv5_4')(conv5_3)
    conv5_3 = BatchNormalization()(conv5_3)
    pool5_1 = MaxPooling2D(pool_size=(2,2), name = 'pool5_1')(conv5_4)
    drop5_1 = Dropout(0.3, name = 'drop5_1')(pool5_1)

    #Flatten and output
    flatten = Flatten(name = 'flatten')(drop5_1)
    ouput = Dense(num_classes, activation='softmax', name = 'output')(flatten)

    # create model 
    model = Model(inputs =visible, outputs = ouput)
    # summary layers
    print(model.summary())
    
    return model

def print_prediction_accuracy(model,data_test,labels_test):
    pred2=model.predict(data_test)
    PRED=[]
    for item in pred2:
        value2=np.argmax(item)      
        PRED+=[value2]
    print(classification_report(labels_tests, PRED,labels=labels_names, target_names=target_names))    
    cm = confusion_matrix(labels_tests, PRED,labels=labels_names)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=target_names)
    disp = disp.plot(cmap=plt.cm.Blues,values_format='g')
    plt.show()
    accuracy=accuracy_score(labels_test,PRED)
    print("accuracy of: ",accuracy)

def print_Training_vs_validation_accuracy_graph(history):
    get_acc = history.history['accuracy']
    value_acc = history.history['val_accuracy']
    epochs = range(len(get_acc))
    plt.plot(epochs, get_acc, 'r', label='Accuracy of Training data')
    plt.plot(epochs, value_acc, 'b', label='Accuracy of Validation data')
    plt.title('Training vs validation accuracy')
    plt.legend(loc=0)
    plt.figure()
    plt.show()   

def print_Training_vs_validation_loss_graph(history):
    get_loss = history.history['loss']
    validation_loss = history.history['val_loss']
    epochs = range(len(get_loss))
    plt.plot(epochs, get_loss, 'r', label='Loss of Training data')
    plt.plot(epochs, validation_loss, 'b', label='Loss of Validation data')
    plt.title('Training vs validation loss')
    plt.legend(loc=0)
    plt.figure()
    plt.show()
    

## main run
## config variables 

In [None]:
data_Directory_Positive = "archive/train/positive/"  # positive emotions training dataset
data_Directory_Negative = "archive/train/negative/"  # negative emotions training dataset
data_Directory_All = "archive_all/train/" # all emotions training dataset
data_test_Directory_Positive = "archive/test/positive/"  # positive emotions testing dataset
data_test_Directory_Negative = "archive/test/negative/"  # negative emotions testing dataset
data_test_Directory_All = "archive_all/test/" # all emotions testing dataset
classes_Positive = ["happy", "neutral", "surprise"]  # list of positive emotions
classes_Negative = ["angry", "fear", "sad"]  # list of negative emotions
classes_all = ["angry", "fear","happy", "neutral", "sad","surprise"] # list of all emotions
batch_size = 64
img_size = 48
num_epochs = 100
model_type = 1  ##  0 - create negative model, 1- create positive model, 2 - create all emotions model

## preparations of variables according to model type 

In [None]:
## create negative traning data
if model_type==0:
    print("creates negative dataset and training set")
    data_Directory=data_Directory_Negative
    data_test_Directory=data_test_Directory_Negative
    classes=classes_Negative
    num_classes = 3
    target_names = classes_Negative
    labels_names = [0,1,2]
    model_name= 'modelNegative.h5'
    best_model_name= 'best_negative_model.h5'
    log_name= 'training_negative.log'

if model_type==1:## create postive traning data
    print("creates positive dataset and training set")
    data_Directory=data_Directory_Positive
    data_test_Directory=data_test_Directory_Positive
    classes=classes_Positive
    num_classes = 3
    target_names = classes_Positive
    labels_names = [0,1,2] 
    model_name= 'modelPositive.h5'
    best_model_name= 'best_positive_model.h5'
    log_name= 'training_positive.log'    

if model_type==2:## create all emotions traning data
    print("creates all dataset and training set")
    data_Directory=data_Directory_All
    data_test_Directory=data_test_Directory_All
    classes=classes_all
    num_classes = 6
    target_names = classes_all
    labels_names = [0,1,2,3,4,5]
    model_name= 'modelAll.h5'
    best_model_name= 'best_All_model.h5'
    log_name= 'training_all.log'    

## create training data

In [None]:
datas,labels,datas_valid,labels_valid = create_training_data(data_Directory, classes)
random_list(datas,labels)
random_list(datas_valid,labels_valid)
X_train = np.array(datas, np.float32).reshape(-1,img_size, img_size, 1)
X_val = np.array(datas_valid, np.float32).reshape(-1,img_size, img_size, 1)

## create testing data

In [None]:
datas_tests,labels_tests =create_testing_data(data_test_Directory, classes)
random_list(datas_tests,labels_tests)
X_test = np.array(datas_tests, np.float32).reshape(-1,img_size, img_size, 1)

## convert labels to categorical

In [None]:
y_train = np_utils.to_categorical(labels, num_classes) 
y_val = np_utils.to_categorical(labels_valid, num_classes) 

## config image data genrator

In [None]:
data_gen = ImageDataGenerator( 
    rescale=1./255,
    rotation_range = 10,
    horizontal_flip = True,
    width_shift_range=0.1,
    height_shift_range=0.1,
    fill_mode = 'nearest')

val_gen = ImageDataGenerator( 
    rescale=1./255
    )
data_gen.fit(X_train)
train_flow = data_gen.flow(X_train, y_train, batch_size=batch_size) 
val_flow = val_gen.flow(X_val, y_val, batch_size=batch_size)

## create callbacks and optimizer 

In [None]:
chk_path = best_model_name
csv_logger = CSVLogger(log_name)
log_dir = "checkpoint/logs/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

checkpoint = ModelCheckpoint(filepath=chk_path,
                             save_best_only=True,
                             verbose=1,
                             mode='min',
                             moniter='val_loss')

callbacks_list = [checkpoint, csv_logger]
adam = optimizers.Adam(learning_rate=0.0001, decay=1e-6 )

## create model and run cnn

In [None]:
model = create_model(num_classes)
model.compile(loss= "categorical_crossentropy", optimizer = adam, metrics = ["accuracy"])
history = model.fit(train_flow, 
                steps_per_epoch=len(X_train) / batch_size, 
                epochs=num_epochs,  
                verbose=1,  
                callbacks=callbacks_list,
                validation_data=val_flow,  
                validation_steps=len(X_val) / batch_size)  
model.save(model_name) #saving the model to be used later

In [None]:
print("Finished createing model")

## display accuracy, confusion matrix and graphs  

In [None]:
print_prediction_accuracy(model,X_test/255.,labels_tests)

In [None]:
print_Training_vs_validation_accuracy_graph(history)

In [None]:
print_Training_vs_validation_loss_graph(history)