In [1]:
import pandas as pd
import numpy as np
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.autograd import Variable
import random

# Load Data

In [109]:
# Read the FER2013 CSV: one row per image, with its emotion label, a
# space-separated string of pixel values, and the split ('Usage') it belongs to.
emotions = pd.read_csv('../Downloads/fer2013/fer2013.csv')
# Number of images, followed by the number of pixel values in the first image.
print(emotions.shape[0])
print(len(emotions['pixels'].iloc[0].split(' ')))
emotions.head()

35887
2304


Unnamed: 0,emotion,pixels,Usage
0,0,70 80 82 72 58 58 60 63 54 58 60 48 89 115 121...,Training
1,0,151 150 147 155 148 133 111 140 170 174 182 15...,Training
2,2,231 212 156 164 174 138 161 173 182 200 106 38...,Training
3,4,24 32 36 30 32 23 19 20 30 41 21 22 32 34 21 1...,Training
4,6,4 0 0 0 0 0 0 0 0 0 0 0 3 15 23 28 48 50 58 84...,Training


# Summary Statistics 

In [111]:
# Row count of each split (Training / PublicTest / PrivateTest)
emotions.groupby('Usage')[['emotion']].count()

Unnamed: 0_level_0,emotion
Usage,Unnamed: 1_level_1
PrivateTest,3589
PublicTest,3589
Training,28709


In [112]:
# Frequency of each label: raw count and percentage of all rows
print('0=Angry, 1=Disgust, 2=Fear, 3=Happy, 4=Sad, 5=Surprise, 6=Neutral')
table = emotions.groupby('emotion').count()[['Usage']]
# Scale to percent first and round once at the end (vectorised). Rounding the
# fraction and multiplying by 100 afterwards can leave binary floating-point
# artifacts in the stored values.
table['Pct'] = (100 * table['Usage'] / table['Usage'].sum()).round(1)
table

0=Angry, 1=Disgust, 2=Fear, 3=Happy, 4=Sad, 5=Surprise, 6=Neutral


Unnamed: 0_level_0,Usage,Pct
emotion,Unnamed: 1_level_1,Unnamed: 2_level_1
0,4953,13.8
1,547,1.5
2,5121,14.3
3,8989,25.0
4,6077,16.9
5,4002,11.2
6,6198,17.3


## Process Input into Tensors

 

In [5]:
# Process Input into Tensors
# Every 'pixels' entry is a space-separated string of intensities. Parse each
# one into a row of a dense (n_images, n_pixels) matrix. The matrix shape is
# taken from the data rather than hard-coded, and rows are walked positionally
# so the loop does not depend on the DataFrame's index labels.
n_images = len(emotions)
n_pixels = len(emotions['pixels'].iloc[0].split(' '))
pixel_matrix = np.zeros((n_images, n_pixels))
for r, pixel_string in enumerate(emotions['pixels']):
    pixel_matrix[r, :] = np.array(pixel_string.split(' ')).astype(float)

# Features as float32; labels keep the integer dtype of the 'emotion' column.
Pixel_Tensor = torch.from_numpy(pixel_matrix).float()
Emotion_Tensor = torch.from_numpy(np.asarray(emotions['emotion']))
Pixel_Tensor[:5]


   70    80    82  ...    106   109    82
  151   150   147  ...    193   183   184
  231   212   156  ...     88   110   152
   24    32    36  ...    142   143   142
    4     0     0  ...     30    29    30
[torch.FloatTensor of size 5x2304]

In [6]:
# Take the split size from the dataset's own 'Usage' column instead of a
# hard-coded row count. Positional slicing is only correct when every
# 'Training' row precedes the PublicTest / PrivateTest rows, so check that
# before slicing rather than silently mixing the splits.
is_training = (emotions['Usage'] == 'Training').values
n_train = int(is_training.sum())
assert is_training[:n_train].all(), \
    "Training rows are not contiguous at the top of the file"

X_train = Pixel_Tensor[:n_train]
X_test = Pixel_Tensor[n_train:]

y_train = Emotion_Tensor[:n_train]
y_test = Emotion_Tensor[n_train:]

In [7]:
# Sanity check: X_train is the leading slice of Pixel_Tensor, so these five
# rows should match the first five rows displayed for Pixel_Tensor.
X_train[:5]


   70    80    82  ...    106   109    82
  151   150   147  ...    193   183   184
  231   212   156  ...     88   110   152
   24    32    36  ...    142   143   142
    4     0     0  ...     30    29    30
[torch.FloatTensor of size 5x2304]

# Basic Model - Feedforward Neural Net

