In [1]:
# Import necessary libraries
#import tensorflow as tf
import os.path
import numpy as np
from keras.utils import to_categorical
from keras import Sequential
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, Dense, Flatten, Dropout
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing import image
from keras.optimizers import Adam
from keras.models import model_from_yaml
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
%matplotlib inline

Using TensorFlow backend.


In [2]:
def get_data(data_file_path):
    '''
        Purpose: Function that loads the raw dataset rows from the given path
        Arguments: Path to the source file; its first line is treated as the header
        Return: 1-D numpy array of raw text lines, one per image (header excluded)
    '''
    with open(data_file_path) as fp:
        lines = fp.readlines()

    # Drop the line terminator so the last column is not reported as e.g. 'Usage\n'.
    # An empty file has no header at all; report no labels instead of raising IndexError.
    header = lines[0].rstrip("\r\n").split(",") if lines else []
    print("Labels for the dataset are: {}".format(header))

    # Everything after the header is one raw "emotion,pixels,usage" record per line.
    data = np.array(lines[1:])
    print("Number of images read: {}".format(data.size))

    return data

In [3]:
def generate_data_split(data, num_of_classes=7):
    '''
        Purpose: Function that splits the raw records into Train and Test sets
        Arguments: Raw "emotion,pixels,usage" records and the number of classes
                   used for the one-hot (categorical) labels
        Return: Tuple (X_train, Y_train, X_test, Y_test) of float32 numpy arrays
    '''
    train_inputs, train_labels = [], []
    test_inputs, test_labels = [], []

    for record in data:
        try:
            label, pixel_text, usage = record.split(",")
            pixels = np.array(pixel_text.split(" "), 'float32')
            # One hot encoding
            one_hot = to_categorical(label, num_of_classes)
            # Only rows tagged 'Training' feed the train set; every other usage is test data
            is_training = usage.strip() == 'Training'
            inputs, labels = (train_inputs, train_labels) if is_training else (test_inputs, test_labels)
            inputs.append(pixels)
            labels.append(one_hot)
        except Exception as err:
            # A record that cannot be parsed is reported and skipped
            print("ERROR - {}".format(err))

    buckets = (train_inputs, train_labels, test_inputs, test_labels)
    return tuple(np.array(bucket, 'float32') for bucket in buckets)

In [4]:
def preprocessing(X_train, X_test):
    '''
        Purpose: Function that scales and reshapes the train and test inputs
        Arguments: Train and Test input sets (one flattened 48x48 image per row)
        Return: Tuple (X_train, X_test) as float32 arrays of shape (n, 48, 48, 1)
    '''
    # Normalize pixel values (divide by 255)
    scaled_train, scaled_test = [batch / 255 for batch in (X_train, X_test)]

    # One 48x48 single-channel image per sample
    image_shape = (48, 48, 1)
    processed_train = scaled_train.reshape((scaled_train.shape[0],) + image_shape).astype('float32')
    processed_test = scaled_test.reshape((scaled_test.shape[0],) + image_shape).astype('float32')

    return (processed_train, processed_test)

