<a href="https://colab.research.google.com/github/Lakshita2002/FER_brain_and_cognitive_society/blob/master/Emotion_Recognition(BCS).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
%matplotlib inline
import cv2
import os
import math
from mlxtend.image import extract_face_landmarks
import gc
#Essential Keras Functions
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Input,Dense,Conv2D,MaxPooling2D,Dropout
from keras.layers import BatchNormalization, Activation, Flatten
from keras.models import Model,load_model
from keras.utils import to_categorical
from keras.optimizers import SGD
from keras import backend as K
#Essential sklearn Functions
from sklearn import model_selection
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.utils import shuffle

In [None]:
# Mount Google Drive at /content/drive (only works inside a Colab runtime).
# NOTE(review): the dataset/model paths used at the end of the notebook are
# relative ('drive/My Drive/...'); presumably the working directory is
# /content - confirm before running elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def eye_centers(landmarks):
    '''
    Locate both eye centers from a facial landmark array.
    Each center is the mean (x, y) of the six landmark points around one eye.
    Arguments :
    landmarks -- array of landmark coordinates, indexed as [point, axis]
    Returns :
    two (x, y) tuples -- center of rows 36-41 (right eye) and
    center of rows 42-47 (left eye)
    '''
    def centroid(first, last):
        # mean of the x column and of the y column over rows first..last-1
        ring = landmarks[first:last]
        return (np.mean(ring[:, 0]), np.mean(ring[:, 1]))

    right_eye = centroid(36, 42)
    left_eye = centroid(42, 48)

    return right_eye, left_eye

In [None]:
def find_angle(point1, point2):
    '''
    To find the inclination, in degrees, of the line through two points
    Arguments :
    point1, point2 -- (x, y) pairs
    Returns :
    angle in degrees in the range (-90, 90]; 0 when both points coincide
    '''
    dx = point2[0] - point1[0]
    dy = point2[1] - point1[1]

    # The inclination of a line does not depend on the order of its points.
    # Flipping the direction so that dx >= 0 keeps the result in the same
    # range atan(dy/dx) produced, while atan2 avoids the division (and the
    # ZeroDivisionError) when the two points share an x coordinate.
    if dx < 0:
        dx, dy = -dx, -dy

    # angle in radians
    angle_r = math.atan2(dy, dx)
    # angle in degrees
    angle_d = math.degrees(angle_r)

    return angle_d

In [None]:
def rotate_image(image,angle):
    '''
    Returns the image rotated about its center, keeping the original size
    Arguments :
    image -- image array of shape (h,w) or (h,w,channels)
    angle -- rotation angle in degrees, passed to cv2.getRotationMatrix2D
    Returns :
    rotated image with the same height and width as the input
    '''
    # only the first two axes are the spatial size; this also accepts
    # multi-channel images
    rows,cols = image.shape[:2]
    # OpenCV expects the center as (x, y) = (column, row), not (row, column)
    center = ((cols - 1)/2.0,(rows - 1)/2.0)
    #Transformation Matrix(M)
    M = cv2.getRotationMatrix2D(center,angle,1)
    # dsize is given as (width, height)
    rot_img = cv2.warpAffine(image, M, (cols,rows))

    return rot_img

