### Import Necessary Packages

In [1]:
import cv2
import glob
import h5py
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import shutil
import time

import skimage as sk
from skimage import util, io

import keras
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import LearningRateScheduler, ModelCheckpoint
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.optimizers import SGD

from collections import Counter, deque
from moviepy.editor import *
from moviepy.video.io.VideoFileClip import VideoFileClip
from mpl_toolkits.axes_grid1 import ImageGrid
from scipy import ndarray
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

### Create a dictionary of the character names that will be included in the analysis.

This dictionary maps each integer class label to a character name; the name is also used to locate that character's image folder.

In [2]:
# Class-label -> character-name lookup. Labels are assigned by position in the list below,
# starting at 0. The names are also joined onto the dataset directories to find each
# character's image folder, so they must match the folder names exactly.
# NOTE(review): 'professor_joun_frink' and 'rainier_wolf_castle' look like possible
# misspellings -- confirm them against the folder names on disk before changing anything.
map_characters = dict(enumerate([
    'abraham_grampa_simpson',
    'agnes_skinner',
    'apu_nahasapeemapetilon',
    'barney_gumble',
    'bart_simpson',
    'carl_carlson',
    'charles_montgomery_burns',
    'chief_wiggum',
    'cletus_spuckler',
    'comic_book_guy',
    'edna_krabappel',
    'groundskeeper_willie',
    'homer_simpson',
    'kent_brockman',
    'krusty_the_clown',
    'lenny_leonard',
    'lisa_simpson',
    'maggie_simpson',
    'marge_simpson',
    'martin_prince',
    'mayor_quimby',
    'milhouse_van_houten',
    'moe_szyslak',
    'ned_flanders',
    'nelson_muntz',
    'otto_mann',
    'patty_bouvier',
    'principal_skinner',
    'professor_joun_frink',
    'rainier_wolf_castle',
    'ralph_wiggum',
    'selma_bouvier',
    'sideshow_bob',
    'sideshow_mel',
    'snake_jailbird',
    'waylon_smithers',
]))

### Create all of the directories that will be used in the analysis as well as any fixed variables that will be used.

In [5]:
# Parent directory for the entire project; every relative path below is resolved against it.
# The location can be overridden with the SIMPSONS_PROJECT_DIR environment variable so the
# notebook is not tied to one machine's drive layout. The default is the original location.
project_dir = os.environ.get('SIMPSONS_PROJECT_DIR', 'D:/School_Files/Simpsons_Project')
os.chdir(project_dir)

# Directory to master image file
image_dir = './simpsons_dataset/' 

# Directory to augmented image file
aug_path = './simpsons_augmented'

pic_size = 128 #The size that each image will be modified to
batch_size = 32 #The batch size the images will be fed through the model
epochs = 50 #The number of epochs that will be run
num_classes = len(map_characters) #The number of classes for the analysis (number of characters)
pictures_per_class = 1500 #Number of images for each character
val_size = 0.15 #Size of the validation set
test_size = val_size #Alias so code that calls the held-out fraction `test_size` gets the same value
aug_images_number = 2500 #Number of images that will be created after image augmentation

### Create all the required image augmentation functions

We do not have a consistent number of images for each character, and in some instances there are not enough images of a character to conduct the analysis.  To adjust for this, rather than going out to obtain additional images, we will use image augmentation to ensure both that we have a sufficient number of images and that each image is different.  The next several steps will set up our image augmentation process.

In [6]:
# Random image rotation between -45 and 45 degrees
def random_rotation(image_array: ndarray):
    """Rotate an image by a random whole number of degrees.

    :param image_array: image to rotate
    :return: the result of ``skimage.transform.rotate`` for an angle drawn uniformly
        from the integers -45..45 (both ends included)
    """
    # Imported explicitly: a bare ``import skimage`` does not guarantee that the
    # ``transform`` submodule is reachable as ``sk.transform`` on every scikit-image release.
    from skimage.transform import rotate

    random_degree = random.randint(-45, 45)
    return rotate(image_array, random_degree)

# Random noise
def random_noise(image_array: ndarray):
    """Return a noisy version of the image.

    :param image_array: image to perturb
    :return: whatever ``skimage.util.random_noise`` produces with its default settings
    """
    noisy_image = sk.util.random_noise(image_array)
    return noisy_image

# Mirror the image left-to-right (flip about the vertical axis)
def horizontal_flip(image_array: ndarray):
    """Reverse the order of the image's columns.

    :param image_array: array with at least two axes (rows, columns, ...)
    :return: the same data with the second axis reversed
    """
    # Plain slicing is enough here; no image library is needed for a mirror.
    reversed_columns = slice(None, None, -1)
    return image_array[:, reversed_columns]

