# Project 1: CelebA Facial Attribute Recognition Challenge

In [None]:
import numpy as np
import os
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from IPython.display import clear_output
from PIL import Image
from torchvision import transforms

# Let cuDNN benchmark the available convolution algorithms and keep the fastest one
cudnn.benchmark = True
# Show more edge items per dimension and allow wider lines when printing arrays/tensors
np.set_printoptions(edgeitems = 40, linewidth = 200)
torch.set_printoptions(edgeitems = 40, linewidth = 200)

## Hyperparameters

In [None]:
# Dataloader
# num_workers is passed to every DataLoader; batch_size is used by the training loader,
# test_batch by the validation, test and evaluation loaders.
num_workers = 0
batch_size = 32
test_batch = 200

# Optimizer
# These three values are handed to torch.optim.SGD.
lr = 0.1
momentum = 0.9
weight_decay = 0.0001

# Model
# crop_size is the output size of the crop/resize transforms; arch is the model name
# requested from torch.hub and the prefix of the checkpoint file names.
# NOTE: arch, batch_size and crop_size are overwritten if the "Load Checkpoint" cell is run.
crop_size = 320
arch = 'resnest50'

# Focal Loss
# alpha holds one row per attribute head (40 rows) and one column per class (2 columns).
# FocalLoss.forward looks up row j for attribute head j. Within every row both columns
# carry the same value (0.25 or 0.75).
# NOTE: alpha is overwritten if the "Load Checkpoint" cell is run.
alpha = torch.tensor([
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25]]).cuda()
# Second 40x2 table with the same layout. It is not referenced anywhere else in the
# visible notebook -- presumably an alternative weighting for the LFWA images; confirm before use.
alpha_lfwa = torch.tensor([
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.25, 0.25],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.75, 0.75],
  [0.25, 0.25],
  [0.75, 0.75]]).cuda()
# Exponent of the (1 - pt) modulating factor in FocalLoss
# (overwritten if the "Load Checkpoint" cell is run).
gamma = 2.0

# Training
# Number of train + validate passes executed by the training loop.
epochs = 10

## Load Checkpoint (Optional)

In [None]:
# Checkpoint file to resume from; the name is derived from the architecture chosen above
resume_checkpoint = '%s.checkpoint25.pth.tar' % arch
checkpoint = torch.load(resume_checkpoint)
# Replace the hyperparameters set earlier with the ones stored in the checkpoint
arch, batch_size, crop_size = checkpoint['arch'], checkpoint['batch_size'], checkpoint['crop_size']
alpha, gamma = checkpoint['alpha'], checkpoint['gamma']

## Utils

In [None]:
# Code from https://github.com/pytorch/examples/blob/master/imagenet/main.py
def pil_loader(path):
  """Read the image stored at `path` and return it converted to RGB."""
  # The file is opened explicitly so the handle is closed as soon as the pixels are
  # decoded, avoiding a ResourceWarning (https://github.com/python-pillow/Pillow/issues/835)
  with open(path, 'rb') as handle:
    rgb = Image.open(handle).convert('RGB')
  return rgb

