In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
from PIL import Image

In [2]:
# Download the data (This can take a while!)
!wget -nc https://lazyprogrammer.me/course_files/fer2013.csv

--2021-07-20 15:13:06--  https://lazyprogrammer.me/course_files/fer2013.csv
Resolving lazyprogrammer.me (lazyprogrammer.me)... 172.67.213.166, 104.21.23.210, 2606:4700:3031::6815:17d2, ...
Connecting to lazyprogrammer.me (lazyprogrammer.me)|172.67.213.166|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 301072766 (287M) [text/csv]
Saving to: ‘fer2013.csv’


2021-07-20 15:18:11 (967 KB/s) - ‘fer2013.csv’ saved [301072766/301072766]



In [3]:
class NewFaceDataset(torch.utils.data.Dataset):
  """Kaggle 2015 facial expression recognition dataset."""

  def __init__(self, csv_file, split='TRAIN', transform=None):
    """
    Args:
      csv_file  (string): Path to the csv file
      train     (bool): Variable to return train/test data
      transform (callable, optional): Optional transform to be 
        applied on a sample
    """
    
    self.split = str(split.upper())
    if self.split not in {'TRAIN', 'PUBLIC_TEST', 'PRIVATE_TEST'}:
      print("Param split not in {TRAIN, PUBLIC_TEST, PRIVATE_TEST}")
      print("Assuming TRAIN")
      self.split='TRAIN'

    self.transform = transform
    in_df = pd.read_csv(csv_file)

    if self.split == 'TRAIN':
      self.data = in_df.loc[in_df['Usage']=='Training']
    elif self.split == 'PUBLIC_TEST':
      self.data = in_df.loc[in_df['Usage']=='PublicTest']
    else:
      self.data = in_df.loc[in_df['Usage']=='PrivateTest']

    self.targets = self.data['emotion'].values

    self.data['pixels'] = self.data['pixels'].apply(lambda x: x.split(" "))
    self.data['pixels'] = self.data['pixels'].apply(lambda x: [int(i) for i in x])
    self.data['pixels'] = self.data['pixels'].apply(lambda x: np.reshape(x, (48, 48)))
    self.data = self.data.drop(columns=['emotion', 'Usage'])
    self.data = self.data['pixels'].values
    #train_df = in_df.loc[in_df['Usage']=='Training']
    #public_test_df = in_df.loc[in_df['Usage']!='Training']


    #if train:
    #  self.data = np.stack(train_df['pixels'].values, axis=0)
    #  self.targets = train_df['emotion'].values
    #else:
    #  self.data = np.stack(test_df['pixels'].values, axis=0)
    #  self.targets = test_df['emotion'].values

    #self.data = torch.from_numpy(self.data).long()
    #self.targets = torch.from_numpy(self.targets).long()

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    img = self.data[idx]
    #image = []
    #for i in img:
    #  image.append(int(i))
    #image = np.array(img)
    #image = img.reshape(48, 48).astype(np.uint8)

    image = Image.fromarray(img)

    if self.transform is not None:
      image = self.transform(image)

    target = self.targets[idx]
    return image, target

In [4]:
# examples: https://pytorch.org/docs/stable/torchvision/transforms.html
transformer_train = torchvision.transforms.Compose([
    # torchvision.transforms.ColorJitter(
        #brightness=0.2, contrast=0.2, stauration=0.2, hue=0.2),
    transforms.RandomCrop(32, padding=4),
    torchvision.transforms.RandomHorizontalFlip(p=0.5),
    # torchvision.transforms.RandomRotation(degrees=15),
    torchvision.transforms.RandomAffine(0, translate=(0.1, 0.1)),
    # torchvision.transforms.RandomPerspective(),
    transforms.ToTensor(),
])

train_dataset = NewFaceDataset(
    csv_file='fer2013.csv',
    split='TRAIN',
    transform=transformer_train
)

test_dataset = NewFaceDataset(
    csv_file='fer2013.csv',
    split='PUBLIC_TEST',
    transform=transforms.ToTensor()
)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [5]:
#train_dataset.data.max()

In [6]:
temp_df = pd.read_csv('fer2013.csv')
print(temp_df)
temp_df['pixels'] = temp_df['pixels'].apply(lambda x: x.split(" "))
temp_df['pixels'] = temp_df['pixels'].apply(lambda x: [int(i) for i in x])
temp_df['pixels'] = temp_df['pixels'].apply(lambda x: np.array(x))
temp_df['pixels'] = temp_df['pixels'].apply(lambda x: x.reshape(48, 48).astype(np.uint8))
print(temp_df)
print(temp_df['pixels'].values[0].shape)

       emotion                                             pixels        Usage
