# Dataset and Dependencies Preparation

In [None]:
# download dataset uploaded to my drive
!pip3 install gdown
!gdown https://drive.google.com/uc?id=1-jrCS61Ru-fRWUe46ot5ZvHNOsagm3eF
!unzip /content/face_mask.zip

In [None]:
# install dependencies
!pip3 install pytorch-lightning timm easydict torch-metrics==1.1.7

In [None]:
# Mount Google Drive; the TensorBoard logs configured later are written
# underneath this mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


# Library Imports and Configuration

In [None]:
from easydict import EasyDict as edict
import os
import os.path as osp

from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import transforms

## Experiment Configuration

In [None]:
def _build_transform(augment):
  """Return the preprocessing pipeline for one split.

  Every split resizes to 224x224, converts to a tensor and normalizes each
  channel with mean 0.5 / std 0.5. When `augment` is true, a random horizontal
  flip and a random rotation of up to 10 degrees are inserted after the resize.
  """
  steps = [transforms.Resize((224, 224))]
  if augment:
    steps.append(transforms.RandomHorizontalFlip())
    steps.append(transforms.RandomRotation(10))
  steps.append(transforms.ToTensor())
  steps.append(transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)))
  return transforms.Compose(steps)


def _split_settings(root, augment, batch_size, shuffle):
  """Bundle the image-folder root, transform and DataLoader options of a split."""
  return {
      "root": root,
      "transform": _build_transform(augment),
      "batch_size": batch_size,
      "shuffle": shuffle,
      "num_workers": 2,
  }


# Per-split dataset settings. Only the training split is augmented and
# shuffled; validation and test are read one image at a time, in order.
dataset_cfg = edict({
    "train": _split_settings("/content/Face Mask Dataset/Train",
                             augment=True, batch_size=16, shuffle=True),
    "val": _split_settings("/content/Face Mask Dataset/Validation",
                           augment=False, batch_size=1, shuffle=False),
    "test": _split_settings("/content/Face Mask Dataset/Test",
                            augment=False, batch_size=1, shuffle=False),
})

# model configuration
model_cfg = edict({
    # backbone name {efficientnet_b0, mnasnet_100, resnet18, mobilenetv2_100},
    # refer to this https://rwightman.github.io/pytorch-image-models/models/
    "backbone": "mnasnet_100",
    "num_embedding": 512,
    # whether to start from ImageNet pre-trained weights
    "pretrained": False,
    # number of output classes, 1 for mask/non-mask
    "num_classes": 1,
    # confidence threshold applied to the model outputs
    "threshold": 0.5,
})

# trainer configuration
trainer_cfg = edict()
trainer_cfg.batch_size = 16
trainer_cfg.seed = 0
# Names of the torchmetrics classes to instantiate for each of train/val/test.
trainer_cfg.metrics = ['Accuracy', 'F1', 'Precision', 'Recall']
trainer_cfg.lr = 0.001

# The experiment name records the backbone and how its weights were initialised.
init_tag = "pretrained" if model_cfg.pretrained else "from_scratch"
trainer_cfg.experiment_name = f"{model_cfg.backbone}_{init_tag}"

trainer_cfg.weight_decay = 0.0005
# Root directory for the TensorBoard logs (on the mounted Google Drive).
trainer_cfg.logs_path = "/content/drive/MyDrive/tb_logs"
trainer_cfg.max_epochs = 20

## Dataset and Dataloader Initialization

In [None]:
# Containers holding one dataset / one dataloader per split.
datasets = edict()
dataloaders = edict()

SPLITS = ("train", "val", "test")

# Build an ImageFolder for every split from its configured root and transform.
for split in SPLITS:
  split_cfg = dataset_cfg[split]
  datasets[split] = ImageFolder(root=split_cfg.root,
                                transform=split_cfg.transform)

# Wrap each dataset in a DataLoader using that split's batching options.
for split in SPLITS:
  split_cfg = dataset_cfg[split]
  dataloaders[split] = DataLoader(datasets[split],
                                  batch_size=split_cfg.batch_size,
                                  shuffle=split_cfg.shuffle,
                                  num_workers=split_cfg.num_workers)

## Trainer

In [None]:
from typing import Any, Callable, cast, Dict, List, Optional, Tuple

import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import timm
import torchmetrics

