In [59]:
import pandas as pd
import numpy as np

from scipy.ndimage import rotate
from scipy.ndimage import gaussian_filter

import matplotlib.pyplot as plt

import random

import keras

%matplotlib inline

Using TensorFlow backend.


In [12]:
# Integer class label -> emotion name.
mapping = dict(enumerate(
    ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
))

# Inverse lookup: emotion name -> integer class label.
imapping = {name: label for label, name in mapping.items()}


def flip_horizontally(image):
    """Return `image` with the order of its entries along axis 1 reversed."""
    mirrored = np.flip(image, axis=1)
    return mirrored


def add_noise(image, max_value=255):
    """
    Return a copy of `image` corrupted with Poisson (shot) noise.

    Every output pixel is a Poisson draw whose mean is the corresponding input pixel.
    The draw itself is the noisy pixel (it is not added on top of the input), so the
    image keeps its brightness on average, and the result is clipped to
    [0, max_value] so it stays a valid intensity image.

    :param image: array of non-negative pixel intensities
    :param max_value: largest allowed intensity in the result (255 for 8-bit images)
    :return: integer array with the same shape as `image`
    """
    # Cast explicitly: the pixels may arrive as an object-dtype array.
    expected = np.asarray(image).astype('float64')
    noisy = np.random.poisson(expected)
    return np.clip(noisy, 0, max_value)


def rotate_right(image, angle=-20):
    """
    Rotate `image` by `angle` degrees (default -20) with scipy.ndimage.rotate.

    The output keeps the input's shape (reshape=False); areas outside the
    input are filled using the 'reflect' boundary mode.
    """
    options = {'mode': 'reflect', 'reshape': False}
    return rotate(image, angle, **options)


def rotate_left(image, angle=20):
    """
    Rotate `image` by `angle` degrees (default 20) with scipy.ndimage.rotate.

    The output keeps the input's shape (reshape=False); areas outside the
    input are filled using the 'reflect' boundary mode.
    """
    options = {'mode': 'reflect', 'reshape': False}
    return rotate(image, angle, **options)


def add_blur(image):
    """Smooth `image` with a Gaussian filter of standard deviation 1."""
    blur_sigma = 1
    return gaussian_filter(image, sigma=blur_sigma)

In [5]:
def find_subsets(df: pd.DataFrame, class_to_fraction: dict):
    """
    Randomly pick a share of the rows of every requested class.

    :param df: data; its 'y' column holds the class of each row
    :param class_to_fraction: dictionary {class: fraction of that class's rows to pick}
    :return: list with the index labels of the picked rows, grouped by class
             in the iteration order of `class_to_fraction`
    """
    wanted = df.loc[df['y'].isin(class_to_fraction.keys())]
    picked = []
    for emotion, fraction in class_to_fraction.items():
        labels = wanted.loc[wanted['y'] == emotion].index.tolist()
        # int() truncates, so a class contributes floor(fraction * size) rows.
        picked.extend(random.sample(labels, int(fraction * len(labels))))

    return picked