In [113]:
class Network1Layer(nn.Module):
    """Fully connected classifier with ReLU hidden layers and log-softmax output.

    Despite the name, the network has two hidden layers of equal width
    (``layer1`` and ``layer2``) followed by the output layer ``layer3``.

    Args:
        input_size: number of input features per sample.
        n_hidden: width of both hidden layers.
        output_size: number of output classes.
    """

    def __init__(self, input_size, n_hidden, output_size):
        super(Network1Layer, self).__init__()
        self.input_size = input_size
        self.layer1 = nn.Linear(input_size, n_hidden)
        self.layer2 = nn.Linear(n_hidden, n_hidden)
        self.layer3 = nn.Linear(n_hidden, output_size)

    def forward(self, x):
        """Return per-class log-probabilities; the last dimension of ``x`` holds the features."""
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        x = self.layer3(x)
        # log_softmax gives the outputs a probabilistic interpretation
        # (exp of each row sums to 1), which is what F.nll_loss expects.
        # The class dimension is named explicitly: relying on the implicit
        # dimension is deprecated, emits a warning, and picks dim 0 for
        # 3-D inputs.
        return F.log_softmax(x, dim=-1)

In [114]:
def batch_iter(batch_size, sequences, labels):
    """Yield shuffled full-size training batches for a single epoch.

    The sample order is shuffled once with the ``random`` module, then
    consecutive slices of ``batch_size`` indices are yielded. A trailing
    remainder smaller than ``batch_size`` is skipped, so every batch has
    exactly ``batch_size`` rows. The generator ends after one pass; build a
    new one for each epoch.

    Args:
        batch_size: number of samples per batch; must be positive.
        sequences: tensor whose first dimension indexes samples.
        labels: tensor of labels aligned with ``sequences``.

    Yields:
        ``[batch_train, batch_train_labels]`` where ``batch_train`` is a
        FloatTensor of the selected rows.

    Raises:
        ValueError: if ``batch_size`` is not positive (the original loop
            would yield empty batches forever in that case).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    dataset_size = sequences.size()[0]
    order = list(range(dataset_size))
    random.shuffle(order)

    # Stop at the last start index that still leaves a full batch.
    for start in range(0, dataset_size - batch_size + 1, batch_size):
        batch_indices_tensor = torch.LongTensor(order[start:start + batch_size])
        batch_train = sequences[batch_indices_tensor].type(torch.FloatTensor)
        batch_train_labels = labels[batch_indices_tensor]
        yield [batch_train, batch_train_labels]

def eval_iter(batch_size, sequence_tensors, labels):
    """Return a list of shuffled evaluation batches covering every sample.

    Unlike a training iterator, nothing is dropped: when the dataset size is
    not a multiple of ``batch_size`` the last batch is simply smaller. This
    matters because callers average the loss and accuracy over the whole
    evaluation set.

    Args:
        batch_size: maximum number of samples per batch; must be positive.
        sequence_tensors: tensor whose first dimension indexes samples.
        labels: tensor of labels aligned with ``sequence_tensors``.

    Returns:
        A list of ``(batch_sequences, batch_test_labels)`` tuples, where
        ``batch_sequences`` is a FloatTensor of the selected rows. The list
        is empty for an empty dataset.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    dataset_size = len(sequence_tensors)
    order = list(range(dataset_size))
    random.shuffle(order)

    batches = []
    for start in range(0, dataset_size, batch_size):
        batch_indices_tensor = torch.LongTensor(order[start:start + batch_size])
        # Index selection already returns one stacked tensor; no torch.stack needed.
        batch_sequences = sequence_tensors[batch_indices_tensor].type(torch.FloatTensor)
        batch_test_labels = labels[batch_indices_tensor]
        batches.append((batch_sequences, batch_test_labels))
    return batches

# Define Training Loop

* Loop over batches of samples in the training set
* Run each batch through the model (forward pass)
* Compute the loss
* Compute the gradients with respect to model parameters (backward pass)
* Update the parameters

In [115]:
# Motivation behind model.train vs. model.eval:
# some techniques, such as Dropout (used to avoid overfitting), should only be
# active during training, not testing.

def train(epoch, train_iter, net=None, opt=None, n_total=None):
    """Run one pass over ``train_iter``, updating the model after every batch.

    Requires PyTorch >= 0.4 (uses ``Tensor.item()``; the former
    ``loss.data[0]`` raises on 0-dim tensors in later releases).

    Args:
        epoch: epoch number, used only in the progress message.
        train_iter: iterable of ``(data, target)`` batches.
        net: model to train; defaults to the notebook-level ``model``.
        opt: optimizer for ``net``'s parameters; defaults to the
            notebook-level ``optimizer``.
        n_total: dataset size shown in the progress message; defaults to
            ``len(X_train)``.
    """
    # Fall back to the notebook globals only when no explicit object is given,
    # so existing calls train(epoch, train_iter) behave as before.
    net = model if net is None else net
    opt = optimizer if opt is None else opt
    n_total = len(X_train) if n_total is None else n_total

    net.train()
    for batch_idx, (data, target) in enumerate(train_iter):
        opt.zero_grad()
        output = net(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        opt.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), n_total,
                100 * batch_idx * len(data) / n_total, loss.item()))