In [None]:
def _align_and_crop_face(img):
    '''
    Rotate one face image so that the eyes are level and crop the face region.
    Returns the cropped region of the rotated image, or None when no
    landmarks are found or the crop window would be empty.
    '''
    #landmark detection
    #(returns an array of landmarks of shape (68,2), or None without a face)
    landmarks = extract_face_landmarks(img)
    if landmarks is None:
        return None

    #detect eye centers and the tilt of the line joining them
    p1, p2 = eye_centers(landmarks)
    angle = find_angle(p1, p2)

    #rotate image
    rot_img = rotate_image(img, angle)

    #landmarks must be re-detected: the original ones no longer match the
    #pixel positions of the rotated image
    rot_landmarks = extract_face_landmarks(rot_img)
    if rot_landmarks is None:
        return None

    #find length 'd' (distance between the eye centers after rotation)
    p1_new, p2_new = eye_centers(rot_landmarks)
    d = cv2.norm(np.array(p1_new) - np.array(p2_new))

    #top of the crop : 0.6*d above the mid point of the new eye centers
    y_mid = (p2_new[1] + p1_new[1])/2.0
    y_up = y_mid - (0.6*d)

    #crop window, all taken from the rotated image's landmarks
    #(0 and 16 bound the window horizontally, 8 gives its bottom edge;
    # presumably the jaw-line extremes and the chin of the 68-point layout)
    #negative starts are clamped : a negative slice index would wrap around
    x_start = max(int(rot_landmarks[0,0]), 0)
    x_end = int(rot_landmarks[16,0])
    y_start = max(int(y_up), 0)
    y_end = int(rot_landmarks[8,1])

    if x_end <= x_start or y_end <= y_start:
        return None

    return rot_img[y_start:y_end,x_start:x_end]


def preprocessing(image_data):

    '''
    The preprocessing involves rotation of the image and image cropping
    Arguments :
    image_data -- array (or list) of images of shape (m,h,w)
    Returns :
    array of images after preprocessing, of shape (m,48,48)

    The output always has one entry per input image, in the same order, so
    that it stays aligned with a label array built alongside image_data.
    When no face can be located in an image, the whole image is resized
    instead of being dropped.
    '''
    preprocessed_faces = []
    for img in image_data:

        crop_img = _align_and_crop_face(img)
        if crop_img is None:
            #no usable face region : fall back to the full frame
            crop_img = img

        #resize the cropped image
        face_roi = cv2.resize(crop_img,(48,48))

        preprocessed_faces.append(face_roi)

    return np.array(preprocessed_faces)

In [None]:
def normalization(imagedata, mean, std_dev):
    '''
    Histogram-equalize every preprocessed image, then standardize it
    (z-score : subtract mean, divide by std_dev) and resize it to 48x48.
    Arguments :
    imagedata -- array of preprocessed images of shape (m,h,w)
    mean -- mean value subtracted from every equalized image
    std_dev -- standard deviation every equalized image is divided by
    Returns :
    array of normalized images of shape (m,48,48)
    '''
    def standardize(face):
        #Histogram Equalization
        equalized = cv2.equalizeHist(face)
        #z-score normalization with the supplied statistics
        scaled = (equalized - mean)/std_dev
        #Resize
        return cv2.resize(scaled, (48,48))

    return np.array([standardize(face) for face in imagedata])