In [None]:
def generate_model(X_train, Y_train, force=False):
    '''
        Purpose: Function that returns the CNN, restoring it from disk when a saved copy
                 exists and building + training a new one otherwise
        Arguments: Training inputs and labels (used only when a new model is trained) and
                   a force flag that requests retraining even if a saved model exists
        Return: Model trained by fit_model, or the model restored by load_model
    '''
    # load_model() reads both the YAML architecture and the HDF5 weights, so the saved
    # copy is only usable when both files are present.
    saved_model_available = os.path.exists('model_cnn.yaml') and os.path.exists('model_cnn.h5')

    if force or not saved_model_available:
        model = Sequential()

        # Only the first layer declares the input shape: 48x48 single-channel images.
        model.add(Conv2D(64, (5, 5), activation='relu', input_shape=(48, 48, 1)))
        model.add(MaxPooling2D(pool_size=(5, 5), strides=(2, 2)))

        model.add(Conv2D(64, (3, 3), activation='relu'))
        model.add(Conv2D(64, (3, 3), activation='relu'))
        model.add(AveragePooling2D(pool_size=(3, 3), strides=(2, 2)))

        model.add(Conv2D(128, (3, 3), activation='relu'))
        model.add(Conv2D(128, (3, 3), activation='relu'))
        model.add(AveragePooling2D(pool_size=(3, 3), strides=(2, 2)))

        model.add(Flatten())

        # Fully connected head with dropout between the dense layers
        model.add(Dense(1024, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(1024, activation='relu'))
        model.add(Dropout(0.2))

        # One softmax output per emotion class
        model.add(Dense(7, activation='softmax'))

        # Fit the model with the training data
        model = fit_model(model, X_train, Y_train)
    else:
        model = load_model()
    return model
    

In [None]:
def fit_model(model, X_train, Y_train, batch_size=128, epochs=10, loss_function='categorical_crossentropy'):
    '''
        Purpose: Function that compiles the model and trains it on the training set
        Arguments: Model to train, training inputs and labels, mini-batch size,
                   number of epochs and the loss function passed to compile()
        Return: The same model instance after training
    '''
    # Every augmentation option is left switched off / at zero, so no augmentation
    # is configured; the generator is used to feed the data in mini-batches.
    data_generator = ImageDataGenerator(
        featurewise_center=False, samplewise_center=False,
        featurewise_std_normalization=False, samplewise_std_normalization=False,
        zca_whitening=False, zca_epsilon=1e-06,
        rotation_range=0, width_shift_range=0.0, height_shift_range=0.0,
        brightness_range=None, shear_range=0.0, zoom_range=0.0,
        channel_shift_range=0.0, fill_mode='nearest', cval=0.0,
        horizontal_flip=False, vertical_flip=False,
        rescale=None, preprocessing_function=None, data_format=None,
        validation_split=0.0, dtype=None)
    training_data_generator = data_generator.flow(X_train, Y_train, batch_size=batch_size)

    model.compile(loss=loss_function, optimizer=Adam(), metrics=['accuracy'])

    # One epoch must cover the whole training set once: that takes
    # ceil(samples / batch_size) batches, not `batch_size` batches.
    steps_per_epoch = (len(X_train) + batch_size - 1) // batch_size
    model.fit_generator(training_data_generator, steps_per_epoch=steps_per_epoch, epochs=epochs)
    return model

In [None]:
def evaluate_model(model, X_train, Y_train, X_test, Y_test):
    '''
        Purpose: Function that prints the loss and accuracy of the model on both sets
        Arguments: Model to evaluate, Train inputs and labels, Test inputs and labels
        Return: None (the scores are printed)
    '''
    # Each score is indexed as [loss, accuracy]; this assumes the model was compiled
    # with a single 'accuracy' metric, as fit_model does - confirm for loaded models.
    train_score = model.evaluate(X_train, Y_train, verbose=0)
    test_score = model.evaluate(X_test, Y_test, verbose=0)

    print("Train loss: {}".format(train_score[0]))
    print("Test loss: {}".format(test_score[0]))

    print("Train accuracy: {}".format(train_score[1]))
    print("Test accuracy: {}".format(test_score[1]))
    

In [None]:
def show_image(image_path):
    '''
        Purpose: Function that reads an image from disk and displays it
        Arguments: Path to the image file
        Return: None
    '''
    try:
        # Read first so that no empty figure is left behind when the file is unreadable
        image = mpimg.imread(image_path)
        plt.figure()
        plt.imshow(image)
        plt.show()
    except Exception as e:
        # Report the problem and hand control back to the caller; calling exit() here
        # would end the whole session because of one bad path.
        print("ERROR - Unable to read and plot the provided image: {}".format(e))

In [None]:
def emotion_display(predicted_label):
    '''
        Purpose: Function that draws a bar chart with one bar per emotion class
        Arguments: Scores for the 7 emotion classes, either as a flat sequence or as a
                   single-row batch (e.g. the output of model.predict for one image)
        Return: None
        Raises: ValueError when the number of scores does not match the number of classes
    '''
    emotion_classes = ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral']

    # Flatten so that a (1, 7) batch and a plain 7-element vector are both accepted.
    scores = np.asarray(predicted_label).ravel()
    if scores.size != len(emotion_classes):
        raise ValueError("Expected {} emotion scores, got {}".format(len(emotion_classes), scores.size))

    y_ticks = np.arange(len(emotion_classes))
    plt.bar(y_ticks, scores, align='center', alpha=0.5)
    plt.xticks(y_ticks, emotion_classes)
    plt.ylabel('percentage')
    plt.title('emotion')
    plt.show()

In [None]:
def prediction(model, image_path):
    '''
        Purpose: Function that predicts the emotion scores for a single image file
        Arguments: Model used for the prediction and path to the image
        Return: None (the prediction is printed and the image is passed to show_image)
    '''
    # Load the picture as a 48x48 grayscale image and turn it into a batch of one
    grayscale_image = image.load_img(image_path, color_mode='grayscale', target_size=(48, 48))
    batch = np.expand_dims(image.img_to_array(grayscale_image), axis=0)
    # Scale pixel values by 255, the same factor used for the training data
    batch /= 255

    model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])
    scores = model.predict(batch)
    print("Prediction: {}".format(scores))

    # Draw image provided
    show_image(image_path)
    # NOTE: the emotion chart is not drawn here; only the image is shown

