In [1]:
import warnings
warnings.filterwarnings('ignore')
import glob
import itertools
import random
import time
from os import makedirs
from os.path import expanduser, exists, join

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix, accuracy_score

from keras import backend as K
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img

from keras.applications import inception_v3
from keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_v3 import preprocess_input
from keras.applications.inception_v3 import preprocess_input as inception_v3_preprocessor

from keras.layers import Dense, GlobalAveragePooling2D, Activation, Dropout, Flatten, Conv2D, MaxPooling2D, AveragePooling2D, Input
from keras.models import Model, Sequential

from keras.optimizers import Adam
from keras.metrics import categorical_crossentropy

Using TensorFlow backend.


In [None]:
# Generate a CSV listing, for every video directory, up to IMAGES_PER_VIDEO
# randomly sampled frame images together with the video's label and test fold.
start = time.time()

labels = pd.read_csv('../csv_files/labelsfolds2019.csv')

IMAGES_PER_VIDEO = 20
SAMPLE_SEED = 8
# Any label other than these two is encoded as 0.
category_to_number = {'Positive': 2, 'Neutral': 1}
output_columns = ["Video", "Path", "Category", "Category_Numbers", "Fold"]

# A dedicated, seeded generator plus sorted glob results make the sampled
# frame list (and therefore the CSV) identical on every run.
sampler = random.Random(SAMPLE_SEED)

# One directory of frame images per video.
folders = sorted(glob.glob("../299_verified_2019/*"))
total_videos = len(folders)

records = []
skipped_videos = []

for i, folder in enumerate(folders, start=1):

    # glob may return "\\" separators on Windows, so normalise before taking
    # the last path component as the video name.
    video = folder.replace("\\", "/").rstrip("/").split("/")[-1]
    video_path = video + ".mp4"

    # every image in this video's directory
    image_list = sorted(glob.glob(f"{folder}/*jpg"))

    # keep all images when there are few, otherwise draw a random subset
    if len(image_list) <= IMAGES_PER_VIDEO:
        image_list_short = image_list
    else:
        image_list_short = sampler.sample(image_list, IMAGES_PER_VIDEO)

    if image_list_short:
        # The label row depends only on the video, so look it up once
        # instead of once per image.
        file = labels[labels['Filename'] == video_path]

        if file.empty:
            # No label/fold information: leave the video out and report it
            # after the loop rather than failing with an IndexError.
            skipped_videos.append(video)
        else:
            fold = file['TestFold'].iloc[0]
            category = file['Label'].iloc[0]
            number = category_to_number.get(category, 0)

            for image in image_list_short:
                records.append({
                    "Video": video,
                    "Path": image.replace("\\", "/"),
                    "Category": category,
                    "Category_Numbers": number,
                    "Fold": fold,
                })

    if i % 100 == 0:
        duration = time.time() - start
        print(f"{i} videos processed")
        print(f"Time elapsed: {duration}")
        time_per_video = duration/i
        # derive the remaining count from the directories actually found
        remaining_videos = total_videos - i
        estimated_time_remaining = time_per_video*remaining_videos
        print(f"Estimated time remaining: {estimated_time_remaining}")

# Passing the column list keeps the CSV header intact even when no rows exist.
data = pd.DataFrame(records, columns=output_columns)

data.to_csv("../csv_files/299_2019_paths_folds_20.csv", index = False)

if skipped_videos:
    print(f"Skipped {len(skipped_videos)} videos with no entry in the labels file: {skipped_videos}")

print("Video length: ", len(data["Video"]))
print("Path length: ", len(data["Path"]))
print("Category Length: ", len(data["Category"]))

