## **Load training and validation sets**

In [53]:
import os
import sys
import tarfile
import time

import cv2
import dlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from google.colab import drive
from google.colab.patches import cv2_imshow
from PIL import Image
from scipy import ndimage
from torch.utils.data import Dataset, DataLoader

# Create /content/data and make it the working directory. The dataset class
# further down reads images from /content/data/training and
# /content/data/validation.
%mkdir -p /content/data/
%cd /content/data

In [2]:
# Mount Google Drive at /content/drive; the archives, label CSVs and the
# checkpoint path used in this notebook all live under '/content/drive/My Drive/'.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
train_path = '/content/drive/My Drive/pretraining/training.tar'
val_path = '/content/drive/My Drive/pretraining/validation.tar'

# Extract into an explicit destination rather than the current working
# directory, so this cell does not silently depend on an earlier `%cd`.
# The dataset class expects /content/data/training and /content/data/validation.
extract_dir = '/content/data'
os.makedirs(extract_dir, exist_ok=True)

# Load training set and validation set
for fpath in [train_path, val_path]:
  print('Extracting {}...'.format(os.path.basename(fpath)))
  with tarfile.open(fpath) as tar:
    # NOTE(review): extractall trusts the member paths stored in the archive;
    # only use it on archives you created yourself.
    tar.extractall(path=extract_dir)

Extracting training.tar...
Extracting validation.tar...


The images of the training set should now be extracted to `/content/data/training`, and the images of the validation set to `/content/data/validation`.

## **Building training and validation sets**

In [58]:
class CelebAU(Dataset):
  """CelebAU dataset labeled by presence of action units (AU)"""

  def __init__(self, train=False, intensity=False, transform=None):
    """
    Args:
      - train: training set if True, otherwise validation set
      - intensity (bool): labels are intensities (between 0 and 5) rather
                          than presence (either 0 or 1).
      - transform: transform applied to an image input
    """
    self.train = train
    if train:
      label_path = '/content/drive/My Drive/pretraining/train_labels.csv'
      self.root_dir = '/content/data/training'
    else:
      label_path = '/content/drive/My Drive/pretraining/val_labels.csv'
      self.root_dir = '/content/data/validation'
    # The first two CSV columns form the index; the image id is then read from
    # the first remaining column (see __getitem__).
    self.au_frame = pd.read_csv(label_path, index_col=[0, 1])
    if intensity:
      # AU28 is taken from its `_c` column in both modes.
      self.label_cols = [' AU01_r', ' AU02_r', ' AU04_r', ' AU05_r', ' AU06_r',
                         ' AU07_r', ' AU09_r', ' AU10_r', ' AU12_r', ' AU14_r',
                         ' AU15_r', ' AU17_r', ' AU20_r', ' AU23_r', ' AU25_r',
                         ' AU26_r', ' AU28_c', ' AU45_r']
    else:
      self.label_cols = [' AU01_c', ' AU02_c', ' AU04_c', ' AU05_c', ' AU06_c',
                         ' AU07_c', ' AU09_c', ' AU10_c', ' AU12_c', ' AU14_c',
                         ' AU15_c', ' AU17_c', ' AU20_c', ' AU23_c', ' AU25_c',
                         ' AU26_c', ' AU28_c', ' AU45_c']
    self.intensity = intensity
    self.transform = transform


  def __len__(self):
    """Number of labeled rows in the label table."""
    return len(self.au_frame)

  def __getitem__(self, idx):
    """
    Returns a dictionary {'image': ..., 'labels': ...} for row `idx`.

    Returns None when the transform raises ValueError (the face-alignment
    transform does so when no face is detected), so that the collate function
    can drop the sample.

    Raises:
      FileNotFoundError: the image file for this row cannot be read.
    """
    # Get image at idx
    image_id = self.au_frame.iloc[idx, 0]
    image_path = os.path.join(self.root_dir, str(image_id).zfill(6) + '.jpg')
    image = cv2.imread(image_path)
    if image is None:
      # cv2.imread signals failure by returning None instead of raising.
      raise FileNotFoundError('Could not read image: {}'.format(image_path))
    # OpenCV decodes to BGR while PIL (and the torchvision transforms that
    # follow) assume RGB, so reorder the channels before wrapping.
    image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    # Get AU labels
    aus = self.au_frame.iloc[idx][self.label_cols]
    aus = np.array(aus, dtype=float)

    if self.transform:
      try:
        image = self.transform(image)
      except ValueError:
        return None

    sample = {'image': image, 'labels': aus}

    return sample