In [None]:
class FaceMaskClassifier(pl.LightningModule):
  """Binary mask / no-mask classifier built on a timm backbone.

  The backbone output is passed through a sigmoid and trained with binary
  cross-entropy. One set of torchmetrics objects is kept per mode
  ('train', 'val', 'test'); they live on the CPU, so predictions and targets
  are moved to the CPU before being scored.

  Args:
    model_cfg: attribute-style config providing `backbone`, `pretrained`,
      `num_classes` and `threshold`.
    trainer_cfg: attribute-style config providing `metrics` (names of
      torchmetrics classes), `lr` and `weight_decay`.
  """

  def __init__(self, model_cfg: dict, trainer_cfg: dict):
    super().__init__()

    # set configs
    self.save_hyperparameters()
    self.backbone = timm.create_model(model_cfg.backbone,
                                      pretrained=model_cfg.pretrained,
                                      num_classes=model_cfg.num_classes)
    # Keep the threshold as a plain float and never write it back into the
    # config objects, so constructing the model leaves the caller's model_cfg
    # untouched and the constructor can be re-run safely.
    threshold = float(model_cfg.threshold)
    self.loss = nn.BCELoss()

    # metrics: one independent instance per mode and per configured name
    self.metrics = edict()
    for mode in ['train', 'val', 'test']:
      self.metrics[mode] = edict()
      for metric in trainer_cfg.metrics:
        metric_cls = getattr(torchmetrics, metric)
        if metric == "Accuracy":
          self.metrics[mode][metric] = metric_cls(threshold=threshold)
        else:
          self.metrics[mode][metric] = metric_cls(num_classes=model_cfg.num_classes,
                                                  threshold=threshold,
                                                  average="macro")
    print(self.metrics)

  def forward(self, x):
    """Return the raw backbone outputs (logits) for a batch of images."""
    out = self.backbone(x)
    return out

  def training_step(self, batch, batch_idx):
    """Compute the BCE loss for one batch and log the loss and train metrics.

    Returns:
      The loss tensor used for back-propagation.
    """
    x, y = batch
    # Targets arrive as class indices; BCELoss needs float targets shaped like
    # the predictions. Assumes labels are 0/1 -- TODO confirm against the dataset.
    y = torch.unsqueeze(y, 1).to(torch.float32)

    # inferencing
    logits = self.forward(x)
    y_hat = torch.sigmoid(logits)

    # calculate loss value
    loss = self.loss(y_hat, y)

    # log loss
    self.log('train_loss', loss, prog_bar=True, logger=True, on_step=True, on_epoch=True)

    # calculate metrics & log metrics
    # Conversions are done once, outside the loop; predictions are detached
    # because the metrics must not take part in the autograd graph.
    preds_cpu = y_hat.detach().cpu()
    target_cpu = y.to(torch.int).cpu()
    for metric in self.metrics.train:
      self.log(f'train_{metric}', self.metrics.train[metric](preds_cpu, target_cpu),
               prog_bar=True, logger=True, on_step=True, on_epoch=True)

    return loss

  def _shared_eval_step(self, batch):
    """Run inference on one batch and return its logits, probabilities and targets on the CPU."""
    x, y = batch
    y = torch.unsqueeze(y, 1).to(torch.float32)

    # inferencing
    logits = self.forward(x)
    y_hat = torch.sigmoid(logits)

    # get prediction
    results = edict()
    results.logits = logits.cpu()
    results.preds = y_hat.cpu()
    results.y = y.cpu()

    return results

  @staticmethod
  def _gather_outputs(results):
    """Concatenate the per-step outputs along the batch dimension.

    Every sample of every step is kept, so the result is correct for any
    evaluation batch size. `results` must be non-empty.
    """
    outputs = edict()
    for key in results[0]:
      outputs[key] = torch.cat([step[key] for step in results], dim=0).float()
    return outputs

  def validation_step(self, batch, batch_idx):
    """Return logits, probabilities and targets of one validation batch."""
    return self._shared_eval_step(batch)

  def validation_epoch_end(self, results):
    """Score all validation outputs of the epoch and send them to the logger."""
    if not results:
      # nothing was validated, so there is nothing to score
      return

    outputs = self._gather_outputs(results)

    # measure val loss
    loss_val = self.loss(outputs.preds, outputs.y)
    self.logger.log_metrics({'val_loss_epoch': float(loss_val)},
                            step=self.current_epoch)

    # calculate metrics & log metrics
    target = outputs.y.to(torch.int)
    for metric in self.metrics.val:
      self.logger.log_metrics({f'val_{metric}_epoch': self.metrics.val[metric](outputs.preds, target)},
                              step=self.current_epoch)

  def test_step(self, batch, batch_idx):
    """Return logits, probabilities and targets of one test batch."""
    return self._shared_eval_step(batch)

  def test_epoch_end(self, results):
    """Score all test outputs, log them, print them and return them.

    Returns:
      An EasyDict with the key `loss` plus one key per configured metric,
      all plain floats. It is empty when no test outputs were produced.
    """
    metrics = edict()
    if not results:
      return metrics

    outputs = self._gather_outputs(results)

    # measure test loss
    metrics.loss = float(self.loss(outputs.preds, outputs.y))

    # calculate metrics & log metrics
    target = outputs.y.to(torch.int)
    for metric in self.metrics.test:
      metric_val = self.metrics.test[metric](outputs.preds, target)
      self.logger.log_metrics({f'test_{metric}_epoch': metric_val},
                              step=self.current_epoch)
      metrics[metric] = float(metric_val)

    print(metrics)

    return metrics

  def configure_optimizers(self):
    """Create the Adam optimizer from the learning rate and weight decay in trainer_cfg."""
    return torch.optim.Adam(self.parameters(), lr=self.hparams.trainer_cfg.lr,
                            weight_decay=self.hparams.trainer_cfg.weight_decay)


## Training

In [None]:
from pytorch_lightning.loggers import TensorBoardLogger

# training
# seed everything so that runs are repeatable
pl.seed_everything(trainer_cfg.seed)

# logger: TensorBoard event files go to <logs_path>/<experiment_name>
logger = TensorBoardLogger(trainer_cfg.logs_path, name=trainer_cfg.experiment_name)

# model
model = FaceMaskClassifier(model_cfg=model_cfg, trainer_cfg=trainer_cfg)

# trainer
# Train on one GPU when CUDA is available; otherwise leave `gpus` unset so the
# run falls back to the CPU instead of failing on a runtime without a GPU.
trainer = pl.Trainer(gpus=1 if torch.cuda.is_available() else None,
                     max_epochs=trainer_cfg.max_epochs,
                     logger=logger)
trainer.fit(model, dataloaders.train, dataloaders.val)


## Testing

In [None]:
# Evaluate on the held-out test split with the trainer used for fitting,
# then show the metrics it returns.
test_loader = dataloaders.test
result = trainer.test(test_dataloaders=test_loader)
print(result)

## Logs

In [None]:
# Open TensorBoard on the log directory configured in trainer_cfg.logs_path.
# If a stale TensorBoard process is still running, stop it first (e.g. `!kill <pid>`).
%load_ext tensorboard
%tensorboard --logdir /content/drive/MyDrive/tb_logs