In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SubsetRandomSampler

import torchvision.datasets as datasets
import torchvision.transforms as transforms

from torchviz import make_dot, make_dot_from_trace

from torchsummary import summary

from PIL import Image

import numpy as np
import time
from os.path import join, exists

from FaceDetector_class import *

In [2]:
# Hyperparameters
cfg = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}

labels = {
    'Angry': 0,
    'Disgust': 1,
    'Fear': 2,
    'Happy': 3,
    'Sad': 4,
    'Surprise': 5,
    'Neutral': 6
}

CIFAR_MEAN = [0.49139968, 0.48215827, 0.44653124]
CIFAR_STD = [0.2023, 0.1994, 0.2010]

EPOCHS = 20
BATCH_SIZE = 32
PRINT_FREQ = 100
TRAIN_NUMS = 30016

CUDA = True

PATH_TO_SAVE_DATA = "./"

TRAIN_PATH = "./dataset/fer2013/train/"
VAL_PATH = "./dataset/fer2013/val/"
TEST_PATH = "./dataset/fer2013/test/"

DATASET_PATH = "./dataset/fer2013/"
MODEL_PATH = "./model/model.pth"

In [3]:
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.Grayscale(),
        transforms.ToTensor()
    ]),
    'test': transforms.Compose([
        transforms.Grayscale(),
        transforms.ToTensor()
    ])
}

image_datasets = {x: datasets.ImageFolder(join(DATASET_PATH, x), data_transforms[x]) for x in ['train', 'test']}
n_train = len(image_datasets['train'])
dataloader = {}
dataloader['train'] = DataLoader(image_datasets['train'], batch_size=BATCH_SIZE, sampler=SubsetRandomSampler(range(TRAIN_NUMS)), num_workers=4)
dataloader['val'] = DataLoader(image_datasets['train'], batch_size=BATCH_SIZE, sampler=SubsetRandomSampler(range(TRAIN_NUMS, n_train)), num_workers=4)
dataloader['test'] = DataLoader(image_datasets['test'], batch_size=BATCH_SIZE, num_workers=4)
# dataloader = {x: DataLoader(image_datasets[x], batch_size=32, shuffle=True, num_workers=4) for x in ['train', 'test']}


In [9]:
# CUDA
# testing CUDA is available or not
if CUDA:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
    device = torch.device("cpu")
print(device)

cuda


In [6]:
# Training class
class Trainer:
    def __init__(self, criterion, optimizer, device):
        self.criterion = criterion
        self.optimizer = optimizer
        
        self.device = device
        
    def train_loop(self, model, train_loader, val_loader):
        for epoch in range(EPOCHS):
            print("---------------- Epoch {} ----------------".format(epoch+1))
            self._training_step(model, train_loader, epoch)
            
            self._validate(model, val_loader, epoch)
    
    def test(self, model, test_loader):
            print("---------------- Testing ----------------")
            self._validate(model, test_loader, 0, state="Testing")
            
    def _training_step(self, model, loader, epoch):
        model.train()
        
        for step, (X, y) in enumerate(loader):
            X, y = X.to(self.device), y.to(self.device)
            N = X.shape[0]
            
            self.optimizer.zero_grad()
            outs = model(X)
            loss = self.criterion(outs, y)
            
            if step >= 0 and (step % PRINT_FREQ == 0):
                self._state_logging(outs, y, loss, step, epoch, "Training")
            
            loss.backward()
            self.optimizer.step()
        
        scheduler.step()
            
    def _validate(self, model, loader, epoch, state="Validate"):
        model.eval()
        outs_list = []
        loss_list = []
        y_list = []
        
        with torch.no_grad():
            for step, (X, y) in enumerate(loader):
                X, y = X.to(self.device), y.to(self.device)
                N = X.shape[0]
                
                outs = model(X)
                loss = self.criterion(outs, y)
                
                y_list.append(y)
                outs_list.append(outs)
                loss_list.append(loss)
            
            y = torch.cat(y_list)
            outs = torch.cat(outs_list)
            loss = torch.mean(torch.stack(loss_list), dim=0)
            self._state_logging(outs, y, loss, step, epoch, state)
                
                
    def _state_logging(self, outs, y, loss, step, epoch, state):
        acc = self._accuracy(outs, y)
        print("[{:3d}/{}] {} Step {:03d} Loss {:.3f} Acc {:.3f}".format(epoch+1, EPOCHS, state, step, loss, acc))
            
    def _accuracy(self, output, target):
        batch_size = target.size(0)

        pred = output.argmax(1)
        correct = pred.eq(target)
        acc = correct.float().sum(0) / batch_size

        return acc