# Randomly crop 1 to 75 pixels from each side of the image
def crop_image(image_array: ndarray):
    """Trim a random margin from the top, bottom, left and right of an image.

    Each margin is drawn from 1..75 pixels. For images too small to lose that much,
    the upper bound is lowered so that at least one row and one column always remain;
    an axis of length 1 or 2 is left uncropped. Axes after the first two (e.g. colour
    channels) are never cropped, so 2-D grayscale arrays are accepted as well.

    :param image_array: array whose first two axes are rows and columns
    :return: a view of ``image_array`` restricted to the kept rows and columns
    """
    height, width = image_array.shape[:2]

    def _random_margins(length):
        # Cap the margins so the two sides together can never consume the whole axis.
        max_margin = min(75, (length - 1) // 2)
        if max_margin < 1:
            return 0, 0
        return random.randint(1, max_margin), random.randint(1, max_margin)

    # Draw order (top, bottom, left, right) is kept stable for seeded runs.
    top, bottom = _random_margins(height)
    left, right = _random_margins(width)
    return image_array[top:height - bottom, left:width - right]

# Registry of the transformations defined above, keyed by a short name.
# Insertion order is kept as-is because random selection is made from the list of keys.
available_transformations = dict(
    rotate=random_rotation,
    noise=random_noise,
    horizontal_flip=horizontal_flip,
    crop=crop_image,
)

### Create the function that will do the actual image augmentation. 

This function not only performs the image augmentation; it also picks the source image and the transformations at random for every augmented image it creates.

In [None]:
def _augment_folder(folder_path, num_existing_files, num_files_desired):
    """Add augmented images to ``folder_path`` until it holds ``num_files_desired`` files.

    :param folder_path: character folder that already contains the source images
    :param num_existing_files: number of files currently in the folder
    :param num_files_desired: total number of files the folder should end up with
    """
    images = [os.path.join(folder_path, f) for f in os.listdir(folder_path)
              if os.path.isfile(os.path.join(folder_path, f))]
    if not images:
        # Nothing to sample from; random.choice would raise on an empty list.
        print('no source images found in ', folder_path)
        return

    transformation_names = list(available_transformations)
    num_generated_files = num_existing_files
    file_index = num_existing_files

    while num_generated_files < num_files_desired:
        # random image from the folder
        image_to_transform = sk.io.imread(random.choice(images))

        # Pick a random number of distinct transformations and chain them, so each
        # one operates on the output of the previous one.
        num_transformations_to_apply = random.randint(1, len(transformation_names))
        transformed_image = image_to_transform
        for key in random.sample(transformation_names, num_transformations_to_apply):
            transformed_image = available_transformations[key](transformed_image)

        if transformed_image.size == 0:
            # A crop can leave nothing of a very small image; draw again instead of
            # writing an empty file.
            continue

        # Never overwrite a file that is already present in the folder.
        new_file_path = '%s/pic_%s.jpg' % (folder_path, file_index)
        while os.path.exists(new_file_path):
            file_index += 1
            new_file_path = '%s/pic_%s.jpg' % (folder_path, file_index)

        # Some transformations hand back floating-point images; convert to 8-bit
        # explicitly before writing instead of relying on the writer's implicit conversion.
        io.imsave(new_file_path, sk.util.img_as_ubyte(transformed_image))
        file_index += 1
        num_generated_files += 1


def image_augmentation(aug_images_number):
    """Make sure every character folder under ``aug_path`` holds ``aug_images_number`` images.

    For each character in ``map_characters``:

    * if its augmented folder does not exist yet, the original images are copied from
      ``image_dir`` and augmented images are added on top;
    * if the folder exists but holds fewer files than requested, it is topped up;
    * otherwise the folder is left untouched.

    The character loop runs whether or not ``aug_path`` already existed, so a first
    run both creates the directory and fills it.

    :param aug_images_number: number of files each character folder should contain
    :return: None; the function works through its effects on disk and prints progress
    """
    if not os.path.isdir(aug_path):
        os.makedirs(aug_path)
        print("created folder : ", aug_path)
    else:
        print(aug_path, "already exists.")

    for char in map_characters.values():
        folder_path = os.path.join(aug_path, char)

        if not os.path.isdir(folder_path):
            shutil.copytree(os.path.join(image_dir, char), folder_path)
            print("copied folder : ", folder_path)
            num_existing_files = len(os.listdir(folder_path))
        else:
            num_existing_files = len(glob.glob(aug_path + '/%s/*' % char))
            if num_existing_files >= aug_images_number:
                print(folder_path, "already exists.")
                continue
            print('adding ', (aug_images_number - num_existing_files), ' images to ', folder_path)

        _augment_folder(folder_path, num_existing_files, aug_images_number)

In [None]:
def load_pictures(BGR):
    """
    Load pictures from the augmented folders of every character in map_characters and
    build a numpy dataset plus a numpy label set. Pictures are resized to
    pic_size x pic_size.

    At most round(pictures_per_class / (1 - val_size)) pictures are read per character,
    each file at most once, so the same picture cannot end up on both sides of a later
    train/test split. Folders without pictures and files OpenCV cannot read are skipped.

    :param BGR: when true, convert each picture from OpenCV's BGR channel order to RGB
                (e.g. for display with matplotlib)
    :return: dataset, labels set
    """
    # Read enough pictures that roughly pictures_per_class remain after the split.
    wanted_per_class = round(pictures_per_class / (1 - val_size))
    pics = []
    labels = []
    for label, char in map_characters.items():
        pictures = glob.glob(aug_path + '/%s/*' % char)
        if not pictures:
            # np.random.choice refuses an empty population.
            print('no pictures found for ', char)
            continue
        nb_pic = min(wanted_per_class, len(pictures))
        # replace=False: draw distinct files instead of sampling with replacement.
        for pic in np.random.choice(pictures, nb_pic, replace=False):
            a = cv2.imread(pic)
            if a is None:
                # cv2.imread signals an unreadable file by returning None.
                continue
            if BGR:
                a = cv2.cvtColor(a, cv2.COLOR_BGR2RGB)
            a = cv2.resize(a, (pic_size, pic_size))
            pics.append(a)
            labels.append(label)
    return np.array(pics), np.array(labels)

In [None]:
def get_dataset(save=False, load=False, BGR=False):
    """
    Create the actual dataset split into train and test; picture content is returned as
    float32 scaled by 1/255. The split can be saved to, or loaded from, the h5 files
    'dataset.h5' (pictures) and 'labels.h5' (one-hot labels) in the working directory.

    :param save: when building the dataset, also write it to the h5 files
    :param load: read a previously saved dataset instead of building a new one
    :param BGR: forwarded to load_pictures (convert pictures from BGR to RGB when true)
    :return: X_train, X_test, y_train, y_test (numpy arrays)
    """
    if load:
        # Context managers close the files even if a dataset is missing.
        with h5py.File('dataset.h5', 'r') as h5f:
            X_train = h5f['X_train'][:]
            X_test = h5f['X_test'][:]

        with h5py.File('labels.h5', 'r') as h5f:
            y_train = h5f['y_train'][:]
            y_test = h5f['y_test'][:]
    else:
        X, y = load_pictures(BGR)
        y = keras.utils.to_categorical(y, num_classes)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=val_size)
        if save:
            with h5py.File('dataset.h5', 'w') as h5f:
                h5f.create_dataset('X_train', data=X_train)
                h5f.create_dataset('X_test', data=X_test)

            with h5py.File('labels.h5', 'w') as h5f:
                h5f.create_dataset('y_train', data=y_train)
                h5f.create_dataset('y_test', data=y_test)

    X_train = X_train.astype('float32') / 255.
    X_test = X_test.astype('float32') / 255.
    print("Train", X_train.shape, y_train.shape)
    print("Test", X_test.shape, y_test.shape)
    if not load:
        # Column index of the 1 in each one-hot row is the class label. Counter yields 0
        # for a class that is absent from a split instead of raising KeyError.
        train_counts = Counter(np.where(y_train == 1)[1])
        test_counts = Counter(np.where(y_test == 1)[1])
        dist = {k: (train_counts[k], test_counts[k]) for k in range(num_classes)}
        print('\n'.join(["%s : %d train pictures & %d test pictures" % (map_characters[k], v[0], v[1])
            for k, v in sorted(dist.items(), key=lambda x: x[1][0], reverse=True)]))
    return X_train, X_test, y_train, y_test