In [None]:
def load_and_preprocess_data(dataset,num_classes,jaffe_dir_path,ck_dir_path):
    '''
    Loads the dataset given and also preprocesses it
    Arguments :
    dataset -- 'jaffe', 'ck+' or 'combined'
    num_classes -- number of emotion classes; with 6, the 'NE' (JAFFE) and
                   'contempt' (CK+) samples are left out
    jaffe_dir_path -- directory holding the JAFFE image files
                      (the label is read from characters 3-4 of each file name)
    ck_dir_path -- directory holding one sub-directory per CK+ emotion
    Returns :
    X_tmp -- normalized images of shape (m,48,48,1)
    Y_tmp -- one hot encoded labels of shape (m,num_classes)
    Raises :
    ValueError -- unknown dataset name, or no readable image was found
    '''
    if dataset not in ('jaffe', 'ck+', 'combined'):
        raise ValueError(f"Unknown dataset '{dataset}' : expected 'jaffe', 'ck+' or 'combined'")

    jaffe_data_list = []
    jaffe_labels_list = []
    ck_data_list = []
    ck_labels_list = []

    if dataset == 'jaffe' or dataset == 'combined':

        express_code = ['HA','AN','DI','FE','SA','SU','NE']
        for img in os.listdir(jaffe_dir_path):

            label = img[3:5]
            #skip stray files whose name carries no known expression code
            if label not in express_code:
                continue
            if num_classes == 6 and label == 'NE':
                continue
            read_img = cv2.imread(os.path.join(jaffe_dir_path, img),
                                  cv2.IMREAD_GRAYSCALE)
            #cv2.imread gives None for files it cannot decode
            if read_img is None:
                continue
            jaffe_data_list.append(read_img)
            jaffe_labels_list.append(express_code.index(label))

        if not jaffe_data_list:
            raise ValueError(f'No readable JAFFE images found in {jaffe_dir_path}')
        jaffe_labels = np.array(jaffe_labels_list)
        jaffe_preprocessed_data = preprocessing(jaffe_data_list)
    if dataset == 'ck+' or dataset == 'combined':

        express_code = ['happy','anger','disgust','fear','sadness',
                        'surprise','contempt']
        for emcode in os.listdir(ck_dir_path):

            #skip entries that are not one of the emotion folders
            if emcode not in express_code:
                continue
            if num_classes == 6 and emcode == 'contempt':
                continue
            emotion_dir = os.path.join(ck_dir_path, emcode)
            lst = sorted(os.listdir(emotion_dir))
            #keep every third file of the sorted listing, starting at index 2
            for i in range(2,len(lst),3):
                read_img = cv2.imread(os.path.join(emotion_dir, lst[i]),
                                      cv2.IMREAD_GRAYSCALE)
                if read_img is None:
                    continue
                ck_data_list.append(read_img)
                ck_labels_list.append(express_code.index(emcode))

        if not ck_data_list:
            raise ValueError(f'No readable CK+ images found in {ck_dir_path}')
        ck_labels = np.array(ck_labels_list)
        ck_preprocessed_data = preprocessing(ck_data_list)

    print('Data loading and Preprocessing is completed')
    if dataset == 'combined':
        X = np.concatenate((jaffe_preprocessed_data,ck_preprocessed_data),axis = 0)
        Y = np.concatenate((jaffe_labels,ck_labels),axis = 0)
        preprocessed_data, labels = shuffle(X,Y)
    elif dataset == 'ck+':
        preprocessed_data, labels = shuffle(ck_preprocessed_data, ck_labels)
    else:
        preprocessed_data, labels = shuffle(jaffe_preprocessed_data, jaffe_labels)

    #NOTE(review): these statistics are taken before histogram equalization,
    #while normalization() applies them to the equalized images - confirm
    #that this is intended
    mean, std_dev = preprocessed_data.mean(),preprocessed_data.std()
    normalized_data = normalization(preprocessed_data, mean, std_dev)
    #add the trailing channel axis expected by the network
    X_tmp = normalized_data.reshape(normalized_data.shape + (1,))
    #fix the width explicitly so it does not depend on the largest label present
    Y_tmp = to_categorical(labels, num_classes = num_classes)
    print(X_tmp.shape)
    print(Y_tmp.shape)
    print('Normalization and Datareshaping is completed')

    return X_tmp,Y_tmp

In [None]:
def data_aug(X_train, y_train, train_batch_size, X_val = None, y_val = None, val_batch_size = None, cv = 'k_fold'):
    '''
    Build the batch iterators used for training (with augmentation) and,
    for k fold cross validation, for validation (without augmentation).
    Arguments :
    X_train and X_val -- array of training and validation data containing images
    y_train and y_val -- labels of training and validation images
    train_batch_size and val_batch_size contains the batch size of images
    cv -- 'k_fold' or 'one_fold'
    Returns:
    (train iterator, validation iterator) when cv is 'k_fold',
    the train iterator alone when cv is 'one_fold'
    '''
    #training images get small rotations and horizontal flips
    augmenter = ImageDataGenerator(rotation_range = 3,
                                   rescale = 1.0,
                                   horizontal_flip = True,
                                   fill_mode = 'nearest')
    train_batch = augmenter.flow(X_train, y_train, batch_size = train_batch_size)

    if cv == 'one_fold':
        return train_batch

    if cv == 'k_fold':
        #validation images are only batched, never augmented
        val_batch = ImageDataGenerator(rescale=1.0).flow(X_val, y_val ,batch_size = val_batch_size)
        return (train_batch, val_batch)