0            0  70 80 82 72 58 58 60 63 54 58 60 48 89 115 121...     Training
1            0  151 150 147 155 148 133 111 140 170 174 182 15...     Training
2            2  231 212 156 164 174 138 161 173 182 200 106 38...     Training
3            4  24 32 36 30 32 23 19 20 30 41 21 22 32 34 21 1...     Training
4            6  4 0 0 0 0 0 0 0 0 0 0 0 3 15 23 28 48 50 58 84...     Training
...        ...                                                ...          ...
35882        6  50 36 17 22 23 29 33 39 34 37 37 37 39 43 48 5...  PrivateTest
35883        3  178 174 172 173 181 188 191 194 196 199 200 20...  PrivateTest
35884        0  17 17 16 23 28 22 19 17 25 26 20 24 31 19 27 9...  PrivateTest
35885        3  30 28 28 29 31 30 42 68 79 81 77 67 67 71 63 6...  PrivateTest
35886        2  19 13 14 12 13 16 21 33 50 57 71 84 97 108 122...  PrivateTest

[35887 rows x 3 columns]
       emotion            

In [7]:
#print(train_dataset.data['pixels'])

In [8]:
print(train_dataset.data)

[array([[ 70,  80,  82, ...,  52,  43,  41],
       [ 65,  61,  58, ...,  56,  52,  44],
       [ 50,  43,  54, ...,  49,  56,  47],
       ...,
       [ 91,  65,  42, ...,  72,  56,  43],
       [ 77,  82,  79, ..., 105,  70,  46],
       [ 77,  72,  84, ..., 106, 109,  82]])
 array([[151, 150, 147, ..., 129, 140, 120],
       [151, 149, 149, ..., 122, 141, 137],
       [151, 151, 156, ..., 109, 123, 146],
       ...,
       [188, 188, 121, ..., 185, 185, 186],
       [188, 187, 196, ..., 186, 182, 187],
       [186, 184, 185, ..., 193, 183, 184]])
 array([[231, 212, 156, ...,  44,  27,  16],
       [229, 175, 148, ...,  27,  35,  27],
       [214, 156, 157, ...,  28,  22,  28],
       ...,
       [241, 245, 250, ...,  57, 101, 146],
       [246, 250, 252, ...,  78, 105, 162],
       [250, 251, 250, ...,  88, 110, 152]])
 ...
 array([[ 74,  81,  87, ..., 189, 191, 192],
       [ 78,  82,  89, ..., 185, 189, 193],
       [ 81,  86,  94, ..., 176, 185, 193],
       ...,
       [ 90,  99

In [9]:
train_dataset.data.shape

(28709,)

In [10]:
train_dataset.targets

array([0, 0, 2, ..., 4, 0, 4])

In [11]:
# Number of classes
K = len(set(train_dataset.targets))
print("Number of classes:", K)

Number of classes: 7


In [12]:
# Define the model
class CNN(nn.Module):
  def __init__(self, K):
    super(CNN, self).__init__()
    self.conv_layers = nn.Sequential(
        nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=2),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2),
        nn.ReLU(),
        nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2),
        nn.ReLU()
    )
    # http://deeplearning.net/software/theano/tutorial/conv_arithmetic.html
    # "No zero padding, non-unit strides"
    # https://pytorch.org/docs/stable/nn.html
    self.dense_layers = nn.Sequential(
        nn.Dropout(0,2),
        nn.Linear(1152, 512),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(512, K)
    )

  def forward(self, X):
    out = self.conv_layers(X)
    out = out.view(out.size(0), -1)
    out = self.dense_layers(out)
    return out

In [13]:
# Instantiate the model
model = CNN(K)

In [14]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

cuda:0


CNN(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2))
    (5): ReLU()
  )
  (dense_layers): Sequential(
    (0): Dropout(p=0, inplace=2)
    (1): Linear(in_features=1152, out_features=512, bias=True)
    (2): ReLU()
    (3): Dropout(p=0.2, inplace=False)
    (4): Linear(in_features=512, out_features=7, bias=True)
  )
)

In [15]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

In [16]:
# Data loader
# Useful because it automatically generates batches in the training loop
# and takes care of shuffling

batch_size = 128
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)