In [None]:
def create_model_six_conv(input_shape):
    """
    Build and compile the six-convolution Keras CNN.

    The network is three stages of (conv 'same' -> relu -> conv -> relu -> 2x2 max-pool ->
    dropout 0.2) with 32, 64 and 256 filters, followed by a 1024-unit dense layer,
    dropout 0.5 and a softmax over num_classes.

    :param input_shape: input shape, generally X_train.shape[1:]
    :return: compiled Keras model, and the SGD (Nesterov momentum) optimizer it was compiled with
    """
    model = Sequential()

    for stage, filters in enumerate((32, 64, 256)):
        # Only the very first layer of the network declares the input shape.
        first_layer_kwargs = {'input_shape': input_shape} if stage == 0 else {}
        model.add(Conv2D(filters, (3, 3), padding='same', **first_layer_kwargs))
        model.add(Activation('relu'))
        model.add(Conv2D(filters, (3, 3)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

    # Classifier head
    model.add(Flatten())
    model.add(Dense(1024))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))

    # NOTE(review): `lr` and `decay` are older argument names; confirm that the installed
    # Keras release still accepts them.
    opt = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model, opt

In [None]:
def load_model_from_checkpoint(weights_path, six_conv=False, input_shape=(pic_size,pic_size,3)):
    """
    Rebuild the six-convolution model and restore saved weights into it.

    :param weights_path: path to a weights file; when empty or None the freshly
                         initialised model is returned unchanged
    :param six_conv: currently unused; accepted so existing calls keep working
    :param input_shape: input shape of the network (default: pic_size x pic_size x 3)
    :return: Keras model with the weights from weights_path loaded
    """
    model, opt = create_model_six_conv(input_shape)
    if weights_path:
        # Without this call the function would hand back an untrained network.
        model.load_weights(weights_path)
    return model

