In [None]:
# default_exp models

In [None]:
#hide
%load_ext autoreload
%autoreload 2

# Models

>Description

In [None]:
#export
#imports
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from axa_hw2p2.losses import CenterLoss

In [None]:
#export
class _BottleNeck(nn.Module):
    """
    """
    def __init__(self,
                 n_input_ch: int,
                 n_output_ch: int,
                 stride: int,
                 exp_fct_t: int):
        """
        """
        super(_BottleNeck, self).__init__()
        self.n_input_ch = n_input_ch
        self.n_output_ch = n_output_ch
        self.stride = stride

        # Expansion block: 
        # (batch_size, height, width, n_input_ch) -> 
        # (batch_size, height, width, exp_fct_t*n_input_ch)
        # kernel size: 1, stride: 1, padding: 0, groups: 1.
        exp_block = []
        exp_block.append(nn.Conv2d(in_channels=n_input_ch,
                                   out_channels=exp_fct_t*n_input_ch,
                                   kernel_size=1,
                                   stride=1,
                                   padding=0,
                                   groups=1,
                                   bias=False))
        exp_block.append(nn.BatchNorm2d(num_features=exp_fct_t*n_input_ch))
        exp_block.append(nn.ReLU6())

        # Depthwise convolutional block: 
        # (batch_size, height, width, exp_fct_t*n_input_ch) -> 
        # (batch_size, height/stride, width/stride, exp_fct_t*n_input_ch)
        # kernel size: 3, stride: stride, 
        # padding: 1, groups: exp_fct_t*n_input_ch.
        dw_conv_block = []
        dw_conv_block.append(nn.Conv2d(in_channels=exp_fct_t*n_input_ch,
                                       out_channels=exp_fct_t*n_input_ch,
                                       kernel_size=3,
                                       stride=stride,
                                       padding=1,
                                       groups=exp_fct_t*n_input_ch, 
                                       bias=False))
        dw_conv_block.append(nn.BatchNorm2d(num_features=exp_fct_t*n_input_ch))
        dw_conv_block.append(nn.ReLU6())

        # Depthwise convolutional block: 
        # (batch_size, height, width, exp_fct_t*n_input_ch) -> 
        # (batch_size, height/stride, width/stride, n_output_ch)
        # kernel size: 1, stride: 1, padding: 0, groups: 1.
        proj_block = []
        proj_block.append(nn.Conv2d(in_channels=exp_fct_t*n_input_ch,
                                    out_channels=n_output_ch,
                                    kernel_size=1,
                                    stride=1,
                                    padding=0,
                                    groups=1, 
                                    bias=False))
        proj_block.append(nn.BatchNorm2d(num_features=n_output_ch))

        self.block = exp_block + dw_conv_block + proj_block
        self.block = nn.Sequential(*self.block)

    def forward(self, x):
        """
        """
        if self.stride == 1 and self.n_input_ch == self.n_output_ch:
            return x + self.block(x)
        else:
            return self.block(x)
        



In [None]:
#export
class _MobileNetV2(nn.Module):
    """
    """
    def __init__(self,
                 n_in_ch_bn: int,
                 ls_out_ch_bn: list,
                 ls_n_rep_bn: list,
                 ls_stride_bn: list,
                 ls_exp_fct_t_bn: list,
                 n_embeddings: int,
                 n_classes: int):
        
        super(_MobileNetV2, self).__init__()

        assert len(ls_out_ch_bn) == len(ls_n_rep_bn)
        assert len(ls_n_rep_bn) == len(ls_stride_bn)
        assert len(ls_stride_bn) == len(ls_exp_fct_t_bn)

        # Initial fully convolution block
        # (batch_size, 64, 64, 3) ->
        # (batch_size, 64, 64, n_in_ch_bn)
        # kernel size: 1, stride: 1, padding: 0, groups: 1.
        # (stride is 1 instead of two because images are small (64x64))
        block1 = []
        block1.append(nn.Conv2d(in_channels=3,
                                out_channels=n_in_ch_bn,
                                kernel_size=3,
                                stride=1,
                                padding=0,
                                groups=1,
                                bias=False))
        block1.append(nn.BatchNorm2d(n_in_ch_bn))
        block1.append(nn.ReLU6())

        # Bottlenecks
        bottlenecks = []
        n_input_ch = n_in_ch_bn

        for i in range(len(ls_out_ch_bn)):
            
            c = ls_out_ch_bn[i]
            n = ls_n_rep_bn[i]
            s = ls_stride_bn[i]
            t = ls_exp_fct_t_bn[i]

            for j in range(n):
                
                if j == 0: stride = 1
                else: stride = s
                bottlenecks.append(_BottleNeck(n_input_ch=n_input_ch,
                                               n_output_ch=c,
                                               stride=stride,
                                               exp_fct_t=t))
                n_input_ch = c
        
        # Last 1x1 convolution block
        blockn = []
        blockn.append(nn.Conv2d(in_channels=n_input_ch,
                                out_channels=n_embeddings,
                                kernel_size=1,
                                stride=1,
                                padding=0,
                                groups=1,
                                bias=False))
        blockn.append(nn.BatchNorm2d(n_embeddings))
        blockn.append(nn.ReLU6())

        self.net = block1 + bottlenecks + blockn
        self.net = nn.Sequential(*self.net)
        self.classifier = nn.Linear(in_features=n_embeddings, 
                                    out_features=n_classes)

    def forward(self, x):
        """
        """
        x = self.net(x)
        x = nn.adaptive_avg_pool2d(x, 1)
        embeddings = x.view(1, -1)
        cl_output = self.classifier(x)
        
        return embeddings, cl_output