In [6]:
class Augmenter:
    """Creates augmented copies of selected images stored as flat rows of a DataFrame."""

    # Side length of the square images; each row holds IMAGE_SIDE ** 2 pixel columns.
    IMAGE_SIDE = 48
    N_PIXELS = IMAGE_SIDE * IMAGE_SIDE

    @staticmethod
    def augment_images(df: pd.DataFrame, target_file: str = None, class_to_fraction: dict = None,
                       augmentations: list = None) -> pd.DataFrame:
        """
        Build a DataFrame of augmented images for a random subset of the rows of `df`.

        After the optional 'Unnamed: 0' column is removed, the frame is read by position:
        column 0 is the label ('y'), the next 2304 columns are the flattened 48x48
        image, and the column after them is 'category'.

        :param df: source data with integer labels in 'y'; the caller's frame is not modified
        :param target_file: if given, the augmented rows are also written to this CSV file
        :param class_to_fraction: dictionary {emotion name: fraction of that class to augment}
        :param augmentations: names of the augmentations to apply; recognised names are
                              'flip', 'noise', 'rotate_right', 'rotate_left' and 'blur'
                              (other names are ignored)
        :return: DataFrame with the columns of the cleaned input; 'y' holds emotion names.
                 Rows are grouped by augmentation in the order listed above.
        """
        class_to_fraction = class_to_fraction or {}
        augmentations = augmentations or []

        # Work on a private copy so the cleaning steps never leak into the caller's frame.
        df = df.copy()
        df['category'] = df['category'].str.strip()
        if 'Unnamed: 0' in df.columns:
            del df['Unnamed: 0']
        df['y'] = df['y'].map(mapping)

        indices = find_subsets(df, class_to_fraction)
        print(len(indices))

        side = Augmenter.IMAGE_SIDE
        first_pixel = 1
        category_pos = first_pixel + Augmenter.N_PIXELS

        # find_subsets returns index LABELS, so rows are fetched with .loc; only the
        # columns inside a row are addressed by position.
        # NOTE(review): assumes index labels are unique - confirm for other callers.
        samples = []
        for index in indices:
            row = df.loc[index]
            image = np.array(row.iloc[first_pixel:category_pos], dtype='int').reshape((side, side))
            samples.append((row.iloc[0], image, row.iloc[category_pos]))

        # Fixed application order, independent of the order of `augmentations`.
        transforms = (
            ('flip', flip_horizontally),
            ('noise', add_noise),
            ('rotate_right', rotate_right),
            ('rotate_left', rotate_left),
            ('blur', add_blur),
        )

        temp = []
        for name, transform in transforms:
            if name not in augmentations:
                continue
            for label, image, category in samples:
                temp.append([label] + transform(image).flatten().tolist() + [category])

        print(len(temp))

        data = pd.DataFrame(temp, columns=df.columns)

        if target_file:
            data.to_csv(target_file)

        return data

### Read data

In [35]:
# Load the full emotions dataset; the path is relative to the notebook's working directory.
df = pd.read_csv('../data/emotions/emotions.csv')

In [36]:
# Strip surrounding whitespace from the split names so the equality filters on 'category' match.
df['category'] = df['category'].str.strip()

In [37]:
# One frame per dataset split, selected by the value of the 'category' column.
train, valid, test = (
    df.loc[df['category'] == split, :]
    for split in ('Training', 'PublicTest', 'PrivateTest')
)

#### Create a dictionary mapping each class to the fraction of its images to augment

In [38]:
# Emotion name -> fraction of that class's training images to augment.
ctf = dict(
    Disgust=1,
    Sad=0.1,
    Fear=0.1,
    Neutral=0.1,
    Angry=0.1,
)

In [39]:
# Augment a copy of the training split with all five augmentations.
# target_file is None, so the result is only returned and not written to disk.
aug = Augmenter.augment_images(train.copy(), None, ctf, ['flip', 'rotate_right', 'rotate_left', 'blur', 'noise'])

2223
11115


In [41]:
train.head()

Unnamed: 0.1,Unnamed: 0,y,0,1,2,3,4,5,6,7,...,2295,2296,2297,2298,2299,2300,2301,2302,2303,category
0,0,0,70,80,82,72,58,58,60,63,...,182,183,136,106,116,95,106,109,82,Training
1,1,0,151,150,147,155,148,133,111,140,...,108,95,108,102,67,171,193,183,184,Training
2,2,2,231,212,156,164,174,138,161,173,...,138,152,122,114,101,97,88,110,152,Training
3,3,4,24,32,36,30,32,23,19,20,...,126,132,132,133,136,139,142,143,142,Training
4,4,6,4,0,0,0,0,0,0,0,...,34,31,31,31,27,31,30,29,30,Training


In [42]:
aug.head()

Unnamed: 0,y,0,1,2,3,4,5,6,7,8,...,2295,2296,2297,2298,2299,2300,2301,2302,2303,category
0,Disgust,83,75,87,83,90,89,79,85,84,...,233,171,172,218,215,200,217,221,222,Training
1,Disgust,58,58,59,59,59,60,61,62,61,...,3,0,0,0,0,0,0,0,0,Training
2,Disgust,132,132,132,131,131,131,131,132,132,...,140,140,141,141,141,141,142,144,146,Training
3,Disgust,19,9,5,7,9,9,12,10,11,...,23,22,21,18,15,13,11,12,14,Training
4,Disgust,116,123,124,120,120,120,110,101,91,...,222,223,222,220,217,202,181,140,136,Training