In [None]:
def lr_schedule(epoch):
    """
    Step-decay learning-rate schedule: start at 0.01 and multiply by 0.1 after every
    ten epochs.

    :param epoch: epoch index (0-based)
    :return: learning rate to use for that epoch
    """
    base_rate = 0.01
    completed_steps = int(epoch / 10)
    return base_rate * (0.1 ** completed_steps)

In [None]:
def training(model, X_train, X_test, y_train, y_test, data_augmentation=True):
    """
    Train the model for `epochs` epochs with batches of `batch_size`.

    With data_augmentation enabled, batches come from an ImageDataGenerator (small
    rotations, shifts and horizontal flips), the learning rate follows lr_schedule, and
    the weights with the best validation accuracy are checkpointed to
    'weights_6conv_<DDMMYYYY>.hdf5'. Without it, the model is fitted directly on the
    arrays with no callbacks.

    :param model: compiled Keras sequential model
    :param X_train: training pictures
    :param X_test: validation pictures
    :param y_train: one-hot training labels
    :param y_test: one-hot validation labels
    :param data_augmentation: boolean for data_augmentation (default:True)
    :return: model and the History object returned by model.fit
    """
    if data_augmentation:
        datagen = ImageDataGenerator(
            featurewise_center=False,  # set input mean to 0 over the dataset
            samplewise_center=False,  # set each sample mean to 0
            featurewise_std_normalization=False,  # divide inputs by std of the dataset
            samplewise_std_normalization=False,  # divide each input by its std
            zca_whitening=False,  # apply ZCA whitening
            rotation_range=10,  # randomly rotate images in the range (degrees, 0 to 180)
            width_shift_range=0.1,  # randomly shift images horizontally (fraction of total width)
            height_shift_range=0.1,  # randomly shift images vertically (fraction of total height)
            horizontal_flip=True,  # randomly flip images
            vertical_flip=False)  # randomly flip images
        # Computes the statistics used by the feature-wise options. Those options are all
        # switched off above; the call is kept so enabling one later needs no other change.
        datagen.fit(X_train)
        # The date stamp must not contain '/', which would turn the name into a
        # sub-directory path instead of a single file name.
        filepath = "weights_6conv_%s.hdf5" % time.strftime("%d%m%Y")
        checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=0, save_best_only=True, mode='max')
        callbacks_list = [LearningRateScheduler(lr_schedule), checkpoint]
        history = model.fit(datagen.flow(X_train, y_train,
                            batch_size=batch_size),
                            steps_per_epoch=X_train.shape[0] // batch_size,
                            epochs=epochs,
                            validation_data=(X_test, y_test),
                            callbacks=callbacks_list)
    else:
        history = model.fit(X_train, y_train,
          batch_size=batch_size,
          epochs=epochs,
          validation_data=(X_test, y_test),
          shuffle=True)
    return model, history

In [None]:
if __name__ == '__main__':
    # End-to-end run: augment the images on disk, build (and save) the train/test split,
    # then create and train the six-convolution model.
    image_augmentation(aug_images_number)
    X_train, X_test, y_train, y_test = get_dataset(save=True)
    # create_model_six_conv returns an already compiled model, so it is not compiled again here.
    model, opt = create_model_six_conv(X_train.shape[1:])
    model, history = training(model, X_train, X_test, y_train, y_test, data_augmentation=True)