In [None]:
def load_model():
    '''
        Purpose: Function that restores the saved model from disk
        Arguments: None (reads model_cnn.yaml and model_cnn.h5 from the working directory)
        Return: Model rebuilt from the YAML description with the saved weights loaded
    '''
    # NOTE(review): model_from_yaml parses the file contents; only load files you trust.
    with open("model_cnn.yaml", "r") as architecture_file:
        restored_model = model_from_yaml(architecture_file.read())
    # The YAML file holds only the architecture; the weights live in the HDF5 file
    restored_model.load_weights("model_cnn.h5")
    print("Loaded model from disk")
    return restored_model

In [None]:
def save_model(model):
    '''
        Purpose: Function that stores the model on disk
        Arguments: Model to be saved
        Return: None (writes model_cnn.yaml and model_cnn.h5 in the working directory)
    '''
    # Serialize the architecture before touching the file
    architecture_yaml = model.to_yaml()
    with open("model_cnn.yaml", "w") as architecture_file:
        architecture_file.write(architecture_yaml)

    # serialize weights to HDF5
    model.save_weights("model_cnn.h5")

    print("Saved model to disk.")

In [None]:
def generate_confusion_matrix(predictions, Y_test):
    '''
        Purpose: Function that plots the confusion matrix of actual vs. predicted emotions
        Arguments: Per-class scores predicted for every test sample and the matching
                   one-hot encoded true labels
        Return: None (the matrix is drawn on a new matplotlib figure)
    '''
    classes = ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral']

    # Collapse scores / one-hot vectors to class indices
    predicted_list = [np.argmax(scores) for scores in predictions]
    actual_list = [np.argmax(one_hot) for one_hot in Y_test]

    # Pin the label set so the matrix always has one row/column per class and stays
    # aligned with the tick labels, even when a class never occurs in the data.
    conf_matrix = confusion_matrix(actual_list, predicted_list, labels=list(range(len(classes))))

    plt.figure()
    plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion matrix')
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Cells above half of the maximum count get white text, the others black text
    thresh = conf_matrix.max() / 2.
    for i, j in np.ndindex(*conf_matrix.shape):
        plt.text(j, i, format(conf_matrix[i, j], 'd'),
                 horizontalalignment="center",
                 color="white" if conf_matrix[i, j] > thresh else "black")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [None]:
def main(data_path='/Users/Iris/SJSU/Fall_2018/CMPE_257/Project/Group/local/data/fer2013.csv'):
    '''
        Purpose: Function that runs the pipeline: load data, split, pre-process,
                 obtain the model and plot the confusion matrix on the test set
        Arguments: Path to the dataset CSV (defaults to the original local path)
        Return: None
    '''
    # Obtain the data from the path provided
    data = get_data(data_path)

    # Generate training and test sets from the data
    X_train, Y_train, X_test, Y_test = generate_data_split(data)

    # Pre-process the image data
    X_train, X_test = preprocessing(X_train, X_test)

    # Generate or load trained model. The real training data is always passed so that
    # a missing saved model leads to training instead of fitting on empty lists.
    model = generate_model(X_train, Y_train)

    # Predict on the test set and compare against the true labels
    predictions = model.predict(X_test)
    generate_confusion_matrix(predictions, Y_test)

    # Optional steps, enable as needed:
    # Evaluate model
    #evaluate_model(model, X_train, Y_train, X_test, Y_test)

    # Save model to disk
    #save_model(model)

    # Predict for a given sample
    #sample_image_path = 'sample.png'
    #prediction(model, sample_image_path)

In [None]:
# Entry point: run the whole pipeline when this code is executed directly
# (the guard keeps main() from running if the code is imported as a module).
if __name__ == "__main__":
    main()

Loaded model from disk
