In [None]:
import torch
import torchvision

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models import mobilenet_v2
from torch.optim import lr_scheduler

from PIL import Image

import time
from glob import glob
from copy import deepcopy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import *

# Apply seaborn's default plotting theme to every matplotlib figure created afterwards.
sns.set()

In [None]:
class FaceMaskDataset(Dataset):
    """Dataset over the ``*.jpg`` images stored directly inside one directory.

    When ``have_label`` is true, the class label of an image is the last
    character of its file name before the ``.jpg`` extension (so a name ending
    in ``1.jpg`` yields label ``1``).
    """

    _EXTENSION = '.jpg'

    def __init__(self,
                 root_dir,
                 have_label=True,
                 transform=transforms.Compose([transforms.RandomResizedCrop(224),
                                               transforms.ToTensor(),
                                               transforms.Normalize([0.485, 0.456, 0.406],
                                                                    [0.229, 0.224, 0.225])])):
        """
        Args:
            root_dir (string): Directory with all the images.
            have_label (boolean): Flag for images having the labels in their names.
            transform (callable, optional): Optional transform to be applied
                on each image. Pass ``None`` to get the raw PIL image.

        Raises:
            ValueError: If ``have_label`` is true and a file name does not end
                with a digit before its extension.
        """
        self.root_dir = root_dir
        self.have_label = have_label
        self.transform = transform
        # glob() yields paths in filesystem-dependent order; sorting makes the
        # index -> sample mapping reproducible across runs and machines.
        self.images_paths = sorted(glob(f'{self.root_dir}/*{self._EXTENSION}'))
        self.labels = None
        if have_label:
            self.labels = torch.LongTensor(
                [self._label_from_path(image_path) for image_path in self.images_paths])

    @classmethod
    def _label_from_path(cls, image_path):
        """Return the integer label encoded as the last character of the file stem."""
        # Strip the extension by length rather than by searching for '.jpg', so a
        # '.jpg' elsewhere in the path or an upper-case extension cannot confuse it.
        stem = image_path[:-len(cls._EXTENSION)]
        try:
            return int(stem[-1])
        except (IndexError, ValueError):
            raise ValueError(
                f'Cannot read a label from the file name of {image_path!r}') from None

    def __len__(self):
        """Number of images found in ``root_dir``."""
        return len(self.images_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` when labels are available, else just ``image``."""
        image_path = self.images_paths[idx]
        # Force three channels: grayscale / RGBA / CMYK files would otherwise break
        # the 3-channel Normalize step. The context manager closes the file handle
        # as soon as the pixel data has been copied by convert().
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        if self.have_label:
            return image, self.labels[idx]
        return image

In [None]:
def _train_one_epoch(model, train_dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_dataloader`` with ``model`` in train mode."""
    model.train()
    for inputs, labels in train_dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        # Keep autograd on even if the caller happens to be inside a no_grad() block.
        with torch.enable_grad():
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()


def _concat_or_empty(chunks):
    """Join per-batch arrays into one; return an empty array when there were no batches."""
    return np.concatenate(chunks) if chunks else np.array([])


def _evaluate_split(model, dataloader, dataset_size, criterion, device):
    """Evaluate ``model`` on one split and return ``(mean_loss, y_true, y_pred, y_score)``."""
    running_loss = 0.0
    # Collect per-batch arrays in lists and concatenate once at the end;
    # np.append in the loop would re-copy the whole array on every batch.
    true_chunks, pred_chunks, score_chunks = [], [], []

    with torch.no_grad():
        for inputs, labels in dataloader:
            true_chunks.append(labels.cpu().numpy())

            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Softmax probability of class index 1, used as the ROC-AUC score.
            score_chunks.append(F.softmax(outputs, 1)[:, 1].to('cpu').numpy())
            _, pred = torch.max(outputs, 1)
            pred_chunks.append(pred.to('cpu').numpy())

            # Weight the batch loss by batch size so the final division gives a
            # per-sample mean (assumes the criterion averages over the batch).
            running_loss += loss.item() * inputs.size(0)

    return (running_loss / dataset_size,
            _concat_or_empty(true_chunks),
            _concat_or_empty(pred_chunks),
            _concat_or_empty(score_chunks))


def train_model(model, train_dataloader, eval_dataloaders, datasets_sizes, criterion, optimizer, device='cuda:0', num_epochs=20, print_epoch=1, scheduler=None, data_types=('train', 'test'), save_model=False):
    """Train a binary classifier and return it loaded with its best weights.

    Epoch 0 is evaluation-only (metrics of the untrained model); epochs
    1..``num_epochs`` each run one training pass followed by an evaluation of
    every split in ``data_types``. The weights with the highest F1 on the
    ``'test'`` split are restored into ``model`` before returning.

    Args:
        model: Network producing two logits per sample.
        train_dataloader: Iterable of ``(inputs, labels)`` training batches.
        eval_dataloaders: Mapping from split name to an iterable of batches.
        datasets_sizes: Mapping from split name to its number of samples.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the batches are moved to; ``model`` must already be there.
        num_epochs: Number of training epochs.
        print_epoch: Unused; kept so existing callers keep working.
        scheduler: Optional LR scheduler, stepped once per training epoch.
        data_types: Split names to evaluate each epoch. Only ``'test'`` drives
            the best-model selection.
        save_model: When true, save the best state dict to ``model_<F1>.pkl``.

    Returns:
        The same ``model`` object, in eval mode, holding the best weights. If no
        epoch reaches a test F1 above 0, these are the initial weights.
    """
    start_time = time.time()
    best_f1 = 0.0
    best_model = deepcopy(model.state_dict())

    for epoch in range(0, num_epochs + 1):
        start_epoch = time.time()
        print("Epoch [{}/{}]".format(str(epoch).zfill(len(str(num_epochs))), num_epochs))

        if epoch != 0:
            _train_one_epoch(model, train_dataloader, criterion, optimizer, device)
            if scheduler is not None:
                scheduler.step()

        model.eval()

        for data_type in data_types:
            epoch_loss, y_true, y_pred, y_score = _evaluate_split(
                model, eval_dataloaders[data_type], datasets_sizes[data_type], criterion, device)

            epoch_acc = accuracy_score(y_true, y_pred) * 100
            epoch_p = precision_score(y_true, y_pred, zero_division=0) * 100
            epoch_r = recall_score(y_true, y_pred, zero_division=0) * 100
            epoch_f1 = f1_score(y_true, y_pred, average='binary', zero_division=0) * 100
            try:
                epoch_roc_auc = roc_auc_score(y_true, y_score) * 100
            except ValueError:
                # ROC-AUC is undefined when the split holds a single class;
                # report NaN instead of aborting the whole training run.
                epoch_roc_auc = float('nan')

            print('{} Loss: {:.4f} F1: {:2.2f} Precision: {:2.2f} Recall: {:2.2f} Accuracy: {:2.2f} ROC-AUC: {:2.2f}'.format(
                data_type.ljust(5), epoch_loss, epoch_f1, epoch_p, epoch_r, epoch_acc, epoch_roc_auc))

            if data_type == 'test' and epoch_f1 > best_f1:
                best_f1 = epoch_f1
                best_model = deepcopy(model.state_dict())

        epoch_elapsed = time.time() - start_epoch
        print('Epoch {} took {}m {:.0f}s'.format(
            epoch, int(epoch_elapsed // 60), epoch_elapsed % 60))

        print('-' * 90)

    time_elapsed = time.time() - start_time
    print('Training complete in {}m {:.0f}s'.format(
        int(time_elapsed // 60), time_elapsed % 60))

    # '{:2f}' meant "width 2, six decimals"; two decimals was the intent and
    # matches the per-epoch metric lines above.
    print('Best test F1: {:.2f}'.format(best_f1))
    if save_model:
        torch.save(best_model, 'model_{:.2f}.pkl'.format(best_f1))
    model.load_state_dict(best_model)
    return model

In [None]:
# --- Experiment configuration -------------------------------------------------
# Dataset root; the dataset cell reads `<root_dir>/train` and one
# `<root_dir>/<data_type>` directory per entry of `data_types`.
root_dir = '/StudentData/hw2_data'
num_classes = 2
# Not referenced elsewhere in the visible notebook; its values match the keys
# of the transform / batch-size / shuffle dictionaries.
phases = ['train', 'eval']
# Splits that are evaluated after every epoch.
data_types = ['train', 'test']

# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [None]:
# Pre-processing pipelines: random crop + horizontal flip for training,
# deterministic resize + centre crop for evaluation. Both end with the same
# per-channel normalisation.
data_transforms = dict(
    train=transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
    eval=transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
)

# The training set is read with augmentation for optimisation ...
train_dataset = FaceMaskDataset(
    root_dir=f'{root_dir}/train',
    have_label=True,
    transform=data_transforms['train'],
)
# ... while every split in `data_types` (including 'train') is read again
# without augmentation for metric computation.
eval_datasets = {
    split: FaceMaskDataset(
        root_dir=f'{root_dir}/{split}',
        have_label=True,
        transform=data_transforms['eval'],
    )
    for split in data_types
}

# Sample count per evaluated split, used to average the loss.
datasets_sizes = {split: len(dataset) for split, dataset in eval_datasets.items()}

In [None]:
# Batch size and shuffling per phase: training batches are larger and
# shuffled, evaluation batches keep the dataset order.
batch_sizes = dict(train=128, eval=64)
shuffles = dict(train=True, eval=False)

train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_sizes['train'],
    shuffle=shuffles['train'],
)
# One evaluation loader per split listed in `data_types`.
eval_dataloaders = {
    split: DataLoader(
        eval_datasets[split],
        batch_size=batch_sizes['eval'],
        shuffle=shuffles['eval'],
    )
    for split in data_types
}

In [None]:
def conv_bn(inp, oup, stride, conv_layer=nn.Conv2d, norm_layer=nn.BatchNorm2d, nlin_layer=nn.ReLU):
    """Build a 3x3 conv (padding 1, no bias) -> norm -> in-place activation stack.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        stride: Stride of the convolution.
        conv_layer, norm_layer, nlin_layer: Layer factories for the three stages.

    Returns:
        ``nn.Sequential`` holding the three layers in that order.
    """
    stages = [
        conv_layer(inp, oup, 3, stride, 1, bias=False),
        norm_layer(oup),
        nlin_layer(inplace=True),
    ]
    return nn.Sequential(*stages)


def conv_1x1_bn(inp, oup, conv_layer=nn.Conv2d, norm_layer=nn.BatchNorm2d, nlin_layer=nn.ReLU):
    """Build a 1x1 conv (stride 1, no padding, no bias) -> norm -> in-place activation stack.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        conv_layer, norm_layer, nlin_layer: Layer factories for the three stages.

    Returns:
        ``nn.Sequential`` holding the three layers in that order.
    """
    stages = [
        conv_layer(inp, oup, 1, 1, 0, bias=False),
        norm_layer(oup),
        nlin_layer(inplace=True),
    ]
    return nn.Sequential(*stages)


class Hswish(nn.Module):
    """Hard-swish activation: ``x * relu6(x + 3) / 6``."""

    def __init__(self, inplace=True):
        super().__init__()
        # Passed to relu6; it acts on the temporary ``x + 3``, not on the input itself.
        self.inplace = inplace

    def forward(self, x):
        gate = F.relu6(x + 3., inplace=self.inplace)
        return x * gate / 6.


class Hsigmoid(nn.Module):
    """Hard-sigmoid activation: ``relu6(x + 3) / 6``, mapping inputs into [0, 1]."""

    def __init__(self, inplace=True):
        super().__init__()
        # Passed to relu6; it acts on the temporary ``x + 3``, not on the input itself.
        self.inplace = inplace

    def forward(self, x):
        shifted = x + 3.
        return F.relu6(shifted, inplace=self.inplace) / 6.


class SEModule(nn.Module):
    """Squeeze-and-excitation block: rescales each channel by a learned gate.

    The gate is computed from the globally average-pooled feature map by two
    bias-free linear layers (``channel -> channel // reduction -> channel``)
    with a ReLU in between and a hard-sigmoid (rather than ``nn.Sigmoid``) at
    the end.
    """

    def __init__(self, channel, reduction=4):
        super().__init__()
        hidden = channel // reduction
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channel, bias=False),
            Hsigmoid(),
        )

    def forward(self, x):
        # Unpacking into four names requires a 4-D input.
        batch, channels, _rows, _cols = x.size()
        squeezed = self.avg_pool(x).view(batch, channels)
        gate = self.fc(squeezed).view(batch, channels, 1, 1)
        return x * gate.expand_as(x)


class Identity(nn.Module):
    """Pass-through layer that returns its input unchanged.

    ``channel`` is accepted but ignored, so the class can be constructed with
    the same single argument as ``SEModule``.
    """

    def __init__(self, channel):
        super().__init__()

    def forward(self, x):
        return x


def make_divisible(x, divisible_by=8):
    """Round ``x`` up to the nearest multiple of ``divisible_by``.

    Args:
        x: Value to round (int or float).
        divisible_by: Positive step the result must be a multiple of.

    Returns:
        The smallest multiple of ``divisible_by`` that is >= ``x``, as a Python int.
    """
    # Relies on the notebook-level ``import numpy as np``; the former
    # function-scope re-import was redundant.
    return int(np.ceil(x / divisible_by) * divisible_by)


class MobileBottleneck(nn.Module):
    """Inverted-residual bottleneck: 1x1 expand -> depthwise kxk -> 1x1 linear project.

    Args:
        inp: Input channels.
        oup: Output channels.
        kernel: Depthwise kernel size, 3 or 5.
        stride: Depthwise stride, 1 or 2.
        exp: Width of the expanded (hidden) representation.
        se: When true, insert an ``SEModule`` after the depthwise norm layer.
        nl: Non-linearity, ``'RE'`` for ReLU or ``'HS'`` for hard-swish.

    Raises:
        NotImplementedError: If ``nl`` is neither ``'RE'`` nor ``'HS'``.
    """

    def __init__(self, inp, oup, kernel, stride, exp, se=False, nl='RE'):
        super().__init__()
        assert stride in [1, 2]
        assert kernel in [3, 5]
        # "Same"-style padding for the odd depthwise kernel.
        pad = (kernel - 1) // 2
        # A skip connection is only possible when input and output shapes match.
        self.use_res_connect = stride == 1 and inp == oup

        if nl == 'HS':
            activation = Hswish
        elif nl == 'RE':
            activation = nn.ReLU  # or ReLU6
        else:
            raise NotImplementedError
        attention = SEModule if se else Identity

        layers = []
        # pw: pointwise expansion to `exp` channels
        layers.append(nn.Conv2d(inp, exp, 1, 1, 0, bias=False))
        layers.append(nn.BatchNorm2d(exp))
        layers.append(activation(inplace=True))
        # dw: depthwise convolution (one group per channel)
        layers.append(nn.Conv2d(exp, exp, kernel, stride, pad, groups=exp, bias=False))
        layers.append(nn.BatchNorm2d(exp))
        layers.append(attention(exp))
        layers.append(activation(inplace=True))
        # pw-linear: pointwise projection with no activation afterwards
        layers.append(nn.Conv2d(exp, oup, 1, 1, 0, bias=False))
        layers.append(nn.BatchNorm2d(oup))
        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        if not self.use_res_connect:
            return self.conv(x)
        return x + self.conv(x)


class MobileNetV3(nn.Module):
    """MobileNetV3 classifier in its ``'large'`` or ``'small'`` configuration.

    Args:
        n_class: Number of output logits.
        input_size: Expected input resolution; must be a multiple of 32.
        dropout: Dropout probability applied before the final linear layer.
        mode: ``'large'`` or ``'small'`` block layout.
        width_mult: Multiplier applied to the channel counts of the blocks.

    Raises:
        NotImplementedError: If ``mode`` is not ``'large'`` or ``'small'``.
    """

    def __init__(self, n_class=1000, input_size=224, dropout=0.8, mode='small', width_mult=1.0):
        super().__init__()
        input_channel = 16
        last_channel = 1280

        # Each row: (kernel, expansion width, output channels, use SE, non-linearity, stride)
        if mode == 'large':
            # refer to Table 1 in paper
            mobile_setting = (
                (3, 16,  16,  False, 'RE', 1),
                (3, 64,  24,  False, 'RE', 2),
                (3, 72,  24,  False, 'RE', 1),
                (5, 72,  40,  True,  'RE', 2),
                (5, 120, 40,  True,  'RE', 1),
                (5, 120, 40,  True,  'RE', 1),
                (3, 240, 80,  False, 'HS', 2),
                (3, 200, 80,  False, 'HS', 1),
                (3, 184, 80,  False, 'HS', 1),
                (3, 184, 80,  False, 'HS', 1),
                (3, 480, 112, True,  'HS', 1),
                (3, 672, 112, True,  'HS', 1),
                (5, 672, 160, True,  'HS', 2),
                (5, 960, 160, True,  'HS', 1),
                (5, 960, 160, True,  'HS', 1),
            )
            head_width = 960
        elif mode == 'small':
            # refer to Table 2 in paper
            mobile_setting = (
                (3, 16,  16,  True,  'RE', 2),
                (3, 72,  24,  False, 'RE', 2),
                (3, 88,  24,  False, 'RE', 1),
                (5, 96,  40,  True,  'HS', 2),
                (5, 240, 40,  True,  'HS', 1),
                (5, 240, 40,  True,  'HS', 1),
                (5, 120, 48,  True,  'HS', 1),
                (5, 144, 48,  True,  'HS', 1),
                (5, 288, 96,  True,  'HS', 2),
                (5, 576, 96,  True,  'HS', 1),
                (5, 576, 96,  True,  'HS', 1),
            )
            head_width = 576
        else:
            raise NotImplementedError

        assert input_size % 32 == 0
        # The final width is only scaled up for multipliers above 1, never down.
        if width_mult > 1.0:
            last_channel = make_divisible(last_channel * width_mult)

        # Stem: strided 3x3 convolution with hard-swish.
        layers = [conv_bn(3, input_channel, 2, nlin_layer=Hswish)]

        # Stack of bottleneck blocks, chaining each block's output width into the next.
        for kernel, expansion, channels, use_se, nonlin, stride in mobile_setting:
            output_channel = make_divisible(channels * width_mult)
            exp_channel = make_divisible(expansion * width_mult)
            layers.append(MobileBottleneck(
                input_channel, output_channel, kernel, stride, exp_channel, use_se, nonlin))
            input_channel = output_channel

        # Head: 1x1 conv -> global average pool -> 1x1 conv (with bias) -> hard-swish.
        # NOTE: Table 2 of the paper lists an SE module after the first 1x1 conv
        # in 'small' mode; the original author left it out, suspecting a mistake.
        last_conv = make_divisible(head_width * width_mult)
        layers.append(conv_1x1_bn(input_channel, last_conv, nlin_layer=Hswish))
        layers.append(nn.AdaptiveAvgPool2d(1))
        layers.append(nn.Conv2d(last_conv, last_channel, 1, 1, 0))
        layers.append(Hswish(inplace=True))

        self.features = nn.Sequential(*layers)

        self.classifier = nn.Sequential(
            nn.Dropout(p=dropout),    # refer to paper section 6
            nn.Linear(last_channel, n_class),
        )

        self._initialize_weights()

    def forward(self, x):
        feature_map = self.features(x)
        # Collapse the two spatial axes so the classifier receives (batch, channels).
        pooled = feature_map.mean(3).mean(2)
        return self.classifier(pooled)

    def _initialize_weights(self):
        """Re-initialise every conv, batch-norm and linear layer of the network."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out')
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)

In [None]:
# Alternative baseline (disabled): torchvision's MobileNetV2 with its last
# classifier layer replaced to emit `num_classes` logits.
# model = mobilenet_v2(pretrained=False)
# model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)

# Size the head from the shared `num_classes` setting instead of a hard-coded 2,
# so the model stays in sync with the configuration cell.
model = MobileNetV3(n_class=num_classes, input_size=224, dropout=0.0, mode='large', width_mult=1.0)

In [None]:
# Move the model's parameters and buffers onto the device selected in the configuration cell.
model = model.to(device)

In [None]:
# Report the model size in millions of parameters.
total_params = sum(param.numel() for param in model.parameters())
print('Total Number of Parameters: {:.2f}M'.format(total_params / 1e6))

In [None]:
# Step schedule: the learning rate is multiplied by `gamma` every `step_size` epochs.
learning_rate = 1e-3
step_size = 5
gamma = 1e-1

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    params=model.parameters(),
    lr=learning_rate,
)
scheduler = lr_scheduler.StepLR(
    optimizer=optimizer,
    step_size=step_size,
    gamma=gamma,
)

In [None]:
# Pass the resolved `device` explicitly: train_model defaults to 'cuda:0', which
# fails on CPU-only machines even though the model was moved to `device`.
# `data_types` is passed too so the evaluated splits follow the configuration cell.
# Note: train_model returns the same object as `model`, loaded with the best weights.
best_model = train_model(
    model,
    train_dataloader,
    eval_dataloaders,
    datasets_sizes,
    criterion,
    optimizer,
    device=device,
    scheduler=scheduler,
    data_types=data_types,
)