In [8]:
!kaggle competitions download -c challenges-in-representation-learning-facial-expression-recognition-challenge

Downloading train.csv.zip to /content
 85% 66.0M/77.3M [00:00<00:00, 59.7MB/s]
100% 77.3M/77.3M [00:00<00:00, 93.1MB/s]
Downloading test.csv.zip to /content
 47% 9.00M/19.3M [00:00<00:00, 39.7MB/s]
100% 19.3M/19.3M [00:00<00:00, 64.5MB/s]
Downloading fer2013.tar.gz to /content
 91% 84.0M/92.0M [00:00<00:00, 90.7MB/s]
100% 92.0M/92.0M [00:00<00:00, 109MB/s] 
Downloading example_submission.csv to /content
  0% 0.00/7.01k [00:00<?, ?B/s]
100% 7.01k/7.01k [00:00<00:00, 7.40MB/s]
Downloading icml_face_data.csv.zip to /content
 94% 91.0M/96.6M [00:01<00:00, 57.7MB/s]
100% 96.6M/96.6M [00:01<00:00, 82.6MB/s]


In [9]:
!tar -xf /content/fer2013.tar.gz

In [19]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import numpy as np
import csv
from PIL import Image
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
from torch.utils.mobile_optimizer import optimize_for_mobile

device = torch.device('cuda')
shape = (44, 44)

In [20]:
class SeparableConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, padding=0, dilation=1, bias=False):
        super(SeparableConv2d, self).__init__()
        self.depthwise = nn.Conv2d(in_channels, in_channels, kernel_size, stride, padding, dilation, groups=in_channels, bias=bias)
        self.pointwise = nn.Conv2d(in_channels, out_channels, 1, 1, 0, 1, 1, bias=bias)

    def forward(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        return x

In [21]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channeld, out_channels):
        super(ResidualBlock, self).__init__()
        self.residual_conv = nn.Conv2d(in_channels=in_channeld, out_channels=out_channels, kernel_size=1, stride=2, bias=False)
        self.residual_bn = nn.BatchNorm2d(out_channels, momentum=0.99, eps=1e-3)
        self.sepConv1 = SeparableConv2d(in_channels=in_channeld, out_channels=out_channels, kernel_size=3, bias=False, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels, momentum=0.99, eps=1e-3)
        self.relu = nn.ReLU()
        self.sepConv2 = SeparableConv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels, momentum=0.99, eps=1e-3)
        self.maxp = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        res = self.residual_conv(x)
        res = self.residual_bn(res)
        x = self.sepConv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.sepConv2(x)
        x = self.bn2(x)
        x = self.maxp(x)
        return res + x

In [22]:
class Model(nn.Module):
    def __init__(self, num_classes):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(8, affine=True, momentum=0.99, eps=1e-3)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=8, kernel_size=3, stride=1, bias=False)
        self.bn2 = nn.BatchNorm2d(8, momentum=0.99, eps=1e-3)
        self.relu2 = nn.ReLU()
        self.module1 = ResidualBlock(in_channeld=8, out_channels=16)
        self.module2 = ResidualBlock(in_channeld=16, out_channels=32)
        self.module3 = ResidualBlock(in_channeld=32, out_channels=64)
        self.module4 = ResidualBlock(in_channeld=64, out_channels=128)
        self.last_conv = nn.Conv2d(in_channels=128, out_channels=num_classes, kernel_size=3, padding=1)
        self.avgp = nn.AdaptiveAvgPool2d((1, 1))

    def forward(self, input):
        x = input
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.module1(x)
        x = self.module2(x)
        x = self.module3(x)
        x = self.module4(x)
        x = self.last_conv(x)
        x = self.avgp(x)
        x = x.view((x.shape[0], -1))
        return x

In [23]:
class DataSet(torch.utils.data.Dataset):
    def __init__(self, transform=None, images=None, emotions=None):
        self.transform = transform
        self.images = images
        self.emotions = emotions

    def __getitem__(self, index):
        image = self.images[index]
        emotion = self.emotions[index]
        if self.transform is not None:
            image = self.transform(image)
        return image, emotion

    def __len__(self):
        return len(self.images)