In [2]:
# functions to visualize the results of the model
# confusion matrix functions taken from the programming 1 assignment (neural network)
def getConfusionMatrix(YTrue, YPredict, num_classes=None):
    """
    Computes the confusion matrix.

    Parameters
    ----------
    YTrue : array-like of int
        Ground-truth class index for every sample.
    YPredict : array-like of int
        Predicted class index for every sample, in the same order as YTrue.
    num_classes : int, optional
        Minimum size of the matrix. When omitted, the size is one more than
        the largest label found in either input.

    Returns
    -------
    CM : numpy array of float, shape (dim, dim)
        CM[p][t] is the number of samples with true class t that were
        predicted as class p (rows = predicted, columns = true).

    Raises
    ------
    ValueError
        If YTrue and YPredict do not have the same number of entries.
    """
    y_true = [int(t) for t in YTrue]
    y_pred = [int(p) for p in YPredict]

    if len(y_true) != len(y_pred):
        raise ValueError(
            f"YTrue has {len(y_true)} entries but YPredict has {len(y_pred)}")

    # Size the matrix from both inputs: a predicted class that never occurs in
    # the ground truth must still get its own row.
    seen = y_true + y_pred
    dim = max(seen) + 1 if seen else 0
    if num_classes is not None:
        dim = max(dim, int(num_classes))

    CM = np.zeros((dim, dim))
    for t, p in zip(y_true, y_pred):
        CM[p][t] += 1

    return CM

    
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Print a one-line header and draw the confusion matrix on the current figure.

    Parameters
    ----------
    cm : array-like, 2-D
        Confusion matrix. Rows are drawn along the y axis (labelled
        'Predicted label'), columns along the x axis ('True label').
    classes : sequence
        Tick labels, used for both axes.
    normalize : bool
        When True, every row is divided by its row sum before plotting.
        Rows that sum to zero are left as zeros instead of becoming NaN.
    title : str
        Figure title.
    cmap : matplotlib colormap
        Colormap used for the cells.
    """
    cm = np.asarray(cm)

    if normalize:
        row_sums = cm.sum(axis=1)[:, np.newaxis]
        # Dividing an all-zero row by 1 keeps it at zero and avoids 0/0.
        row_sums[row_sums == 0] = 1
        cm = cm.astype('float') / row_sums
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=0)
    plt.yticks(tick_marks, classes)

    # Raw counts are printed without decimals; normalised or otherwise
    # fractional values keep two decimals.
    whole_numbers = bool(np.all(np.equal(np.mod(cm, 1), 0)))
    fmt = '.0f' if (whole_numbers and not normalize) else '.2f'

    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2. if cm.size else 0.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('Predicted label')
    plt.xlabel('True label')
    
# plotting history of model taken from this source: https://stackoverflow.com/questions/41908379/keras-plot-training-validation-and-test-set-accuracy
def plot_history(history):
    """
    Plot the accuracy and loss curves recorded during training.

    Parameters
    ----------
    history : object with a `history` dict attribute
        Maps metric names to per-epoch value lists (e.g. the object returned
        by `model.fit`). Validation curves ('val_<metric>') are drawn only
        when present, so a model fitted without validation data can be
        plotted too. The accuracy curve is read from 'accuracy', falling back
        to 'acc'; a metric that is missing altogether is skipped.
    """
    logs = history.history
    accuracy_key = 'accuracy' if 'accuracy' in logs else 'acc'

    for key, label in ((accuracy_key, 'accuracy'), ('loss', 'loss')):
        if key not in logs:
            continue

        plt.plot(logs[key])
        legend = ['train']

        val_key = 'val_' + key
        if val_key in logs:
            plt.plot(logs[val_key])
            legend.append('val')

        plt.title('model ' + label)
        plt.ylabel(label)
        plt.xlabel('epoch')
        plt.legend(legend, loc='upper left')
        plt.show()

In [3]:
# Read back the per-image table (path, label, fold) from the CSV that the
# generation cell writes.
paths_csv = "../csv_files/299_2019_paths_folds_20.csv"
data = pd.read_csv(paths_csv)

In [None]:
# --- configuration ----------------------------------------------------------
learning_rates = [.0001]

fold_numbers = list(range(0,5))

image_size = (299, 299)
batch_size = 32

# NOTE(review): class_weights is defined but never passed to model.fit, so it
# currently has no effect on training - confirm whether that is intended.
class_weights = {0: 2, 1 : 1, 2 : 1.8}

# Fix the label -> index mapping once, from the whole dataset, so that every
# generator in every fold uses the same mapping even if a fold happens to
# contain no sample of some class. Sorted order is what the generators infer
# by default when all classes are present.
class_names = sorted(data['Category'].dropna().unique())


def build_model(learning_rate, input_shape=(299, 299, 3), num_classes=3):
    """
    Build and compile the CNN used for every fold.

    Architecture taken from:
    https://sefiks.com/2018/01/01/facial-expression-recognition-with-keras/

    Parameters
    ----------
    learning_rate : float
        Learning rate for the Adam optimizer.
    input_shape : tuple
        Shape of one input image (height, width, channels).
    num_classes : int
        Number of units in the softmax output layer.

    Returns
    -------
    A compiled keras Sequential model.
    """
    model = Sequential()

    #1st convolution layer
    model.add(Conv2D(64, (5, 5), activation='relu', input_shape=input_shape))
    model.add(MaxPooling2D(pool_size=(5,5), strides=(2, 2)))

    #2nd convolution layer
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(AveragePooling2D(pool_size=(3,3), strides=(2, 2)))

    #3rd convolution layer
    model.add(Conv2D(128, (3, 3), activation='relu'))
    model.add(Conv2D(128, (3, 3), activation='relu'))
    model.add(AveragePooling2D(pool_size=(3,3), strides=(2, 2)))

    model.add(Flatten())

    #fully connected neural networks
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.2))

    model.add(Dense(num_classes, activation='softmax'))

    model.compile(Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'],
                 )
    return model


# --- cross-validation loop --------------------------------------------------
all_preds = []
all_history = []
for lr in learning_rates:
    for fold_num in fold_numbers:

        start = time.time()

        # A fresh model is built for every fold; drop the backend state left
        # over from the previous one so memory does not grow fold after fold.
        K.clear_session()

        # separate the data into training and test set based on folds
        train_fold = data[data['Fold'] != fold_num].copy()
        test_fold = data[data['Fold'] == fold_num].copy()

        # generators taken from: https://www.kaggle.com/careyai/inceptionv3-full-pretrained-model-instructions
        datagen = ImageDataGenerator(rescale=1. / 255., validation_split=0.25,)
        test_datagen = ImageDataGenerator(rescale=1. / 255.)

        # arguments shared by the three generators
        flow_args = dict(
            x_col="Path",
            y_col="Category",
            classes=class_names,
            class_mode="categorical",
            batch_size=batch_size,
            seed=8,
            target_size=image_size)

        # Train generator
        train_generator = datagen.flow_from_dataframe(
            dataframe=train_fold,
            subset="training",
            shuffle=True,
            **flow_args)
        print('Class indices: ', train_generator.class_indices)

        # Validation generator
        # NOTE(review): val_generator is built but not passed to model.fit, so
        # the held-out 25% of the training fold is never evaluated.
        val_generator = datagen.flow_from_dataframe(
            dataframe=train_fold,
            subset="validation",
            shuffle=True,
            **flow_args)
        print('Class indices: ', val_generator.class_indices)

        # Test generator (not shuffled, so predictions keep the generator's row order)
        test_generator = test_datagen.flow_from_dataframe(
            dataframe=test_fold,
            shuffle=False,
            **flow_args)

        model = build_model(lr,
                            input_shape=image_size + (3,),
                            num_classes=len(class_names))
        # print the model summary
        model.summary()

        # fit the model
        history = model.fit(train_generator,
                            steps_per_epoch = 100,
                            epochs = 10,
                            verbose = 1,
                            workers = 8,
                            use_multiprocessing = True)

        # training curves for this fold
        for metric in ('accuracy', 'loss'):
            plt.plot(history.history[metric])
            plt.title('model ' + metric)
            plt.ylabel(metric)
            plt.xlabel('epoch')
            plt.legend(['train'], loc='upper left')
            plt.show()

        all_history.append(history)

        # run the model on the test fold
        preds = model.predict(test_generator,
                              workers = 8,
                              use_multiprocessing = True)

        # index of the largest probability for each entry
        pred_array = np.argmax(preds, axis=1).tolist()

        all_preds.append(pred_array)

        # Ground truth comes from the generator itself: it uses the same class
        # indices as the predictions and covers exactly the rows that were
        # predicted (the generator can drop rows whose image file is invalid).
        true_array = list(test_generator.classes)

        # generate and plot the confusion matrix
        CM = getConfusionMatrix(true_array, pred_array)
        plt.figure()
        plot_confusion_matrix(CM, classes=class_names, title='Confusion matrix')
        plt.show()

        end = time.time()
        duration = end - start
        print(f"Fold #{fold_num} took {duration} seconds to complete")
        print("Learning rate: ", lr)