In [3]:
class VGG(nn.Module):
    def __init__(self, num_classes=7, model_type='VGG11'):
        super(VGG, self).__init__()
        self.features = self._make_layers(cfg[model_type])
        self.feature_map = []
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Sequential(
            nn.Linear(512 * 1 * 1, 7),
#             nn.ReLU(True),
#             nn.Dropout(),
#             nn.Linear(4096, 4096),
#             nn.ReLU(True),
#             nn.Dropout(),
#             nn.Linear(4096, num_classes),
        )
    
    def forward(self, x):
        out = None
    
        out = self.features(x)
        out = self.avgpool(out)
        out = torch.flatten(out, 1)
        out = self.classifier(out)
        
        return out
    
    def _make_layers(self, cfg):
        layers = []
        in_channels = 1
        for x in cfg:
            if x == 'M':
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
            else:
                layers += [nn.Conv2d(in_channels, x, kernel_size=3, padding=1),
                           nn.BatchNorm2d(x),
                           nn.ReLU(inplace=True)]
                in_channels = x
        return nn.Sequential(*layers)

In [8]:
model = VGG(model_type='VGG11')
model.cuda()
summary(model, (1, 48, 48))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 48, 48]             640
       BatchNorm2d-2           [-1, 64, 48, 48]             128
              ReLU-3           [-1, 64, 48, 48]               0
         MaxPool2d-4           [-1, 64, 24, 24]               0
            Conv2d-5          [-1, 128, 24, 24]          73,856
       BatchNorm2d-6          [-1, 128, 24, 24]             256
              ReLU-7          [-1, 128, 24, 24]               0
         MaxPool2d-8          [-1, 128, 12, 12]               0
            Conv2d-9          [-1, 256, 12, 12]         295,168
      BatchNorm2d-10          [-1, 256, 12, 12]             512
             ReLU-11          [-1, 256, 12, 12]               0
           Conv2d-12          [-1, 256, 12, 12]         590,080
      BatchNorm2d-13          [-1, 256, 12, 12]             512
             ReLU-14          [-1, 256,

In [9]:
# define loss, optimizer and scheduler
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(),lr=1e-2, momentum=0.9, weight_decay=1e-3) # weight_decay can be smaller
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

In [10]:
# start training
trainer = Trainer(criterion, optimizer, device)
trainer.train_loop(model, dataloader['train'], dataloader['val'])
trainer.test(model, dataloader['test'])