class DataSetFactory:
    def __init__(self):
        images = []
        emotions = []
        validate_images = []
        validate_emotions = []
        with open('/content/fer2013/fer2013.csv', 'r') as csvin:
            data = csv.reader(csvin)
            next(data)
            for row in data:
                face = [int(pixel) for pixel in row[1].split()]
                face = np.asarray(face).reshape(48, 48)
                face = face.astype('uint8')
                if row[-1] == 'Training':
                    emotions.append(int(row[0]))
                    images.append(Image.fromarray(face))
                else:
                    validate_emotions.append(int(row[0]))
                    validate_images.append(Image.fromarray(face))
        train_transform = transforms.Compose([
            transforms.RandomCrop(shape[0]),
            transforms.RandomHorizontalFlip(),
            ToTensor(),
        ])
        val_transform = transforms.Compose([
            transforms.CenterCrop(shape[0]),
            ToTensor(),
        ])
        self.train = DataSet(transform=train_transform, images=images, emotions=emotions)
        self.validate = DataSet(transform=val_transform, images=validate_images, emotions=validate_emotions)

In [38]:
def save_model():
    traced = torch.jit.trace(network, torch.rand(1, 1, shape[0], shape[1]))
    optimize_for_mobile(traced)
    traced.save("model.pt")

In [42]:
batch_size = 128
lr = 0.01
epochs = 300
learning_rate_decay_start = 80
learning_rate_decay_every = 5
learning_rate_decay_rate = 0.9
classes = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
network = Model(num_classes=len(classes)).to(device)
optimizer = torch.optim.SGD(network.parameters(), lr=lr, momentum=0.9, weight_decay=5e-3)
criterion = nn.CrossEntropyLoss()
factory = DataSetFactory()
train_loader = DataLoader(factory.train, batch_size=batch_size, shuffle=True, num_workers=1)
validate_loader = DataLoader(factory.validate, batch_size=batch_size, shuffle=True, num_workers=1)
min_validation_loss = 10000

In [43]:
for epoch in range(epochs):
    network.train()
    total = 0
    correct = 0
    total_train_loss = 0
    if epoch > learning_rate_decay_start and learning_rate_decay_start >= 0:
        frac = (epoch - learning_rate_decay_start) // learning_rate_decay_every
        decay_factor = learning_rate_decay_rate ** frac
        current_lr = lr * decay_factor
        for group in optimizer.param_groups:
            group['lr'] = current_lr
    else:
        current_lr = lr
    print(f'learning_rate={str(current_lr)}')
    for i, (x_train, y_train) in enumerate(train_loader):
        optimizer.zero_grad()
        x_train = x_train.to(device)
        y_train = y_train.to(device)
        y_predicted = network(x_train)
        loss = criterion(y_predicted, y_train)
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(y_predicted.data, 1)
        total_train_loss += loss.data
        total += y_train.size(0)
        correct += predicted.eq(y_train.data).sum()
    accuracy = 100. * float(correct) / total
    print(f'epoch={epoch + 1} total_train_loss={total_train_loss / (i + 1)} accuracy={accuracy}')
    network.eval()
    with torch.no_grad():
        total = 0
        correct = 0
        total_validation_loss = 0
        for j, (x_val, y_val) in enumerate(validate_loader):
            x_val = x_val.to(device)
            y_val = y_val.to(device)
            y_val_predicted = network(x_val)
            val_loss = criterion(y_val_predicted, y_val)
            _, predicted = torch.max(y_val_predicted.data, 1)
            total_validation_loss += val_loss.data
            total += y_val.size(0)
            correct += predicted.eq(y_val.data).sum()
        accuracy = 100. * float(correct) / total
        if total_validation_loss <= min_validation_loss:
            if epoch >= 10:
                print('saving model...')
                network.to('cpu')
                save_model()
                network.to('cuda')
            min_validation_loss = total_validation_loss
        print(f'epoch={epoch + 1} total_validation_loss={total_validation_loss / (j + 1)} accuracy={accuracy}')

learning_rate=0.01
epoch=1 total_train_loss=1.7412625551223755 accuracy=30.861402347695844
epoch=1 total_validation_loss=1.6147561073303223 accuracy=37.55920869322931
learning_rate=0.01
epoch=2 total_train_loss=1.4860056638717651 accuracy=42.68347904838204
epoch=2 total_validation_loss=1.3857476711273193 accuracy=46.851490665923656
learning_rate=0.01
epoch=3 total_train_loss=1.3754483461380005 accuracy=47.35448813960779
epoch=3 total_validation_loss=1.384629726409912 accuracy=47.40874895514071
learning_rate=0.01
epoch=4 total_train_loss=1.295423150062561 accuracy=50.447594830889265
epoch=4 total_validation_loss=1.2947362661361694 accuracy=50.57118974644748
learning_rate=0.01
epoch=5 total_train_loss=1.2493195533752441 accuracy=52.4469678498032
epoch=5 total_validation_loss=1.225332260131836 accuracy=52.911674561159096
learning_rate=0.01
epoch=6 total_train_loss=1.2156296968460083 accuracy=53.73924553275976
epoch=6 total_validation_loss=1.2243950366973877 accuracy=53.719699080523824
lea