In [137]:
class align_faces(object):
    """Transform that detects a face with dlib and warps it to an aligned crop.

    The input is a PIL image; the output is a DIMENSIONS x DIMENSIONS PIL
    image that is rotated so the eyes are level, scaled by the landmark
    height, and shifted so the midpoint between the eyes sits at a fixed spot.

    The dlib detector and landmark predictor are loaded lazily, once per
    instance, instead of being re-read from disk for every image.
    """

    # Fractional (x, y) position for the eyes in the output. Only the y
    # component is used: horizontally the eye midpoint is centred.
    LEFT_EYE_CORD = (0.25, 0.2)
    # Side length in pixels of the square output image.
    DIMENSIONS = 64
    PREDICTOR_PATH = '/content/drive/My Drive/AUDetector/shape_predictor_68_face_landmarks.dat'

    def __init__(self):
        self._face_detector = None
        self._shape_predictor = None

    def __getstate__(self):
        # Drop the dlib handles when the transform is pickled (for example
        # when it is sent to worker processes); each process reloads lazily.
        state = self.__dict__.copy()
        state['_face_detector'] = None
        state['_shape_predictor'] = None
        return state

    def _load_models(self):
        """Create the dlib detector and landmark predictor on first use."""
        if self._face_detector is None:
            self._face_detector = dlib.get_frontal_face_detector()
        if self._shape_predictor is None:
            self._shape_predictor = dlib.shape_predictor(self.PREDICTOR_PATH)

    def __call__(self, image):
        """Align the first detected face in `image`.

        Args:
            image: PIL image (converted to a numpy array internally).

        Returns:
            PIL image of size DIMENSIONS x DIMENSIONS.

        Raises:
            ValueError: no face is detected in the image.
        """
        self._load_models()

        # Since the images are loaded in PIL
        image = np.asarray(image)

        faces = self._face_detector(image, 0)
        if not faces:
            raise ValueError("Image has no detectable faces")

        # assumption is made that there is only one face: use the first
        face = faces[0]
        landmarks = self.convert_to_np(self._shape_predictor(image, face).parts())

        # To gauge scale: extent of the landmarks from index 17 onwards
        maximum = np.max(landmarks[17:, :], axis=0)
        minimum = np.min(landmarks[17:, :], axis=0)

        # eye centres: integer mean of each eye's landmarks
        left = np.mean(landmarks[36:42], axis=0, dtype=int)
        right = np.mean(landmarks[42:48], axis=0, dtype=int)

        # midpoint between the eyes, used as the rotation centre
        centre = np.mean(np.vstack((left, right)), axis=0, dtype=int)

        diff = right - left
        angle = np.degrees(np.arctan2(diff[1], diff[0]))

        # find the length of the face (plus a 20% margin) and use that for scale
        y_scale = maximum[1] - minimum[1]
        y_scale = y_scale + 0.2 * y_scale

        # OpenCV expects plain Python numbers for the centre, not numpy ints.
        M = cv2.getRotationMatrix2D((float(centre[0]), float(centre[1])),
                                    angle, self.DIMENSIONS / y_scale)

        # translate the image by eye location: centre horizontally and put the
        # eye midpoint at the configured fraction of the height
        tX = self.DIMENSIONS // 2
        tY = self.DIMENSIONS * self.LEFT_EYE_CORD[1]

        M[0, 2] += (tX - centre[0])
        M[1, 2] += (tY - centre[1])

        image2 = cv2.warpAffine(image, M, (self.DIMENSIONS, self.DIMENSIONS),
                                flags=cv2.INTER_CUBIC)

        # convert back to PIL
        return Image.fromarray(image2)

    @staticmethod
    def convert_to_np(points):
        """Convert an iterable of objects with `.x` / `.y` into an (N, 2) int array.

        Rows keep the input order. Unlike a pop-based loop, the input sequence
        is left untouched. An empty input gives an array of shape (0, 2).
        """
        coords = [(point.x, point.y) for point in points]
        return np.array(coords, dtype=int).reshape((-1, 2))