---------------- Epoch 1 ----------------
[  1/20] Training Step 000 Loss 2.292 Acc 0.094
[  1/20] Training Step 100 Loss 1.888 Acc 0.188
[  1/20] Training Step 200 Loss 1.734 Acc 0.344
[  1/20] Training Step 300 Loss 1.901 Acc 0.125
[  1/20] Training Step 400 Loss 1.862 Acc 0.250
[  1/20] Training Step 500 Loss 1.786 Acc 0.250
[  1/20] Training Step 600 Loss 1.726 Acc 0.312
[  1/20] Training Step 700 Loss 1.645 Acc 0.281
[  1/20] Training Step 800 Loss 1.480 Acc 0.438
[  1/20] Training Step 900 Loss 1.291 Acc 0.562
[  1/20] Validate Step 071 Loss 2.214 Acc 0.090
---------------- Epoch 2 ----------------
[  2/20] Training Step 000 Loss 1.614 Acc 0.344
[  2/20] Training Step 100 Loss 1.394 Acc 0.406
[  2/20] Training Step 200 Loss 1.516 Acc 0.406
[  2/20] Training Step 300 Loss 1.392 Acc 0.438
[  2/20] Training Step 400 Loss 1.427 Acc 0.438
[  2/20] Training Step 500 Loss 1.531 Acc 0.375
[  2/20] Training Step 600 Loss 1.163 Acc 0.500
[  2/20] Training Step 700 Loss 1.343 Acc 0.438
[  2

[ 15/20] Training Step 400 Loss 0.256 Acc 0.875
[ 15/20] Training Step 500 Loss 0.345 Acc 0.875
[ 15/20] Training Step 600 Loss 0.395 Acc 0.875
[ 15/20] Training Step 700 Loss 0.287 Acc 0.906
[ 15/20] Training Step 800 Loss 0.246 Acc 0.938
[ 15/20] Training Step 900 Loss 0.417 Acc 0.844
[ 15/20] Validate Step 071 Loss 1.468 Acc 0.571
---------------- Epoch 16 ----------------
[ 16/20] Training Step 000 Loss 0.394 Acc 0.844
[ 16/20] Training Step 100 Loss 0.158 Acc 0.938
[ 16/20] Training Step 200 Loss 0.395 Acc 0.844
[ 16/20] Training Step 300 Loss 0.256 Acc 0.906
[ 16/20] Training Step 400 Loss 0.398 Acc 0.844
[ 16/20] Training Step 500 Loss 0.146 Acc 0.938
[ 16/20] Training Step 600 Loss 0.213 Acc 0.938
[ 16/20] Training Step 700 Loss 0.577 Acc 0.750
[ 16/20] Training Step 800 Loss 0.151 Acc 0.969
[ 16/20] Training Step 900 Loss 0.326 Acc 0.844
[ 16/20] Validate Step 071 Loss 1.658 Acc 0.530
---------------- Epoch 17 ----------------
[ 17/20] Training Step 000 Loss 0.267 Acc 0.938
[ 

In [12]:
torch.save(model.state_dict(), MODEL_PATH)

In [4]:
new_model = VGG(model_type='VGG11')
new_model.load_state_dict(torch.load(MODEL_PATH))
new_model.eval()
new_model.cuda()

VGG(
  (features): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): ReLU(inplace=True)
    (14): MaxPool2d(ke

In [5]:
import cv2
from FaceDetector_class import *

In [12]:
face_detector = FaceDetector()
img = cv2.imread('test2.jpg')
faces = face_detector.FaceDetect(img)
print(len(faces))

crop_images = []
if len(faces) > 0:
    for (x,y,width,height) in faces:
        #cv2.rectangle(img, (x,y), (x + width,y + height), (255,0,0), 3)
        crop_images.append(img[y:y + height, x:x+width])
print(len(crop_images))
# for crop_image in crop_images:
#     cv2.imshow('Face Detection',crop_image)
#     cv2.waitKey(0)
# cv2.destroyAllWindows()

3
3


In [10]:
# new_img = Image.open('00000.jpg').convert('LA')
# new_img = loader(new_img).unsqueeze(1) # why squeeze 1?
# print(new_img.shape)

# print(type(crop_images[1]))
img = crop_images[0]
# cv2.imshow('Face Detection',crop_image)
# cv2.waitKey(0)
img = Image.fromarray(img)
img = img.convert('LA')
img = img.resize((48, 48))
loader = transforms.Compose([
    transforms.ToTensor()])
img = loader(img).unsqueeze(1)
print(img.shape)
img = img.to(device)
print(type(img))
out = new_model(img)

torch.Size([2, 1, 48, 48])
<class 'torch.Tensor'>


In [11]:
print(out)

tensor([[ 2.2418, -9.1218, -6.8590, 19.2342, -2.8608, -2.1292, -0.2990],
        [-0.5662, -4.4643,  1.7400,  0.1814,  0.4606,  0.1083,  2.4321]],
       device='cuda:0', grad_fn=<AddmmBackward>)