In [None]:
def recog_model(input_shape, num_classes, beta):
    '''
    Create the keras model architecture
    Two Conv -> BatchNorm -> ReLU -> MaxPool stages followed by dropout and
    a softmax output layer.
    Arguments:
    input_shape -- The dimensions of the input data(h,w,1)
    num_classes -- number of units of the softmax output layer
    beta -- momentum of the SGD optimizer
    Returns:
    The created model after compilation
    '''

    # input placeholder as a tensor of input_shape
    X_input = Input(input_shape)

    # define the keras model
    X = Conv2D(48, (5,5), strides = (1,1), padding = 'valid', name = 'conv1')(X_input)
    X = BatchNormalization(axis = 3, name = 'bn1')(X)
    X = Activation('relu')(X)

    X = MaxPooling2D((2,2), strides = (2,2), padding = 'valid', name = 'maxpool1')(X)

    # no activation inside conv2 : as in the first stage, ReLU is applied once,
    # after batch normalization
    X = Conv2D(64, (5,5), strides = (1,1), padding = 'valid', name = 'conv2')(X)
    X = BatchNormalization(axis = 3, name = 'bn2')(X)
    X = Activation('relu')(X)

    X = MaxPooling2D((2,2), strides = (2,2), padding = 'valid', name = 'maxpool2')(X)

    X = Flatten()(X)
    X = Dropout(0.5)(X)
    X = Dense(units = num_classes,activation = 'softmax',name = 'out',kernel_initializer = 'glorot_normal')(X)

    model = Model(inputs = X_input, outputs = X, name = 'recog_model')
    sgd = SGD(learning_rate = 0.001, momentum = beta)

    #compile the keras model
    model.compile(optimizer = sgd, loss = 'categorical_crossentropy', metrics = ['accuracy'])

    return model