# Define Testing Loop

* Loop over batches of samples in the testing set
* Run each batch through the model (forward pass)
* Compute the loss and accuracy
* Do not compute gradients or update model parameters 
* We hold out the testing data so we can evaluate how the model performs on data it has not been trained on

In [116]:
#Similar to training loop, except you're not altering the parameters.

def test(test_iter):
    model.eval()
    test_loss = 0
    correct = 0
    for batch_idx, (data, target) in enumerate(test_iter):
        data, target = Variable(data, volatile=True), Variable(target)
        output = model(data)
        test_loss += F.nll_loss(output, target, size_average=False).data[0] # sum up batch loss                                                               
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability                                                                 
        correct += pred.eq(target.data.view_as(pred)).cpu().sum()

    test_loss /= len(X_test)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(X_test),
        100 * correct / len(X_test)))

In [117]:
def train_test(num_epochs, train_iter, test_iter):
    """Alternate one training pass and one evaluation pass per epoch.

    Delegates to ``train`` and ``test`` instead of repeating their bodies.
    Epochs are numbered from 0 in the progress output.

    Args:
        num_epochs: number of train/evaluate rounds to run.
        train_iter: either a re-iterable collection of ``(data, target)``
            batches (e.g. a list), or a zero-argument callable returning a
            fresh iterable of batches for each epoch, such as
            ``lambda: batch_iter(batch_size, X_train, y_train)``.
            A bare generator is exhausted after the first epoch, so later
            epochs would train on nothing; a warning is issued in that case.
        test_iter: re-iterable collection of ``(data, target)`` batches,
            e.g. the list returned by ``eval_iter``.
    """
    # A one-shot iterator returns itself from iter(); it cannot be replayed.
    is_one_shot = not callable(train_iter) and iter(train_iter) is train_iter
    if is_one_shot and num_epochs > 1:
        import warnings
        warnings.warn(
            "train_iter is a one-shot iterator and will be empty after the "
            "first epoch; pass a callable that builds a new iterator instead.")

    for epoch in range(num_epochs):
        batches = train_iter() if callable(train_iter) else train_iter
        train(epoch, batches)
        test(test_iter)

# Initialize the Model and Optimizer

In [118]:
# Training settings
# Seed every random source used below so that weight initialisation and batch
# shuffling are repeatable after Restart Kernel -> Run All.
SEED = 0
random.seed(SEED)        # drives the shuffling in batch_iter / eval_iter
torch.manual_seed(SEED)  # drives the initial weights of the nn.Linear layers

input_size  = 48*48   # images are 48x48 pixels
n_hidden    = 100     # number of hidden units
output_size = 7      # there are 7 classes - seven different types of emotions

model = Network1Layer(input_size, n_hidden, output_size)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

batch_size = 50
num_epochs = 3

# Train the Model

* We will only train for a few epochs here
* Normally we would train for longer
* Depending on the dataset and model size, this can take days or weeks

In [119]:
# Alternative entry point (currently disabled): build train_iter / test_iter
# once as below and call train_test(num_epochs, train_iter, test_iter).
#
# Don't use accuracy as the loss, because we want the loss function to be differentiable!
#
# The iterators are rebuilt every epoch: batch_iter is a generator that ends
# after one pass over the data, and both helpers reshuffle when called.
# Note that this loop runs 5 epochs and does not read num_epochs.
#
# NOTE(review): the recorded accuracy stays near 25%, which equals the share
# of label 3 (Happy) in the dataset, so the network may be predicting a single
# class. Pixels are fed in as raw 0-255 values; scaling the inputs is worth
# checking - TODO confirm.
for epoch in range(1, 6):
    train_iter = batch_iter(batch_size, X_train, y_train)
    test_iter = eval_iter(batch_size, X_test, y_test)
    train(epoch, train_iter)
    test(test_iter)


Test set: Average loss: 1.9141, Accuracy: 1770/7178 (25%)


Test set: Average loss: 1.8988, Accuracy: 1770/7178 (25%)


Test set: Average loss: 1.8972, Accuracy: 1770/7178 (25%)


Test set: Average loss: 1.8919, Accuracy: 1769/7178 (25%)


Test set: Average loss: 1.8916, Accuracy: 1762/7178 (25%)