In [43]:
# Remove the 'Unnamed: 0' column from every split. augment_images removes the same column
# from its own input, so `train` and `aug` share a column layout before they are combined.
del train['Unnamed: 0']
del valid['Unnamed: 0']
del test['Unnamed: 0']

In [45]:
# Convert the emotion names in aug['y'] back to integer labels.
# Not idempotent: re-running on already-converted labels yields NaN, because the
# integers are not keys of `imapping`.
aug['y'] = aug['y'].map(imapping)

In [50]:
# Shuffle the augmented rows and add them to the training split with a fresh 0..n-1 index.
# pd.concat is used because DataFrame.append was deprecated in pandas 1.4 and removed in 2.0.
train = pd.concat([train, aug.sample(frac=1)], ignore_index=True)

In [54]:
# Feature matrix: the 2304 columns at positions 1..2304; labels: the 'y' column kept 2-D.
X_train, y_train = np.array(train.iloc[:, 1:2305]), np.array(train[['y']])

X_val, y_val = np.array(valid.iloc[:, 1:2305]), np.array(valid[['y']])

X_test, y_test = np.array(test.iloc[:, 1:2305]), np.array(test[['y']])

In [60]:
# One-hot encode the labels of every split over the 7 emotion classes.
y_train, y_val, y_test = (
    keras.utils.to_categorical(labels, 7) for labels in (y_train, y_val, y_test)
)

# Turn each flat row of 2304 values back into a 48x48 image.
X_train_r = X_train.reshape(len(X_train), 48, 48)
X_val_r = X_val.reshape(len(X_val), 48, 48)
X_test_r = X_test.reshape(len(X_test), 48, 48)

In [61]:
# Scale pixel values by 1/255. The arrays are rebuilt from X_* (instead of dividing
# X_*_r in place) so re-running this cell does not divide twice, and they are cast to
# float32 so they take half the memory of the default float64 result.
X_train_r = X_train.reshape((len(X_train), 48, 48)).astype('float32') / 255
X_val_r = X_val.reshape((len(X_val), 48, 48)).astype('float32') / 255
X_test_r = X_test.reshape((len(X_test), 48, 48)).astype('float32') / 255

# Three identical channels per image, stacked on a new last axis. Each stack
# materialises a full copy that is three times the size of its source array.
X_train_rgb = np.stack((X_train_r,) * 3, axis=-1)
X_val_rgb = np.stack((X_val_r,) * 3, axis=-1)
X_test_rgb = np.stack((X_test_r,) * 3, axis=-1)

MemoryError: 

In [None]:
# Append a trailing channel axis of length 1: (samples, 48, 48) -> (samples, 48, 48, 1).
X_train_bw = X_train_r[..., np.newaxis]
X_val_bw = X_val_r[..., np.newaxis]
X_test_bw = X_test_r[..., np.newaxis]

### Model

In [62]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras import optimizers

from keras.models import model_from_json

from keras.metrics import categorical_accuracy

# Seed NumPy's global random generator. In file order this runs after the augmentation
# cells, so it does not fix add_noise's np.random.poisson draws, and it never affects
# Python's `random` module, which find_subsets uses for sampling.
np.random.seed(100)

In [63]:
model = Sequential()

# Convolutional stages, each: Conv2D -> BatchNorm -> ReLU -> 2x2 max-pooling -> dropout.
conv_stages = [(64, (3, 3)), (128, (5, 5)), (512, (3, 3)), (512, (3, 3))]
for stage, (filters, kernel) in enumerate(conv_stages):
    # Only the very first layer declares the input shape (48x48, one channel).
    first_layer_args = {'input_shape': (48, 48, 1)} if stage == 0 else {}
    model.add(Conv2D(filters, kernel, padding='same', **first_layer_args))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

model.add(Flatten())

# Fully connected stages, each: Dense -> BatchNorm -> ReLU -> dropout.
for units in (256, 512):
    model.add(Dense(units))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.25))

# Output layer: one softmax probability per emotion class.
model.add(Dense(7, activation='softmax'))

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=[categorical_accuracy])

In [None]:
# Train on the 4-D (samples, 48, 48, 1) arrays: the first Conv2D layer is declared with
# input_shape=(48, 48, 1), which the 3-D X_train_r / X_val_r arrays do not match.
history = model.fit(X_train_bw, y_train, batch_size=128, epochs=100, validation_data=(X_val_bw, y_val))