class AverageMeter:
    """Running-average tracker.

    Keeps the most recent value (`val`), the weighted total (`sum`), the total
    weight (`count`) and their ratio (`avg`).
    Adapted from https://github.com/pytorch/examples/blob/master/imagenet/main.py#L247-L262
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero every statistic."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record `val` as observed `n` times and refresh the running average."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count

def accuracy(output, target, topk=(1,)):
    """Compute the top-k accuracy, in percent, for every k in `topk`.

    Args:
        output: score tensor indexed as (sample, class); the top-k is taken over dim 1.
        target: 1-D tensor with the true class index of every sample.
        topk: iterable of k values to evaluate.

    Returns:
        A list with one 0-dim float tensor per entry of `topk`: the percentage of
        samples whose true class is among the k highest-scoring classes.
    """
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        # Indices of the maxk best classes per sample, transposed to (maxk, batch)
        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.reshape(1, -1).expand_as(pred))

        res = []
        for k in topk:
            # reshape instead of view: the rows sliced from the transposed tensor are not
            # contiguous, and view(-1) raises a RuntimeError on them when k > 1
            correct_k = correct[:k].reshape(-1).float().sum(0)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res

## Main

### CelebA and LFW Dataset

In [None]:
# Code from https://github.com/d-li14/face-attribute-prediction/blob/master/celeba.py
class CelebA(data.Dataset):
  """Image dataset driven by an attribute annotation list.

  Every line of `<root>/Anno/<ann_file>` must hold an image file name followed by
  40 integer attribute labels. Negative labels are stored as 0, all other labels
  are kept as given.

  Args:
    root: base directory containing the `Anno` folder and `image_folder`.
    ann_file: annotation file name inside `<root>/Anno`.
    image_folder: folder, relative to `root`, that holds the images.
    transform: optional callable applied to the loaded image.
    target_transform: optional callable applied to the label tensor.

  Raises:
    RuntimeError: if an annotation line does not consist of exactly 41 fields.
  """
  def __init__(self, root, ann_file, image_folder, transform=None, target_transform=None):
    images = []
    targets = []

    # The context manager guarantees the annotation file is closed once it has been read
    with open(os.path.join(root, 'Anno', ann_file), 'r') as ann:
      for line in ann:
        sample = line.split()
        if len(sample) != 41:
          raise RuntimeError("# Annotated face attributes of CelebA dataset should not be different from 40")
        images.append(sample[0])
        # Clamp negative labels to 0 so each attribute can be used as a class index
        targets.append([max(int(value), 0) for value in sample[1:]])
    self.images = [os.path.join(root, image_folder, img) for img in images]
    self.targets = targets
    self.transform = transform
    self.target_transform = target_transform
    self.loader = pil_loader

  def __getitem__(self, index):
    """Return `(image, labels)` for `index`; labels are a LongTensor of the stored attribute values."""
    sample = self.loader(self.images[index])
    target = torch.LongTensor(self.targets[index])
    if self.transform is not None:
      sample = self.transform(sample)
    if self.target_transform is not None:
      target = self.target_transform(target)
    return sample, target

  def __len__(self):
    """Number of annotated images."""
    return len(self.images)

class LFWA(data.Dataset):
  """Unlabeled image dataset listed in an annotation file.

  The first whitespace-separated field of every line of `<root>/Anno/<ann_file>`
  is taken as an image file name; any further fields are ignored.

  Args:
    root: base directory containing the `Anno` folder and `image_folder`.
    ann_file: file name of the image list inside `<root>/Anno`.
    image_folder: folder, relative to `root`, that holds the images.
    transform: optional callable applied to the loaded image.
  """
  def __init__(self, root, ann_file, image_folder, transform=None):
    # The context manager guarantees the list file is closed once it has been read
    with open(os.path.join(root, 'Anno', ann_file), 'r') as ann:
      images = [line.split()[0] for line in ann]
    self.images = [os.path.join(root, image_folder, img) for img in images]
    self.transform = transform
    self.loader = pil_loader

  def __getitem__(self, index):
    """Return `(image, empty_tensor)`; the empty tensor stands in for the missing labels."""
    sample = self.loader(self.images[index])
    if self.transform is not None:
      sample = self.transform(sample)
    return sample, torch.empty(0)

  def __len__(self):
    """Number of listed images."""
    return len(self.images)

### Linear Classifier

In [None]:
class Classifier(nn.Module):
  """Bank of independent two-way linear heads, one per attribute.

  The heads are registered as attributes `classifier00`, `classifier01`, ...
  so the parameter names in the state dict follow that numbering.
  """
  def __init__(self, in_features, out_features):
    super().__init__()
    self.num_attributes = out_features
    for idx in range(out_features):
      setattr(self, 'classifier%02d' % idx, nn.Linear(in_features, 2))

  def forward(self, x):
    """Flatten `x` per sample and return a list with the output of every head, in head order."""
    flat = x.view(x.size(0), -1)
    return [getattr(self, 'classifier%02d' % idx)(flat) for idx in range(self.num_attributes)]

### Focal Loss

In [None]:
class FocalLoss(nn.Module):
  """Focal loss for a set of classification heads with per-head class weights.

  For each sample the loss is `alpha_t * (1 - pt) ** gamma * CE`, where `CE` is the
  unweighted cross entropy of the sample, `pt = exp(-CE)` is the probability the
  model assigns to the true class, and `alpha_t` is the weight of the true class
  taken from row `j` of `alpha`. The per-sample losses are averaged over the batch.

  The modulating factor has to be applied per sample: applying it to the
  batch-averaged cross entropy does not down-weight individual easy samples, and a
  weighted batch mean divides by the sum of the weights, which cancels `alpha`
  whenever a row holds the same weight for every class.

  Args:
    alpha: tensor indexed as `alpha[head][class]` holding the class weights.
    gamma: exponent of the `(1 - pt)` modulating factor.
  """
  def __init__(self, alpha, gamma = 2):
    super(FocalLoss, self).__init__()
    self.alpha = alpha
    self.gamma = gamma

  def forward(self, inputs, targets, j):
    """Return the mean focal loss of head `j`.

    Args:
      inputs: logits indexed as (sample, class).
      targets: 1-D tensor of true class indices.
      j: index of the head, selecting the row of `alpha` to use.
    """
    # Per-sample cross entropy; the reduction is deferred until after the focal re-weighting
    ce_loss = F.cross_entropy(inputs, targets, reduction = 'none')
    pt = torch.exp(-ce_loss)
    # Weight of each sample's true class for this head
    alpha_t = self.alpha[j].to(inputs.device)[targets]
    focal = alpha_t * (1 - pt) ** self.gamma * ce_loss
    return focal.mean()

### Train, Validate, Prediction Routines

In [None]:
def train(train_loader, model, criterion, optimizer):
  """Train `model` for one pass over `train_loader`.

  For every batch the per-head losses are summed and back-propagated, and a
  progress report with the running averages is printed.

  Args:
    train_loader: iterable of `(images, target)` batches; column `j` of `target`
      holds the labels of head `j`. It must yield at least one batch, otherwise
      the returned averages are never computed.
    model: network returning one output per head.
    criterion: callable `criterion(output_j, target_j, j)` returning a scalar loss tensor.
    optimizer: optimizer stepping the model parameters.

  Returns:
    `(loss_avg, prec1_avg)`: the running average loss and the running average
    top-1 accuracy (percent), each averaged over the heads.
  """
  model.train()
  # Created on the first batch so the number of meters follows the number of heads
  losses = []
  top1 = []

  num_batches = len(train_loader)
  print("Batch 1/%d" % num_batches)
  for i, (images, target) in enumerate(train_loader):
    images = images.cuda(non_blocking = True)
    target = target.cuda(non_blocking = True)
    num_samples = images.size(0)

    output = model(images)
    if not losses:
      losses = [AverageMeter() for _ in output]
      top1 = [AverageMeter() for _ in output]

    loss = []
    for j, head_output in enumerate(output):
      head_target = target[:, j]
      head_loss = criterion(head_output, head_target, j)
      head_prec1 = accuracy(head_output, head_target, topk = (1,))[0]
      loss.append(head_loss)

      # Meters are weighted by the batch size so a smaller last batch counts less
      losses[j].update(head_loss.item(), num_samples)
      top1[j].update(head_prec1.item(), num_samples)

    top1_avg = [meter.avg for meter in top1]
    loss_avg = sum(meter.avg for meter in losses) / len(losses)
    prec1_avg = sum(top1_avg) / len(top1_avg)
    clear_output(wait = True)
    print("Batch %d/%d\nAverage Loss: %.10f\nAverage Accuracy: %.10f" % (i + 1, num_batches, loss_avg, prec1_avg))
    print("Accuracy: %s" % ['%.2f' % head_avg for head_avg in top1_avg])

    # A single backward pass over the summed head losses
    optimizer.zero_grad()
    sum(loss).backward()
    optimizer.step()

  return loss_avg, prec1_avg

def validate(val_loader, model, criterion):
  """Evaluate `model` on `val_loader` without computing gradients.

  A progress report with the running averages is printed after every batch.

  Args:
    val_loader: iterable of `(images, target)` batches; column `j` of `target`
      holds the labels of head `j`. It must yield at least one batch, otherwise
      the returned averages are never computed.
    model: network returning one output per head.
    criterion: callable `criterion(output_j, target_j, j)` returning a scalar loss tensor.

  Returns:
    `(loss_avg, prec1_avg)`: the running average loss and the running average
    top-1 accuracy (percent), each averaged over the heads.
  """
  model.eval()
  # Created on the first batch so the number of meters follows the number of heads
  losses = []
  top1 = []

  with torch.no_grad():
    num_batches = len(val_loader)
    print("Batch 1/%d" % num_batches)
    for i, (images, target) in enumerate(val_loader):
      images = images.cuda(non_blocking = True)
      target = target.cuda(non_blocking = True)
      num_samples = images.size(0)

      output = model(images)
      if not losses:
        losses = [AverageMeter() for _ in output]
        top1 = [AverageMeter() for _ in output]

      for j, head_output in enumerate(output):
        head_target = target[:, j]
        head_loss = criterion(head_output, head_target, j)
        head_prec1 = accuracy(head_output, head_target, topk = (1,))[0]

        # Meters are weighted by the batch size so a smaller last batch counts less
        losses[j].update(head_loss.item(), num_samples)
        top1[j].update(head_prec1.item(), num_samples)

      top1_avg = [meter.avg for meter in top1]
      loss_avg = sum(meter.avg for meter in losses) / len(losses)
      prec1_avg = sum(top1_avg) / len(top1_avg)
      clear_output(wait = True)
      print("Batch %d/%d\nAverage Loss: %.10f\nAverage Accuracy: %.10f" % (i + 1, num_batches, loss_avg, prec1_avg))
      print("Accuracy: %s" % ['%.2f' % head_avg for head_avg in top1_avg])

  return loss_avg, prec1_avg

def predict(eval_loader, model, eval_dataset):
  """Predict every head's class for all images and write the result to `predictions.txt`.

  Each output line holds the image base name followed by one value per head:
  1 where class 1 scored highest and -1 where class 0 did.

  Args:
    eval_loader: iterable of `(images, _)` batches; the second element is ignored.
    model: network returning one score tensor per head.
    eval_dataset: dataset whose `images` list gives the file paths in loader order.

  Returns:
    An int8 NumPy array with one row per image and one column per head, or an
    empty list when the loader yields no batches.
  """
  model.eval()
  batches = []

  with torch.no_grad():
    num_batches = len(eval_loader)
    print("Batch 1/%d" % num_batches)
    for i, (images, _) in enumerate(eval_loader):
      images = images.cuda(non_blocking = True)

      output = model(images)

      # Index of the best class per head, joined into one column per head
      pred_batch = torch.cat([head_output.topk(1, 1, True, True)[1] for head_output in output], dim = 1)
      # Class 0 is reported as -1 (presumably the inverse of the negative -> 0
      # mapping applied when the training labels are loaded)
      pred_batch[pred_batch == 0] = -1
      batches.append(pred_batch.cpu().numpy().astype(np.int8))
      clear_output(wait = True)
      print("Batch %d/%d" % (i + 1, num_batches))

  # Join once at the end instead of re-copying the growing array on every batch
  pred = np.concatenate(batches, axis = 0) if batches else []

  with open('predictions.txt', 'w') as outFile:
    for path, row in zip(eval_dataset.images, pred):
      outFile.write(os.path.basename(path) + ' ' + ' '.join(map("{:2d}".format, row)) + '\n')
  return pred

## Load Data

In [None]:
# Per-channel mean and standard deviation of ImageNet
normalize = transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])

# Training split: random crop, flip and colour augmentation before normalization
train_dataset = CelebA(
  root = '',
  ann_file = 'train_40_att_list.txt',
  image_folder = 'Img',
  transform = transforms.Compose([
    transforms.RandomResizedCrop(crop_size, scale = (0.5, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness = (0.6, 1.4), saturation = (0.6, 1.4), hue = (-0.5, 0.5)),
    transforms.ToTensor(),
    normalize,
  ]))

# Validation split: deterministic centre crop resized to the model input size
val_dataset = CelebA(
  root = '',
  ann_file = 'val_40_att_list.txt',
  image_folder = 'Img',
  transform = transforms.Compose([
    transforms.CenterCrop(178),
    transforms.Resize(crop_size),
    transforms.ToTensor(),
    normalize,
  ]))

# Test split: same preprocessing as validation
test_dataset = CelebA(
  root = '',
  ann_file = 'test_40_att_list.txt',
  image_folder = 'Img',
  transform = transforms.Compose([
    transforms.CenterCrop(178),
    transforms.Resize(crop_size),
    transforms.ToTensor(),
    normalize,
  ]))

# Unlabeled evaluation images: same preprocessing as validation
eval_dataset = LFWA(
  root = '',
  ann_file = 'testset.txt',
  image_folder = 'testset',
  transform = transforms.Compose([
    transforms.CenterCrop(178),
    transforms.Resize(crop_size),
    transforms.ToTensor(),
    normalize,
  ]))

# Only the training loader shuffles; the others keep the dataset order
train_loader = torch.utils.data.DataLoader(
  dataset = train_dataset,
  batch_size = batch_size,
  shuffle = True,
  num_workers = num_workers,
  pin_memory = True)

val_loader = torch.utils.data.DataLoader(
  dataset = val_dataset,
  batch_size = test_batch,
  shuffle = False,
  num_workers = num_workers,
  pin_memory = True)

test_loader = torch.utils.data.DataLoader(
  dataset = test_dataset,
  batch_size = test_batch,
  shuffle = False,
  num_workers = num_workers,
  pin_memory = True)

eval_loader = torch.utils.data.DataLoader(
  dataset = eval_dataset,
  batch_size = test_batch,
  shuffle = False,
  num_workers = num_workers,
  pin_memory = True)

## Load Pre-Trained ResNeSt

In [None]:
# Pre-trained backbone from torch.hub; its final layer is replaced by the 40-head classifier
model = torch.hub.load('zhanghang1989/ResNeSt', arch, pretrained = True)
model.fc = Classifier(model.fc.in_features, 40)
model = model.cuda()
criterion = FocalLoss(alpha = alpha, gamma = gamma).cuda()
optimizer = torch.optim.SGD(
  model.parameters(),
  lr = lr,
  momentum = momentum,
  weight_decay = weight_decay)
# The learning rate is reduced when the monitored value stops improving
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience = 1)

## Load States (Optional)

In [None]:
# Restore the model weights, optimizer state and scheduler state stored in the
# `checkpoint` dict (requires the "Load Checkpoint" cell to have been run)
model.load_state_dict(checkpoint['model'])
optimizer.load_state_dict(checkpoint['optimizer'])
scheduler.load_state_dict(checkpoint['scheduler'])

## Train

In [None]:
for epoch in range(epochs):
  train_loss, train_acc = train(train_loader, model, criterion, optimizer)
  val_loss, val_acc = validate(val_loader, model, criterion)
  # The scheduler monitors the validation loss
  scheduler.step(val_loss)
  # One checkpoint per epoch: hyperparameters, metrics and all resumable state.
  # The file is numbered with the scheduler's own epoch counter.
  torch.save(
    dict(
      arch = arch,
      batch_size = batch_size,
      crop_size = crop_size,
      train_loss = train_loss,
      train_acc = train_acc,
      val_loss = val_loss,
      val_acc = val_acc,
      model = model.state_dict(),
      optimizer = optimizer.state_dict(),
      scheduler = scheduler.state_dict(),
      alpha = alpha,
      gamma = gamma),
    '%s.checkpoint%s.pth.tar' % (arch, scheduler.state_dict()['last_epoch']))

## Test

In [None]:
# Measure loss and accuracy on the test split by reusing the validation routine
test_loss, test_acc = validate(test_loader, model, criterion)

## Predict

In [None]:
# Predict the attributes of the unlabeled evaluation images; predict() also writes predictions.txt
pred = predict(eval_loader, model, eval_dataset)

# Misc. Utils

## Freeze Model

In [None]:
# Turn off gradients for the parameters of every child module of the model ...
for module in model.children():
  for weight in module.parameters():
    weight.requires_grad = False

# ... then turn them back on for the classifier heads only
for weight in model.fc.parameters():
  weight.requires_grad = True

## Compare Checkpoints

In [None]:
# Print the stored metrics of checkpoints 1..20 and, from the second one on, the change
# in validation accuracy relative to the previous checkpoint.
# `display` is not imported in this notebook; it is expected to be provided by the
# IPython/Jupyter environment.
last_checkpoint = None
for i in range(1,20+1):
  checkpoint = torch.load('resnest50.checkpoint' + str(i) + '.pth.tar')
  display(i)
  display('Train Loss: ' + str(checkpoint['train_loss']))
  display('Train Acc: ' + str(checkpoint['train_acc']))
  display('Val Loss: ' + str(checkpoint['val_loss']))
  display('Val Acc: ' + str(checkpoint['val_acc']))
  # Gap between training and validation accuracy
  display('Margin: ' + str(abs(checkpoint['train_acc']-checkpoint['val_acc'])))
  # Identity test: None is a singleton, so `is not None` is the reliable check
  if last_checkpoint is not None:
    display('Val Acc Delta: ' + str(checkpoint['val_acc']-last_checkpoint['val_acc']))
  print('')
  last_checkpoint = checkpoint