In [138]:
# Preprocessing pipeline applied to every image:
# grayscale -> face alignment -> tensor -> normalisation with mean 0.5, std 0.5.
transform = torchvision.transforms.Compose([
    torchvision.transforms.Grayscale(),
    align_faces(),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(0.5, 0.5),
])

In [139]:
# Validation split wrapped in a shuffling loader.
# NOTE: `collate_fn` is defined in a later cell of this notebook and must
# already be in scope when this cell runs.
val_set = CelebAU(train=False, transform=transform)
val_loader = DataLoader(
    val_set,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_fn,
)

def imshow(img):
    """Display a (C, H, W) image tensor with matplotlib.

    The pipeline normalises images with mean 0.5 and std 0.5, so values are
    mapped back to [0, 1] before plotting; otherwise matplotlib clips
    everything below zero.
    """
    img = img / 2 + 0.5     # unnormalize
    npimg = np.clip(img.numpy(), 0, 1)
    # matplotlib wants channels last: (C, H, W) -> (H, W, C)
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

# Show one validation batch as an image grid.
dataiter = iter(val_loader)
# Use the built-in next(); DataLoader iterators no longer expose `.next()`.
batch = next(dataiter)
imshow(torchvision.utils.make_grid(batch['image']))

In [None]:
# Pull the first two mini-batches from the validation loader to gauge
# throughput (the timing calls themselves are left disabled).
# start = time.time()
for i_batch, sample_batched in zip(range(2), val_loader):
    # print(i_batch, sample_batched['image'].size(),
    #       sample_batched['labels'].size())
    pass
# print(time.time() - start)

# Roughly 50 sec per minibatch -> 62.5 hours per epoch 

## **Transforms**

## **Defining the model**

In [None]:
class AUDetector(nn.Module):
  """VGG-style CNN mapping a 1x64x64 face crop to 18 action-unit logits.

  Every 3x3 convolution uses padding=1 so only the four 2x2 max-pools change
  the spatial size: 64 -> 32 -> 16 -> 8 -> 4. That yields 256 * 4 * 4 = 4096
  features, which is what `fc5` expects. Without padding a 64x64 input
  shrinks to 1x1 before `conv4` and the forward pass fails.
  """

  def __init__(self):
    super(AUDetector, self).__init__()
    self.conv1 = nn.Sequential(nn.Conv2d(1, 64, 3, padding=1), nn.ReLU(),
                               nn.Conv2d(64, 64, 3, padding=1), nn.ReLU(),
                               nn.MaxPool2d(2, stride=2), nn.Dropout2d(p=0.25))

    self.conv2 = nn.Sequential(nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(),
                               nn.Conv2d(128, 128, 3, padding=1), nn.ReLU(),
                               nn.MaxPool2d(2, stride=2), nn.Dropout2d(p=0.25))

    self.conv3 = nn.Sequential(nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(),
                               nn.Conv2d(256, 256, 3, padding=1), nn.ReLU(),
                               nn.Conv2d(256, 256, 3, padding=1), nn.ReLU(),
                               nn.MaxPool2d(2, stride=2), nn.Dropout2d(p=0.25))

    self.conv4 = nn.Sequential(nn.Conv2d(256, 256, 3, padding=1), nn.ReLU(),
                               nn.Conv2d(256, 256, 3, padding=1), nn.ReLU(),
                               nn.Conv2d(256, 256, 3, padding=1), nn.ReLU(),
                               nn.MaxPool2d(2, stride=2), nn.Dropout2d(p=0.25))

    self.fc5 = nn.Sequential(nn.Flatten(), nn.Linear(4096, 1024), nn.ReLU(),
                             nn.Dropout(p=0.5))

    # The Flatten here is a no-op on the already flat input; it is kept so the
    # parameter names in the state dict (fc6.1.*) stay unchanged.
    self.fc6 = nn.Sequential(nn.Flatten(), nn.Linear(1024, 1024), nn.ReLU(),
                             nn.Dropout(p=0.5))

    self.out = nn.Linear(1024, 18)

  def forward(self, x):
    """Return raw logits of shape (batch, 18) for a (batch, 1, 64, 64) input."""
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.conv3(x)
    x = self.conv4(x)
    x = self.fc5(x)
    x = self.fc6(x)
    x = self.out(x)
    return x

  def predict(self, x):
    """Return per-AU probabilities (sigmoid of the logits)."""
    logits = self.forward(x)
    return torch.sigmoid(logits)