In [17]:
# A function to encapsulate the training loop
def batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs):
  train_losses = np.zeros(epochs)
  test_losses = np.zeros(epochs)

  for it in range(epochs):
    model.train()
    t0 = datetime.now()
    train_loss = []
    for inputs, targets in train_loader:
      # move data to GPU
      inputs, targets = inputs.to(device), targets.to(device)

      # zero the parameter gradients
      optimizer.zero_grad()

      # Forward pass
      outputs = model(inputs)
      loss = criterion(outputs, targets)

      # Backward and optimize
      loss.backward()
      optimizer.step()

      train_loss.append(loss.item())

    # Get train loss and test loss
    train_loss = np.mean(train_loss) # a little misleading

    model.eval()
    test_loss = []
    for inputs, targets in test_loader:
      inputs, targets = inputs.to(device), targets.to(device)
      outputs = model(inputs)
      loss = criterion(outputs, targets)
      test_loss.append(loss.item())
    test_loss = np.mean(test_loss)

    # Save losses
    train_losses[it] = train_loss
    test_losses[it] = test_loss

    dt = datetime.now() - t0
    print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, \
      Test Loss: {test_loss:.4f}, Duration: {dt}')
    
  return train_losses, test_losses

In [18]:
train_losses, test_losses = batch_gd(
    model, criterion, optimizer, train_loader, test_loader, epochs=80
)

TypeError: ignored

In [None]:
# Plot the train loss and test loss per iteration
plt.plot(train_losses, label='train loss')
plt.plot(test_losses, label='test loss')
plt.legend()
plt.show()

In [None]:
# Accuracy

model.eval()
n_correct = 0.
n_total = 0.
for inputs, targets in train_loader:
  # move data to GPU
  inputs, targets = inputs.to(device), targets.to(device)

  # Forward pass
  outputs = model(inputs)

  # Get prediction
  # torch.max returns both max and argmax
  _, predictions = torch.max(outputs, 1)

  # update counts
  n_correct += (predictions == targets).sum().item()
  n_total += targets.shape[0]

train_acc = n_correct / n_total

n_correct = 0.
n_total = 0.
for inputs, targets in test_loader:
  # move data to GPU
  inputs, targets = inputs.to(device), targets.to(device)

  # Forward pass
  outputs = model(inputs)

  # Get prediction
  # torch.max returns both max and argmax
  _, predictions = torch.max(outputs, 1)

  # update counts
  n_correct += (predictions == targets).sum().item()
  n_total += targets.shape[0]

test_acc = n_correct / n_total
print(f"Train acc: {train_acc:.4f}, Test acc: {test_acc:.4f}")

In [None]:
# Plot confusion matrix
from sklearn.metrics import confusion_matrix
import numpy as np
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
  
  """
  This function prints and plots the confusion matrix.
  Normalization can be applied by setting 'normalize=True'.
  """
  if normalize:
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    print("Normalized confusion matrix")
  else:
    print("Confusion matrix, without normalization")

  print(cm)

  plt.imshow(cm, interpolation='nearest', cmap=cmap)
  plt.title(title)
  plt.colorbar()
  tick_marks = np.arange(len(classes))
  plt.xticks(tick_marks, classes, rotation=45)
  plt.yticks(tick_marks, classes)

  fmt = '.2f' if normalize else 'd'
  thresh = cm.max() / 2.
  for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, format(cm[i, j], fmt),
             horizontalalignment="center",
             color="white" if cm[i, j] > thresh else "black")
    
  plt.tight_layout()
  plt.ylabel("True label")
  plt.xlabel("Predicted label")
  plt.show()

In [None]:
# Get all predictions in an array and plot confusion matrix

x_test = test_dataset.data.values
y_test = test_dataset.targets
p_test = np.array([])
for inputs, targets in test_loader:
  # Move data to GPU
  inputs, targets = inputs.to(device), targets.to(device)

  # Forward pass
  outputs = model(inputs)

  # Get prediction
  _, predictions = torch.max(outputs, 1)

  # Update p_test
  p_test = np.concatenate((p_test, predictions.cpu().numpy()))

cm = confusion_matrix(y_test, p_test)
plot_confusion_matrix(cm, list(range(7)))

In [None]:
# Label mapping
labels = '''Angry, 
Disgust, 
Fear, 
Happy, 
Sad, 
Surprise, 
Neutral
'''.split("\n")

In [None]:
# Show some misclassified examples
misclassified_idx = np.where(p_test != y_test)[0]
i = np.random.choice(misclassified_idx)
print(x_test[i])
plt.imshow(x_test[i].reshape(48, 48), cmap='gray')
plt.title("True label: %s Predicted: %s" % (labels[y_test[i]], int(p_test[i])))