In [None]:
def k_fold_cv(X_tmp, Y_tmp, dataset, num_classes, model_dir, graph_dir,
              n_splits, batch_size, iterations, beta):
    '''
    Applies K fold cross validation for given Training data
    Saves the trained model of each fold as an HDF5 (.h5) file in model_dir
    and the loss/accuracy curves of each fold as .png files in graph_dir
    Arguments :
    X_tmp -- normalized and reshaped data
    Y_tmp -- labels in one hot encodings
    dataset -- 'ck+', 'jaffe' or 'combined' (selects the steps multiplier)
    num_classes -- number of output classes of the model
    n_splits -- number of folds
    batch_size -- batch size for training and validation
    iterations -- number of epochs
    beta -- momentum of the optimizer
    Returns the Mean and standard deviation of accuracies (in percent)
    Raises ValueError for an unknown dataset name
    '''
    #multiplier for the number of augmented batches drawn per epoch
    if dataset == 'ck+' or dataset == 'combined':
        constant = 3
    elif dataset == 'jaffe':
        constant = 5
    else:
        raise ValueError(f"Unknown dataset '{dataset}' : expected 'jaffe', 'ck+' or 'combined'")

    model_accuracies = []
    splitter = KFold(n_splits = n_splits, shuffle = True, random_state = 0)
    for i, (train_index, val_index) in enumerate(splitter.split(X_tmp, Y_tmp), start = 1):

        X_train, X_val = X_tmp[train_index], X_tmp[val_index]
        X_train = X_train.astype('float32')
        X_val = X_val.astype('float32')
        Y_train, Y_val = Y_tmp[train_index], Y_tmp[val_index]

        print('Model evaluation : ' + str(i))

        #get the data generators from augmentation function
        train_batch, val_batch = data_aug(X_train, Y_train, batch_size, X_val, Y_val, batch_size)
        #create a model object
        Recog_Model = recog_model(X_tmp.shape[1:], num_classes, beta)

        #a fold smaller than one batch would otherwise give 0 steps
        train_steps = max(1, constant * (X_train.shape[0]//batch_size))
        val_steps = max(1, constant * (X_val.shape[0]//batch_size))

        #train the model
        history = Recog_Model.fit(train_batch, validation_data = val_batch, epochs = iterations,
                                  steps_per_epoch = train_steps,
                                  validation_steps = val_steps, verbose = 1)

        #plotting validation loss and training loss vs epochs
        loss_train = history.history['loss']
        loss_val = history.history['val_loss']
        epochs = range(iterations)
        plt.plot(epochs, loss_train, 'g', label = 'training_loss')
        plt.plot(epochs, loss_val, 'b', label = 'val_loss')
        plt.title('Loss Vs Epochs')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.savefig(os.path.join(graph_dir, f'{dataset}{num_classes}classes({i} of {n_splits}fold)losses.png'))
        plt.clf()

        #plotting validation accuracy and training accuracy vs epochs
        accuracy_train = history.history['accuracy']
        accuracy_val = history.history['val_accuracy']
        plt.plot(epochs, accuracy_train, 'g', label = 'training_accuracy')
        plt.plot(epochs, accuracy_val, 'b', label = 'val_accuracy')
        plt.title('Accuracy Vs Epochs')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.savefig(os.path.join(graph_dir, f'{dataset}{num_classes}classes({i} of {n_splits}fold)accuracies.png'))
        plt.clf()

        #accuracy of this fold on its (un-augmented) validation split
        metrics = Recog_Model.evaluate(X_val, Y_val)
        print(f'Accuracy fold {i}: ' + str(metrics[1] * 100))
        model_accuracies.append(metrics[1] * 100)
        Recog_Model.save(os.path.join(model_dir, f'{dataset}{num_classes}classes({i} of {n_splits}fold).h5'))
        #Discard the present model so that the next fold starts from scratch
        del Recog_Model

        gc.collect()
        K.clear_session()

    #calculating mean and standard deviation of accuracies
    Mean_Accuracy = np.mean(model_accuracies)
    standard_deviation = np.std(model_accuracies)

    return Mean_Accuracy,standard_deviation

In [None]:
def one_fold_cv(X_tmp, Y_tmp, dataset, num_classes, model_dir, graph_dir,
                batch_size, iterations, beta):
    '''
    Trains one model on all the given data and saves it as an HDF5 (.h5)
    file in model_dir; the training curves are saved as .png in graph_dir
    Arguments :
    X_tmp -- normalized and reshaped data
    Y_tmp -- labels in one hot encodings
    dataset -- 'ck+', 'jaffe' or 'combined' (selects the steps multiplier)
    num_classes -- number of output classes of the model
    batch_size -- training batch size
    iterations -- number of epochs
    beta -- momentum of the optimizer
    Returns :
    accuracy in percent, evaluated on X_tmp, Y_tmp - the same data the
    model was trained on, so this is a training accuracy
    Raises ValueError for an unknown dataset name
    '''
    #multiplier for the number of augmented batches drawn per epoch
    if dataset == 'ck+' or dataset == 'combined':
        constant = 3
    elif dataset == 'jaffe':
        constant = 5
    else:
        raise ValueError(f"Unknown dataset '{dataset}' : expected 'jaffe', 'ck+' or 'combined'")

    #get the data generator from augmentation function
    train_batch = data_aug(X_tmp, Y_tmp, batch_size, cv = 'one_fold')
    #create a model object
    Recog_Model = recog_model(X_tmp.shape[1:], num_classes, beta)

    #fewer samples than one batch would otherwise give 0 steps
    train_steps = max(1, constant * (X_tmp.shape[0]//batch_size))

    #train the model
    history = Recog_Model.fit(train_batch, epochs = iterations, shuffle = True,
                              steps_per_epoch = train_steps)
    #plotting training loss vs epochs
    loss_train = history.history['loss']
    epochs = range(iterations)
    plt.plot(epochs, loss_train, 'g', label = 'training_loss')
    plt.title('Loss Vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig(os.path.join(graph_dir, f'{dataset}{num_classes}classes(1fold)losses.png'))
    plt.clf()

    #plotting training accuracy vs epochs
    accuracy_train = history.history['accuracy']
    plt.plot(epochs, accuracy_train, 'g', label = 'training_accuracy')
    plt.title('Accuracy Vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.savefig(os.path.join(graph_dir, f'{dataset}{num_classes}classes(1fold)accuracies.png'))
    plt.clf()

    metrics = Recog_Model.evaluate(X_tmp, Y_tmp)
    print('Accuracy one_folded : ' + str(metrics[1] * 100))
    Accuracy = metrics[1] * 100
    Recog_Model.save(os.path.join(model_dir, f'{dataset}{num_classes}classes(1fold).h5'))

    return Accuracy

In [None]:
def result_model(dataset, num_classes, model_dir, graph_dir, jaffe_dir_path = None,
                 ck_dir_path = None,cv = 'k_fold', batch_size = 16, iterations = 120,
                 beta = 0.9, n_splits = 10):
    '''
    Loads the requested dataset, trains on it and reports the accuracy
    Arguments :
    dataset -- A string representing dataset('ck+' or 'jaffe' or 'combined')
    num_classes -- No. of emotions the model to be trained upon
                   (always 6 for the 'combined' dataset)
    model_dir -- path to the directory/folder to save the trained model file,
                 with a trailing separator (eg : 'models/')
    graph_dir -- path to the directory/folder to save various graphs
    jaffe_dir_path -- path to the saved JAFFE dataset directory
                      (needed for 'jaffe' and 'combined')
    ck_dir_path -- path to the saved CK+ dataset directory
                   (needed for 'ck+' and 'combined')
    cv -- string representing k-fold or 1-fold cross validation('k_fold' or 'one_fold')
    batch_size -- batch size used for training
    iterations -- number of training epochs
    beta -- momentum variable
    n_splits -- representing k in k_fold
    Returns :
    (Mean_Accuracy, standard_deviation) for 'k_fold',
    (Accuracy, None) for 'one_fold',
    (None, None) when the arguments are not usable (a message is printed)
    '''
    if dataset == 'combined':
        num_classes = 6

    needs_jaffe = dataset in ('jaffe', 'combined')
    needs_ck = dataset in ('ck+', 'combined')

    #validate everything before the (slow) data loading starts
    if not (needs_jaffe or needs_ck):
        print(f"Unknown dataset '{dataset}' : expected 'jaffe', 'ck+' or 'combined'")
        return None,None
    #every dataset that will actually be read needs its own path
    if (needs_jaffe and jaffe_dir_path is None) or (needs_ck and ck_dir_path is None):
        print('Dataset path is needed!!')
        return None,None
    if cv not in ('one_fold', 'k_fold'):
        print(f"Unknown cv '{cv}' : expected 'k_fold' or 'one_fold'")
        return None,None

    X_tmp, Y_tmp = load_and_preprocess_data(dataset, num_classes, jaffe_dir_path, ck_dir_path)
    if cv == 'one_fold':
        Accuracy = one_fold_cv(X_tmp, Y_tmp, dataset, num_classes, model_dir, graph_dir,
                    batch_size, iterations, beta)
        return Accuracy,None

    Mean_Accuracy, standard_deviation = k_fold_cv(X_tmp, Y_tmp, dataset, num_classes, model_dir,
                                                  graph_dir,n_splits,batch_size,iterations,beta)
    print('Mean Accuracy : %0.2f' %(Mean_Accuracy))
    print('standard deviation : %0.2f' %(standard_deviation))
    return Mean_Accuracy, standard_deviation

In [None]:
# Train a single model on the combined JAFFE + CK+ data (6 classes).
# With cv = 'one_fold', result_model returns (accuracy, None): Mean_Accuracy
# then holds that single accuracy and standard_deviation is None.
Mean_Accuracy, standard_deviation = result_model('combined',num_classes = 6, model_dir = 'drive/My Drive/ER project/',
                                                 graph_dir = 'drive/My Drive/ER project/',cv = 'one_fold',
                                                 jaffe_dir_path = 'drive/My Drive/Dataset Images/Jaffe Images/',
                                                 ck_dir_path = 'drive/My Drive/Dataset Images/CK+48/')