## **Training loop**

In [None]:
class AttrDict(dict):
    """Dictionary whose entries can also be read and written as attributes."""

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        # Point the instance namespace at the mapping itself, so `d.key`
        # and `d['key']` share the same storage.
        self.__dict__ = self

# Preprocessing pipeline handed to the training run via `args`:
# grayscale -> face alignment -> tensor -> normalisation with mean 0.5, std 0.5.
transform = torchvision.transforms.Compose([
    torchvision.transforms.Grayscale(),
    align_faces(),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(0.5, 0.5),
])



In [88]:
def collate_fn(batch):
  """
  Used to process the list of samples to form a batch. Ignores images
  where no faces were detected (samples that are None).

  Returns None when every sample in the batch was dropped, instead of
  crashing inside default_collate on an empty list; callers should skip
  such batches.
  """
  batch = [sample for sample in batch if sample is not None]
  if not batch:
    return None
  return torch.utils.data.dataloader.default_collate(batch)

In [None]:
def validation_step(convnet, val_loader, criterion, bs):
  """
  Evaluate `convnet` on every batch of `val_loader`.

  Args:
    - convnet: model returning logits, one column per label
    - val_loader: iterable of batches shaped {'image': ..., 'labels': ...};
                  batches that are None are skipped
    - criterion: loss taking (logits, labels)
    - bs: batch size (unused, kept for interface compatibility)

  Returns:
    (val_loss, val_acc): mean of the per-batch losses, and the percentage of
    label entries where thresholding the sigmoid at 0.5 matches the label.
    Gives (nan, 0.0) when no batch was evaluated.
  """
  # Run on whatever device the model lives on instead of assuming CUDA.
  first_param = next(convnet.parameters(), None)
  device = first_param.device if first_param is not None else torch.device('cpu')

  num_matches = 0
  total = 0
  losses = []

  with torch.no_grad():
    for data in val_loader:
      if data is None:
        continue
      # Batches are dictionaries; unpacking them directly would yield the keys.
      imgs = data['image'].to(device)
      # Labels come out of the dataset as float64; the loss needs float32.
      labels = data['labels'].to(device).float()
      outputs = convnet(imgs)

      # Compute batch loss
      losses.append(criterion(outputs, labels).item())

      # Compute batch accuracy, set probabilities > 0.5 to 1
      preds = (torch.sigmoid(outputs) > 0.5).float()
      num_matches += (preds == labels).sum().item()
      total += labels.numel()

  val_loss = float(np.mean(losses)) if losses else float('nan')
  val_acc = 100.0 * num_matches / total if total else 0.0
  return val_loss, val_acc