In [None]:
# Standard MobileNetV2 architecture

n_in_ch_bn = 32
ls_out_ch_bn = [16, 24, 32, 64, 96, 160, 320]
ls_n_rep_bn = [1, 2, 3, 4, 3, 3, 1]
ls_stride_bn = [1, 2, 2, 2, 1, 2, 1]
ls_exp_fct_t_bn = [1, 6, 6, 6, 6, 6, 6]
n_embeddings = 1280
n_classes = 4000

model1 = _MobileNetV2(n_in_ch_bn=n_in_ch_bn,
                      ls_out_ch_bn=ls_out_ch_bn,
                      ls_n_rep_bn=ls_n_rep_bn,
                      ls_stride_bn=ls_stride_bn,
                      ls_exp_fct_t_bn=ls_exp_fct_t_bn,
                      n_embeddings=n_embeddings,
                      n_classes=n_classes)

print(model1.net)

Sequential(
  (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), bias=False)
  (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU6()
  (3): _BottleNeck(
    (block): Sequential(
      (0): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU6()
      (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
      (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU6()
      (6): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (7): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (4): _BottleNeck(
    (block): Sequential(
      (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=Tru

In [None]:
#export

# n_in_ch_bn: int,
#                  ls_out_ch_bn: list,
#                  ls_n_rep_bn: list,
#                  ls_stride_bn: list,
#                  ls_exp_fct_t_bn: list,
#                  nembeddings: int,
#                  n_classes: int
class MobileNetV2():
    """
    """
    def __init__(self,
                 n_in_ch_bn: int,
                 ls_out_ch_bn: list,
                 ls_n_rep_bn: list,
                 ls_stride_bn: list,
                 ls_exp_fct_t_bn: list,
                 nembeddings: int,
                 n_classes: int,
                 lr: float,
                 lr_decay: float,
                 lr_cl: float,
                 alpha_cl: float,
                 n_lr_decay_steps: int,
                 n_epochs: int,
                 eval_steps: int):

        # Architecture parameters
        self.n_in_ch_bn = n_in_ch_bn
        self.ls_out_ch_bn = ls_out_ch_bn
        self.ls_n_rep_bn = ls_n_rep_bn
        self.ls_stride_bn = ls_stride_bn
        self.ls_exp_fct_t_bn = ls_exp_fct_t_bn
        self.n_embeddings = n_embeddings
        self.n_classes = n_classes

        # Optimization parameters
        self.lr = lr
        self.lr_decay = lr_decay
        self.n_lr_decay_steps = n_lr_decay_steps
        self.lr_cl = lr_cl
        self.alpha_cl = alpha_cl
        self.n_epochs = n_epochs
        self.eval_steps = eval_steps

        self.model = _MobileNetV2(n_in_ch_bn=n_in_ch_bn,
                                  ls_out_ch_bn=ls_out_ch_bn,
                                  ls_n_rep_bn=ls_n_rep_bn,
                                  ls_stride_bn=ls_stride_bn,
                                  ls_exp_fct_t_bn=ls_exp_fct_t_bn,
                                  n_embeddings=n_embeddings,
                                  n_classes=n_classes)

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def fit(self, train_loader, val_loader):

        print("="*30 + 'Start Fitting' + "="*30)
        self.model.to(self.device)
        self.model.train()
        
        cross_entroypy_loss_f = nn.CrossEntropyLoss()
        center_loss_f = CenterLoss(num_classes=n_classes,
                                 feat_dim=n_embeddings,
                                 use_gpu=torch.cuda.is_available())
    
        optimizer = Adam(self.model.parameters(), 
                         lr=self.lr, 
                         weight_decay=0.0005)
        optimizer_centerloss = Adam(center_loss_f.parameters(),
                                    lr=lr_cl)

        scheduler = StepLR(optimizer=optimizer, 
                           step_size=self.n_epochs//self.n_lr_decay_steps,
                           gamma=self.lr_decay)
        
        self.train_loss = -1
        self.val_loss = -1
        self.trajectories = {'epoch': [],
                             'train_loss': [],
                             'val_loss': []}

        for epoch in range(self.n_epochs):
            
            train_loss = 0

            for batch_idx, (img, label) in enumerate(train_loader):
                
                img = img.to(self.device)
                label = label.to(self.device)
                
                embeddings, cl_output = self.model(img)
                loss = alpha_cl * center_loss_f(cl_output, label) + \
                       cross_entroypy_loss_f(cl_output, label)

                optimizer.zero_grad()
                optimizer_centerloss.zero_grad()
                loss.backward()
                for p in center_loss_f.parameters():
                    p.grad.data *= (1./alpha_cl)

                optimizer.step()
                optimizer_centerloss.step()
                scheduler.step()

                train_loss += loss.item()
            
            train_loss /= len(train_loader)         

            if epoch % self.eval_steps == 0:
                val_loss, val_accuracy = self.evaluate_performance(val_loader)
        
                self.trajectories['epoch'].append(epoch)
                self.trajectories['train_loss'].append(train_loss)
                self.trajectories['val_loss'].append(val_loss)

                display_str = f'epoch: {epoch} '
                display_str += f'train_loss: {np.round(train_loss,4)} '
                display_str += f'val_loss: {np.round(val_loss,4)} '
                display_str += f'val_accuracy: {np.round(val_accuracy,4):.2%}'
                print(display_str)

                if self.val_loss > val_loss: self.val_loss = val_loss
                if self.train_loss > train_loss: self.train_loss = train_loss
        
        print("="*72+"\n")


    def evaluate_performance(self, val_loader):

        loss_function = nn.CrossEntropyLoss()
        self.model.to(device)
        self.model.eval()

        val_loss = 0.0
        total_predictions = 0.0
        correct_predictions = 0.0
        
        with torch.no_grad():
            for batch_idx, (img, label) in enumerate(val_loader):
                
                img = img.to(device)
                label = label.to(device)
                outputs = self.model(img)
                loss = loss_function(outputs, label).detach()
                val_loss += loss.item()

                predicted = torch.argmax(outputs, 1)
                total_predictions += img.size(0)
                correct_predictions += (predicted == label).sum().item()

        val_loss /= len(val_loader) 
        acc = correct_predictions/total_predictions
        
        return val_loss, acc


In [None]:
from axa_hw2p2.datasets import FaceClassificationDataset, FaceVerificationDataset
import os
import numpy as np
import torch
from torch.utils.data import Dataset
import torchvision
from PIL import Image
import pandas as pd
from torch.utils.data import DataLoader


In [None]:
np.random.seed(1)
sample = np.array(range(20))
train_dataset = FaceClassificationDataset(sample, mode='train')
val_dataset = FaceClassificationDataset(sample, mode='val')

In [None]:
# Architecture parameters
mc = {}
mc['n_ch_input'] = 3
mc['n_classes'] = len(np.unique([t[1] for t in train_dataset]))

# Optimization and regularization parameters
mc['batch_size'] = 1
mc['lr'] = 0.001
mc['lr_decay'] = 1
mc['n_lr_decay_steps'] = 2
mc['n_epochs'] = 16
mc['eval_steps'] = 4

print(77*'=')
print(pd.Series(mc))
print(77*'=')

In [None]:
model = FaceClassificationCNN(n_ch_input=mc['n_ch_input'],
                              n_classes=mc['n_classes'],
                              lr=mc['lr'],
                              lr_decay=float(mc['lr_decay']),
                              n_lr_decay_steps=int(mc['n_lr_decay_steps']),
                              n_epochs=mc['n_epochs'],
                              eval_steps=mc['eval_steps'])
print(model.model)

In [None]:
train_loader = DataLoader(train_dataset, 
                          shuffle=True, 
                          batch_size=mc['batch_size'], 
                          drop_last=True)
val_loader = DataLoader(val_dataset, 
                        shuffle=False, 
                        batch_size=mc['batch_size'], 
                        drop_last=True)



In [None]:
model.fit(train_loader=train_loader, val_loader=val_loader)