In [1]:
!pip install imageio
!pip install keras

Collecting imageio
[?25l  Downloading https://files.pythonhosted.org/packages/a7/1d/33c8686072148b3b0fcc12a2e0857dd8316b8ae20a0fa66c8d6a6d01c05c/imageio-2.3.0-py2.py3-none-any.whl (3.3MB)
[K    100% |████████████████████████████████| 3.3MB 7.7MB/s 
Installing collected packages: imageio
Successfully installed imageio-2.3.0


In [2]:
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils import shuffle
from sklearn.model_selection import StratifiedShuffleSplit
import matplotlib.pyplot as plt
import pandas as pd
import urllib.request
import os, tarfile
import imageio
import tensorflow as tf
from scipy.io import loadmat
# from tensorflow.examples.tutorials.mnist import input_data
%matplotlib inline

print(tf.test.gpu_device_name())


import keras
from keras import backend as K
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Reshape
from keras.layers import Conv2D, MaxPool2D
from keras.optimizers import SGD, Adam

/device:GPU:0


Using TensorFlow backend.


In [0]:
FER_URL = 'https://www.dropbox.com/s/uv14ve5uky71i99/fer2013.csv?dl=1'

# Utility functions

#### ToDos
- Create a function to fetch data from a url.
- Check if it is already downloaded.
- Check if the file is csv or tar gz etc.
- Add cross-validation code to be able to use sklearn cross_val_score function to quickly evaluate the performance.

In [0]:
def fetch_data(URL, DOWNLOAD_FOLDER, DOWNLOAD_FILE):
  if not os.path.isdir(DOWNLOAD_FOLDER):
   os.makedirs(DOWNLOAD_FOLDER)
  
  if not os.path.isfile(DOWNLOAD_FOLDER+DOWNLOAD_FILE):
    print('Beginning file download...')
    urllib.request.urlretrieve(URL, DOWNLOAD_FOLDER+DOWNLOAD_FILE)
    print('Done.')
  

In [0]:
def split_train_test(XY, n_splits=1, test_size=0.2, random_state=42):
    split = StratifiedShuffleSplit(n_splits=n_splits, test_size=test_size, random_state=random_state)
    for train_index, test_index in split.split(XY[0], XY[1]):
        X_train, Y_train = XY[0][train_index,:], XY[1][train_index]
        X_test, Y_test = XY[0][test_index,:], XY[1][test_index]
        
    return X_train, Y_train, X_test, Y_test

In [0]:
def get_fer_data(url, download_folder, download_file, split_data=False):
    
    fetch_data(url, download_folder, download_file)
    df = pd.read_csv(download_folder+download_file)
    Y = df['emotion'].as_matrix()
    X_str = df['pixels'].as_matrix()
    X = []
    for row in X_str:
        X.append(np.fromstring(row, dtype=int, sep=' '))

    X = np.array(X) / 255.0
    
    #X = (X - X.mean(axis=1, keepdims=True)) / X.std(axis=1, keepdims=True)

    usage = df['Usage'].as_matrix()
    X_train, Y_train = X[usage=='Training'], Y[usage=='Training']
    X_test, Y_test = X[(usage=='PrivateTest') | (usage=='PublicTest')], Y[(usage=='PrivateTest') | (usage=='PublicTest')]

    if split_data:
        return split_train_test((X_new, Y), n_splits=1, test_size=0.2, random_state=42)

    return X_train, np.expand_dims(Y_train,1), X_test, np.expand_dims(Y_test,1)

In [7]:
labels = np.arange(0,10,1)
print(labels[1:])

[1 2 3 4 5 6 7 8 9]


In [0]:
def one_hot_encoder(label):
    encoder = OneHotEncoder(dtype=np.float32)
    label_1hot = encoder.fit_transform(label.reshape(-1,1))
    print('The labels are: {}'.format(np.unique(label)))
    return label_1hot

# Load data

In [0]:
root_folder = 'drive/app/fer/'
# root_folder = 'D:/dev/data/'

In [10]:
X_train, Y_train, X_test, Y_test = get_fer_data(FER_URL, root_folder, 'fer2013.csv',
                                                split_data=False)

print("Train: [{}, {}], Test: [{}, {}]".format(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape))

Beginning file download...
Done.
Train: [(28709, 2304), (28709, 1)], Test: [(7178, 2304), (7178, 1)]


In [11]:
Y_train_1hot = one_hot_encoder(Y_train).toarray().view(np.float32)
Y_test_1hot = one_hot_encoder(Y_test).toarray().view(np.float32)
# print(Y_train_1hot[0:2])
# print(type(Y_train_1hot))

The labels are: [0 1 2 3 4 5 6]
The labels are: [0 1 2 3 4 5 6]


In [0]:
# plt.imshow(X_train[600,:,:,:])
# plt.title(Y_train[600,0])

In [0]:
class CNN(object):
    def __init__(self, width, height, n_channels, n_classes):

        self.width = width
        self.height = height
        self.channels = n_channels
        self.classes = n_classes
        
        self.model = Sequential()
    
    def compile(self, optimizer, loss):
        
        #model.add(Reshape(input_shape + (1, ), input_shape=input_shape))
        #self.model.add(Reshape((self.width*self.height*self.channels)+(1,), input_shape=(self.width*self.height*self.channels)))
        #self.model.add(Reshape((self.width, self.height, self.channels), input_shape=(self.width*self.height*self.channels, 1)))
        self.model.add(Conv2D(filters=32, kernel_size=(3,3), padding='same', activation='relu', 
                              input_shape=(self.width, self.height, self.channels)))
        self.model.add(Conv2D(filters=64, kernel_size=(3,3), padding='same', activation='relu'))
        self.model.add(MaxPool2D(pool_size=(2,2)))
        self.model.add(Dropout(rate=0.5))
        
        self.model.add(Conv2D(filters=128, kernel_size=(3,3), padding='same', activation='relu'))
        self.model.add(Conv2D(filters=256, kernel_size=(3,3), padding='same', activation='relu'))
        self.model.add(MaxPool2D(pool_size=(2,2)))
        self.model.add(Dropout(rate=0.5))
        
        self.model.add(Flatten())
        self.model.add(Dense(1024, activation='relu'))
        self.model.add(Dropout(rate=0.5))
        self.model.add(Dense(self.classes, activation='softmax'))
        
        self.model.compile(loss=loss, optimizer=optimizer)


    def fit(self, X, Y, epochs, batch_size, print_time=None, X_test=None, Y_test=None):

        n_samples = X.shape[0]
        X = np.reshape(X, (n_samples, self.width, self.height, self.channels))
        n_samples = X_test.shape[0]
        X_test = np.reshape(X_test, (n_samples, self.width, self.height, self.channels))
        Y_1hot = one_hot_encoder(Y).toarray().view(np.float32)
        
        if X_test is None:
            self.model.fit(X, Y_1hot, batch_size=batch_size, epochs=epochs)
        else:
            Y_test_1hot = one_hot_encoder(Y_test).toarray().view(np.float32)
            self.model.fit(X, Y_1hot, batch_size=batch_size, epochs=epochs, validation_data=(X_test, Y_test_1hot))
        


    def predict(self, X, Y, batch_size, return_type='score'):
        #return_type: 'probs', 'score', 'predictions'
        
        if (return_type=='probs'):
            probs = self.model.predict(X)
        elif (return_type=='predictions'):
            return self.model.predict_classes(X)
        elif return_type=='score':
            return self.model.evaluate(X, Y)
        
        return None
    
    def score(self, Y, predictions):
        return 100*(predictions==Y).sum()/predictions.shape[0]


In [0]:
#@title Parameters
WIDTH = 48
HEIGHT = 48
N_CHANNELS = 1
N_CLASSES = 7
BATCH_SIZE = 32
MAX_ITER = 100
N_BATCHES = X_train.shape[0]//BATCH_SIZE
PRINT_TIME = N_BATCHES//2
TEST_N_BATCHES = X_test.shape[0]//BATCH_SIZE

In [0]:
ann = CNN(WIDTH, HEIGHT, N_CHANNELS, N_CLASSES)

In [0]:
optimizer = Adam(lr=0.001, decay=1e-5)
loss = 'categorical_crossentropy'
ann.compile(optimizer, loss)

In [21]:
history = ann.fit(X_train, Y_train, MAX_ITER, BATCH_SIZE, X_test=X_test, Y_test=Y_test)

The labels are: [0 1 2 3 4 5 6]
The labels are: [0 1 2 3 4 5 6]
Train on 28709 samples, validate on 7178 samples
Epoch 1/10

Epoch 2/10

Epoch 3/10

Epoch 4/10

Epoch 5/10

Epoch 6/10

Epoch 7/10

Epoch 8/10

Epoch 9/10

Epoch 10/10



In [26]:
n_samples = X_test.shape[0]
X_test_reshaped = np.reshape(X_test, (n_samples, WIDTH, HEIGHT, N_CHANNELS))
predictions = ann.predict(X_test_reshaped, Y_test, BATCH_SIZE, 'predictions')
print('Accuracy: {}%'.format(np.round(100*(predictions==np.squeeze(Y_test)).sum()/predictions.shape[0], 2)))

Accuracy: 58.46%


In [25]:
print(predictions.shape)

(7178,)


In [0]:
!kill -9 -1