In [4]:
def train(args, soft_start=False):
  """
  Train an AUDetector on CelebAU and checkpoint the best model.

  Args:
    - args: object with attributes `learn_rate`, `batch_size`, `epochs`,
            `num_workers`, `transform` and `checkpoint` (path of the
            checkpoint file)
    - soft_start (bool): resume model, optimizer and loss history from
                         `args.checkpoint` if True

  Returns:
    The trained model (weights as of the last epoch). The checkpoint file
    holds the weights with the lowest validation loss.
  """
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

  # Move the model to its device before building or restoring the optimizer,
  # so that restored optimizer state lands on the same device as the parameters.
  convnet = AUDetector().to(device)
  criterion = nn.BCEWithLogitsLoss()
  optimizer = optim.Adam(convnet.parameters(), args.learn_rate)

  train_losses = []
  valid_losses = []
  valid_accs = []
  best_val_loss = float('inf')
  best_acc = 0.0

  if soft_start:
    checkpoint = torch.load(args.checkpoint, map_location=device)
    convnet.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    train_losses = checkpoint['train_losses']
    valid_losses = checkpoint['valid_losses']
    valid_accs = checkpoint.get('valid_accs', [])
    best_val_loss = checkpoint['best_val_loss']
    best_acc = checkpoint.get('best_acc', 0.0)

  train_set = CelebAU(train=True, transform=args.transform)
  train_loader = DataLoader(train_set, args.batch_size, collate_fn=collate_fn, 
                            shuffle=True, num_workers=args.num_workers)
  val_set = CelebAU(train=False, transform=args.transform)
  val_loader = DataLoader(val_set, args.batch_size, collate_fn=collate_fn,
                          shuffle=True, num_workers=args.num_workers)
  
  start = time.time()
  if device.type == 'cuda':
    print('Running on GPU.') 
  else:
    print('Running on CPU.')
  print('Beginning training...')
  for epoch in range(args.epochs):

    convnet.train() # Set to training mode
    losses = []
    for data in train_loader:

      ###################
      # train the model #
      ###################
      if data is None:
        # Every sample of this batch was dropped by the collate function.
        continue
      # Batches are dictionaries; labels arrive as float64 but the loss
      # needs them to match the float32 logits.
      imgs = data['image'].to(device)
      labels = data['labels'].to(device).float()
      # reset parameter gradients to 0
      optimizer.zero_grad()
      outputs = convnet(imgs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      losses.append(loss.item())
    
    # Evaluate training metrics on epoch. Plain floats keep the checkpoint
    # free of numpy scalar objects.
    avg_loss = float(np.mean(losses)) if losses else float('nan')
    train_losses.append(avg_loss)
    time_elapsed = time.time() - start
    print('Epoch [%d/%d], Loss: %.4f, Time (s): %d' % (
            epoch+1, args.epochs, avg_loss, time_elapsed))
    
    ######################
    # validate the model #
    ######################
    convnet.eval()
    val_loss, val_acc = validation_step(convnet, val_loader, criterion, 
                                        args.batch_size)
    val_loss, val_acc = float(val_loss), float(val_acc)
    time_elapsed = time.time() - start
    valid_losses.append(val_loss)
    valid_accs.append(val_acc)
    print('Epoch [%d/%d], Val Loss: %.4f, Val Acc: %.1f%%, Time(s): %.2f' % (
        epoch+1, args.epochs, val_loss, val_acc, time_elapsed))
    
    if val_loss <= best_val_loss:
      # SAVE MODEL: lowest validation loss seen so far
      best_val_loss = val_loss
      best_acc = val_acc
      checkpoint = {
          'model_state_dict': convnet.state_dict(),
          'optimizer_state_dict': optimizer.state_dict(),
          'train_losses': train_losses,
          'valid_losses': valid_losses,
          'valid_accs': valid_accs,
          'best_val_loss': best_val_loss,
          'best_acc': best_acc
      }
      torch.save(checkpoint, args.checkpoint)

  print(f'Best model achieves accuracy: {best_acc:.4f}')
  return convnet

In [None]:
# Hyper-parameters and paths for the training run.
args = AttrDict()
args_dict = {
              'gpu':True,
              'transform': transform,
              'checkpoint':"/content/drive/My Drive/pretraining/checkpoints", 
              # NOTE(review): 0.3 is a very large step size for Adam - confirm.
              'learn_rate':0.3, 
              'batch_size':100, 
              'epochs':35, 
              'num_workers':0,
}
args.update(args_dict)

torch.cuda.empty_cache()
# The second parameter of train() is `soft_start`; start from scratch here.
model = train(args)