
# **0. Mask augmentation**

*   mount google drive
*   MaskTheFace github (https://github.com/aqeelanwar/MaskTheFace.git)
*   include mask augmented face image samples in report



In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
# `drive` is reused by a later cell, which calls drive.flush_and_unmount()
# once the dataset archive has been copied and extracted.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# After mounting Drive, Colab's current working directory is /content
# ! git clone https://github.com/aqeelanwar/MaskTheFace.git

In [None]:
# Install Weights & Biases (wandb), used further down for experiment logging
! pip install wandb

# Install the packages needed by MaskTheFace (currently disabled)
# ! pip install dlib
# ! pip install face-recognition
# ! pip install face-recognition-models
# ! pip install dotmap

In [None]:
# # import packages
# from pathlib import Path
# import os, shutil

# # DATA_URL = "mask_data" # 구글 드라이브에서의 폴더 이름
# ZIP_FILENAME = "data_original.zip" # 폴더 안에 있는 data.zip 원본 파일 이름
# UNZIP_DESTINATION = "dataset" # Colab VM에 저장될 폴더 이름
# NUM_WORKERS = os.cpu_count()

# DRIVE_URL = Path('/content/drive/MyDrive')
# ORIGINAL_DATA_URL = DRIVE_URL / ZIP_FILENAME

# BASE_URL = Path("/content")

# if not (BASE_URL / UNZIP_DESTINATION).exists():
#   (BASE_URL / UNZIP_DESTINATION).mkdir()

# if not (BASE_URL / ZIP_FILENAME).exists() and ORIGINAL_DATA_URL.exists():
#   shutil.copy2(ORIGINAL_DATA_URL, BASE_URL)

# if (BASE_URL / ZIP_FILENAME).exists():
#   os.system(f"unzip {ZIP_FILENAME} -d {UNZIP_DESTINATION}")
#   (BASE_URL / ZIP_FILENAME).unlink()

# BASE_URL = BASE_URL / UNZIP_DESTINATION

In [None]:
# cd 'MaskTheFace'

In [None]:
# ! python mask_the_face.py --path "/content/dataset/train/not_wearing_mask" --mask_type "random"

In [None]:
# ! python mask_the_face.py --path "/content/dataset/val/not_wearing_mask" --mask_type "random"

In [None]:
# directories = {
#     "train": {
#         "masked": "/content/dataset/train/not_wearing_mask_masked",
#         "wearing": "/content/dataset/train/wearing_mask"
#     },
#     "val": {
#         "masked": "/content/dataset/val/not_wearing_mask_masked",
#         "wearing": "/content/dataset/val/wearing_mask"
#     }
# }

# for key, paths in directories.items():
#     SOURCE = paths["masked"]
#     DESTINATION = paths["wearing"]

#     if os.path.exists(SOURCE):
#         shutil.rmtree(DESTINATION)

#     os.rename(SOURCE, DESTINATION)

# **1. Prepare Data for Training**

*   data_loader using *torchvision.datasets.ImageFolder* for Custom dataset
*   **image augmentation** in *transforms*
*   include augmented face image samples in report



In [None]:
# import packages
# pathlib : library for working with file paths
# shutil : library for copying and moving files
# torch : the PyTorch library
# torchvision : image-related utilities provided by PyTorch
# tqdm : library for displaying progress bars
# sklearn : scikit-learn

from pathlib import Path
import os, shutil

import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader

import torchvision
from torchvision import transforms
from torchvision.transforms import v2

from tqdm import tqdm
from sklearn import metrics

# Locate the dataset archive on Google Drive, copy it onto the Colab VM,
# extract it, and point BASE_URL at the extracted folder.

DATA_URL = "mask_data"  # folder name on Google Drive
ZIP_FILENAME = "data.zip"  # name of the zip file inside that folder
UNZIP_DESTINATION = "dataset"  # folder that will be created on the Colab VM
NUM_WORKERS = os.cpu_count()

DRIVE_URL = Path('/content/drive/MyDrive') / DATA_URL
ORIGINAL_DATA_URL = DRIVE_URL / ZIP_FILENAME

BASE_URL = Path("/content")
local_zip_path = BASE_URL / ZIP_FILENAME
extract_dir = BASE_URL / UNZIP_DESTINATION

# parents/exist_ok make this cell safe to re-run.
extract_dir.mkdir(parents=True, exist_ok=True)

# Copy the archive from Drive only when it is not already on the VM.
if not local_zip_path.exists() and ORIGINAL_DATA_URL.exists():
  shutil.copy2(ORIGINAL_DATA_URL, BASE_URL)

if local_zip_path.exists():
  # Absolute paths keep extraction independent of the current working
  # directory. unpack_archive raises on failure, so the archive is only
  # deleted after a successful extraction.
  shutil.unpack_archive(str(local_zip_path), str(extract_dir))
  local_zip_path.unlink()

# Drive is not needed once the data has been copied to the VM.
drive.flush_and_unmount()

BASE_URL = BASE_URL / UNZIP_DESTINATION

In [None]:
# Image pipelines.
#
# Only the training pipeline applies random augmentation. The validation and
# test pipelines are deterministic so that metrics are reproducible and the
# images shown next to predictions are exactly the images the model scored.
#
# Why v2: https://pytorch.org/vision/main/transforms.html#v1-or-v2-which-one-should-i-use
# With v2, ToTensor is replaced by the ToImage + ToDtype combination.
# Meaning of scale=True: https://pytorch.org/vision/main/transforms.html#dtype-and-expected-value-range
train_transform = v2.Compose([
    v2.CenterCrop(112),
    v2.RandomHorizontalFlip(),
    v2.RandomVerticalFlip(),
    v2.RandomRotation(degrees=(0, 180)),
    # v2.ColorJitter(),
    # v2.Grayscale(num_output_channels=3),
    # v2.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5.)),
    v2.RandomPerspective(distortion_scale=0.6, p=1.0),
    v2.ToImage(), v2.ToDtype(torch.float32, scale=True),  # scale=True -> values rescaled to [0, 1]
])

# Same geometry as training (centre crop), without the random augmentations.
val_transform = v2.Compose([
    v2.CenterCrop(112),
    v2.ToImage(), v2.ToDtype(torch.float32, scale=True),
])

# NOTE(review): only the test pipeline resizes before cropping; the train and
# val pipelines crop directly. Kept as in the original - confirm that this
# matches the image sizes of each split.
test_transform = v2.Compose([
    v2.Resize(128),
    v2.CenterCrop(112),
    v2.ToImage(), v2.ToDtype(torch.float32, scale=True),
])

In [None]:
from torchvision.datasets import ImageFolder

# Each split lives in its own sub-folder of the extracted dataset.
train_path, val_path, test_path = (BASE_URL / split for split in ('train', 'val', 'test'))

# One ImageFolder per split, each paired with its own transform pipeline.
train_dataset = ImageFolder(root=train_path, transform=train_transform)
val_dataset = ImageFolder(root=val_path, transform=val_transform)
test_dataset = ImageFolder(root=test_path, transform=test_transform)

# Print the class-name -> label-index mapping of the train and val splits.
# Expected (per the original notes): {'not_wearing_mask': 0, 'wearing_mask': 1}
print(f"{train_path} : {train_dataset.class_to_idx}")
print(f"{val_path}: {val_dataset.class_to_idx}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def imshow(img, ax=None, title=None, fontsize=8, unnormalize=False):
    """Draw one channel-first image tensor on a matplotlib axis.

    Args:
        img: image tensor laid out as (channels, height, width); it is
            converted with ``img.numpy()`` and transposed to (H, W, C).
        ax: axis to draw on; defaults to the current axis (``plt.gca()``).
        title: optional title shown above the image.
        fontsize: font size of the title.
        unnormalize: when True, apply ``img / 2 + 0.5`` first, which undoes a
            ``Normalize(mean=0.5, std=0.5)``. The transform pipelines in this
            notebook only rescale to [0, 1] and do not normalize, so the
            default is False; applying the shift unconditionally squeezed
            every image into [0.5, 1] and made it look washed out.
    """
    if ax is None:
        ax = plt.gca()
    if unnormalize:
        img = img / 2 + 0.5
    npimg = img.numpy()
    # matplotlib expects channels last.
    ax.imshow(np.transpose(npimg, (1, 2, 0)))
    if title is not None:
        ax.set_title(title, fontsize=fontsize)
    ax.axis('off')

# 이미지와 라벨 출력
def show_images(dataset, predicted_labels=None, num_images_per_class=4):
    """Plot the first few images of every class, one class per row.

    Args:
        dataset: a dataset exposing ``class_to_idx`` and yielding
            ``(image, label)`` pairs when indexed. If it also exposes
            ``targets`` (a per-sample label sequence), that is used to find
            the samples without decoding any image.
        predicted_labels: optional sequence indexed by *dataset index*; entry
            ``idx`` is shown under image ``idx``. The predictions must
            therefore be in dataset order (e.g. produced from a loader that
            does not shuffle).
        num_images_per_class: maximum number of images shown per class.
    """
    class_names = list(dataset.class_to_idx.keys())
    n_rows, n_cols = len(class_names), num_images_per_class
    if n_rows == 0 or n_cols < 1:
        return  # nothing to draw

    class_indices = {class_name: [] for class_name in class_names}

    # Prefer the label list when available: iterating the dataset itself
    # loads and transforms every image only to read its label.
    targets = getattr(dataset, "targets", None)
    if targets is not None:
        labelled = enumerate(targets)
    else:
        labelled = ((idx, label) for idx, (_, label) in enumerate(dataset))

    for idx, label in labelled:
        bucket = class_indices[class_names[int(label)]]
        if len(bucket) < num_images_per_class:
            bucket.append(idx)
        if all(len(indices) == num_images_per_class for indices in class_indices.values()):
            break

    # Grid size follows the inputs instead of a fixed 2 x 4 layout
    # (the default of 2 classes x 4 images still gives a 6 x 3 inch figure).
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(1.5 * n_cols, 1.5 * n_rows), squeeze=False)
    for row, class_name in enumerate(class_names):
        picked = class_indices[class_name]
        for col in range(n_cols):
            ax = axes[row, col]
            if col >= len(picked):
                ax.axis('off')  # class has fewer samples than requested
                continue

            idx = picked[col]
            img, label = dataset[idx]
            title = f"{class_name} : {label}"

            if predicted_labels is not None:
                predicted_label = predicted_labels[idx]
                title += f"\nprediction : {predicted_label}"
            imshow(img, ax=ax, title=title)
    plt.tight_layout()
    plt.show()

# Display sample images from the training dataset (train_transform is applied when each image is loaded)
show_images(train_dataset)

# **2. Prepare Model**

*   Pytorch ResNet - *ref*. https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py
*   use **ResNet50** from torchvision.model_zoo
*   explore more models in https://pytorch.org/vision/stable/models.html
*   **change the dimension of the classifier**

In [None]:
# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [None]:
import torchvision.models as models
from torchvision.models import ResNet50_Weights # for pretrained

def initialize_weights(m):
  """Re-initialise the parameters of a module in place.

  - ``nn.Conv2d``: Kaiming-uniform weights (ReLU gain); the bias is left untouched.
  - ``nn.BatchNorm2d``: weight set to 1 and bias to 0.
  - ``nn.Linear``: Kaiming-uniform weights, bias set to 0.
  - ``nn.Sequential`` / ResNet ``Bottleneck``: recurse into the children, so
    the function also works when called directly on such a container rather
    than through ``model.apply``.

  Layers built without a bias (``bias=False``) or without affine parameters
  (``affine=False``) have ``None`` in place of those parameters; they are
  skipped instead of raising ``AttributeError``. Any other module type is
  left unchanged.
  """
  if isinstance(m, nn.Conv2d):
    nn.init.kaiming_uniform_(m.weight.data, nonlinearity='relu')
  elif isinstance(m, nn.BatchNorm2d):
    if m.weight is not None:
      nn.init.constant_(m.weight.data, 1)
    if m.bias is not None:
      nn.init.constant_(m.bias.data, 0)
  elif isinstance(m, nn.Linear):
    nn.init.kaiming_uniform_(m.weight.data)
    if m.bias is not None:
      nn.init.constant_(m.bias.data, 0)
  elif isinstance(m, nn.Sequential):
    for sm in m:
      initialize_weights(sm)
  elif isinstance(m, models.resnet.Bottleneck):
    for sm in m.children():
      initialize_weights(sm)

def init_resnet50(init_weights = False):
  """Build a ResNet50 whose classifier outputs a single logit.

  Args:
    init_weights: when False (default) the backbone starts from the
      ImageNet ``IMAGENET1K_V2`` pretrained weights. When True the network is
      built without pretrained weights and then re-initialised with
      ``initialize_weights``.

  Returns:
    The model, with ``fc`` replaced by ``nn.Linear(in_features, 1)``.
  """
  # Loading pretrained weights only to overwrite them costs a download and
  # leaves the pretrained BatchNorm running statistics (which
  # initialize_weights does not reset) mixed with freshly initialised layers,
  # so start from scratch when re-initialising.
  weights = None if init_weights else ResNet50_Weights.IMAGENET1K_V2
  model = models.resnet50(weights=weights)
  model.fc = nn.Linear(model.fc.in_features, 1)  # single output for binary classification

  if init_weights:
    model.apply(initialize_weights)

  return model

In [None]:
# Build the model (pretrained ResNet50 with the classifier replaced by a
# single-output linear layer, see init_resnet50) and print its layer structure.
model = init_resnet50()

print(model)

# **3. Training**


*   write **training code** including the following:
   - hyper parameters such as batch size, learning rate, epoch
   - criterion(loss function such as BCELoss), optimizer(eg. Adam, SGD, etc.)  and scheduler
   - save model weight

*   **print training/validation loss and accuracy** per epoch or iteration
*   include a visualizer, **tensorboard**, to show training/validation accuracy and loss


In [None]:
import wandb
import sklearn
from sklearn import metrics
import datetime
import os
import torch.optim as optim
from tqdm import tqdm

# Never commit API keys to a notebook: read the key from the environment.
# Set it before running this cell (e.g. via Colab secrets or
# os.environ["WANDB_API_KEY"]). The key that was previously hard-coded here
# has been exposed and should be revoked in the W&B settings.
WANDB_KEY = os.environ.get("WANDB_API_KEY")
NUM_WORKERS = os.cpu_count()

# NOTE(review): with key=None wandb is expected to fall back to its own
# credential lookup / interactive prompt - confirm against the wandb docs.
wandb.login(key=WANDB_KEY)

def init_wandb(project_name = "ICPBL_REPORT", name="test", config = None):
  """Start a Weights & Biases run and return the run object.

  Args:
    project_name: wandb project the run is logged under.
    name: display name of the run.
    config: hyper-parameters / run metadata to record. When None, the
      built-in default dictionary below is recorded instead.

  Returns:
    Whatever ``wandb.init`` returns for the new run.
  """
  default_config = {
    "architecture": "ResNet50",
    "learning_rate": 0.01,
    "batch_size": 64,
    "epochs": 50,
    "weight_decay": 0,
    "momentum": 0.93,
    "init_weights": False,
    "optimizer": 'SGD',
    "lr_scheduler": "CosineAnnealing",
  }
  run_config = default_config if config is None else config

  return wandb.init(project=project_name, name=name, config=run_config)

[34m[1mwandb[0m: W&B API key is configured. Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


In [None]:
# utils
import matplotlib.pyplot as plt

def get_labels_from_probs(y_hat):
  """Turn raw model outputs into hard 0/1 labels.

  The input is flattened to 1-D, passed through a sigmoid, and thresholded:
  1 where the sigmoid is strictly greater than 0.5, otherwise 0. Despite the
  name, the input is treated as pre-sigmoid scores.
  """
  flat_scores = y_hat.reshape(-1)
  probabilities = torch.sigmoid(flat_scores)
  return torch.where(probabilities > 0.5, 1, 0)

def get_acc(y_hat, labels):
  """Return the number of predictions that equal ``labels``.

  Floating-point ``y_hat`` is first converted to hard labels with
  ``get_labels_from_probs``; integer ``y_hat`` is compared as-is. The result
  is a plain Python number (a count of correct predictions, not a ratio).
  """
  predictions = get_labels_from_probs(y_hat) if torch.is_floating_point(y_hat) else y_hat
  n_correct = (predictions == labels).sum()
  return n_correct.item()

def get_roc_curve_data(labels, y_hat):
  """Compute ROC curve points and the ROC AUC with scikit-learn.

  Args:
    labels: ground-truth labels.
    y_hat: prediction scores; flattened to 1-D before use.

  Returns:
    ``(fprs, tprs, roc_auc)``: false-positive rates, true-positive rates and
    the area under the curve.
  """
  scores = y_hat.reshape(-1)
  fprs, tprs = metrics.roc_curve(labels, scores)[:2]  # thresholds are not needed
  roc_auc = metrics.roc_auc_score(labels, scores)

  return fprs, tprs, roc_auc

def plot_roc_curve(fprs, tprs, roc_auc):
  """Draw the ROC curve on the current axes and save the figure as ROC.png.

  Args:
    fprs: false-positive rates (x values).
    tprs: true-positive rates (y values).
    roc_auc: area under the curve, shown in the legend with 4 decimals.
  """
  ax = plt.gca()
  # The ROC curve itself.
  ax.plot(fprs, tprs, label=f'ROC (AUC: {roc_auc:.4f})')
  # Dotted diagonal: the reference line of a random classifier.
  diagonal = [0, 1]
  ax.plot(diagonal, diagonal, 'k:', label='Random')
  ax.set_title('ROC curve')
  ax.set_xlabel('FPR')
  ax.set_ylabel('TPR')
  ax.legend(loc='best')
  ax.figure.savefig('ROC.png', dpi=300)

In [None]:
def val_model(model, criterion, dataloader, use_sklearn = False, plot_mask=False):
  """Evaluate ``model`` on every batch of ``dataloader`` without gradients.

  Args:
    model: network producing one logit per sample.
    criterion: loss applied to ``(logits, float labels)``.
    dataloader: loader yielding ``(images, labels)`` batches.
    use_sklearn: when True, also draw/save the ROC curve via ``plot_roc_curve``.
    plot_mask: when True, show sample images with their predicted labels.
      The predictions are collected in loader order and looked up by dataset
      index, so this is only meaningful if the loader does not shuffle.

  Returns:
    ``(accuracy, loss, auc, roc)``: accuracy over the whole dataset and mean
    per-batch loss (both rounded to 4 decimals), the sklearn ROC AUC, and the
    wandb ROC-curve plot object.

  Raises:
    ValueError: if the dataloader yields no batches.
  """
  if len(dataloader) == 0:
    raise ValueError("val_model: dataloader is empty")

  model.eval()

  correct = 0
  loss_sum = 0.0
  # Collect per-batch outputs and concatenate once at the end instead of
  # re-allocating a growing tensor on every batch.
  logit_batches = []
  label_batches = []

  with torch.no_grad():
    for images, labels in tqdm(dataloader, total=len(dataloader)):
      images = images.to(device)
      labels = labels.to(device)

      y_hat = model(images).reshape(-1)

      loss_sum += criterion(y_hat, labels.type(dtype=torch.float32)).item()
      correct += get_acc(y_hat, labels)

      logit_batches.append(y_hat.cpu())
      label_batches.append(labels.cpu())

  global_y_hat = torch.cat(logit_batches)
  global_labels = torch.cat(label_batches)  # keeps the integer label dtype

  loss = round(loss_sum / len(dataloader), 4)
  accuracy = round(correct / len(dataloader.dataset), 4)

  if plot_mask is True:
    predicted_labels = get_labels_from_probs(global_y_hat)
    show_images(dataloader.dataset, predicted_labels)

  # ROC/AUC only depend on the ranking of the scores, so raw logits are fine here.
  fprs, tprs, auc = get_roc_curve_data(global_labels, global_y_hat)

  if use_sklearn:
    plot_roc_curve(fprs, tprs, auc)

  # wandb takes one score column per class. Feed real probabilities
  # [P(class 0), P(class 1)] rather than [zeros, logits], which made the
  # class-0 curve degenerate.
  # NOTE(review): integer labels are presumably what lets wandb map classes
  # to the names in `labels` - confirm against the wandb docs.
  positive_prob = torch.sigmoid(global_y_hat)
  class_probs = torch.stack([1.0 - positive_prob, positive_prob], dim=1)
  roc = wandb.plot.roc_curve(global_labels.numpy(), class_probs.numpy(), labels=["not_wearing_mask", "wearing_mask"])

  return accuracy, loss, auc, roc

In [None]:
def train_model(model, optim, criterion, dataloader, save_model=True):
  """Train ``model`` for one pass over ``dataloader``.

  Args:
    model: network producing one logit per sample.
    optim: optimizer updating the model parameters (the parameter name
      shadows the ``optim`` module inside this function).
    criterion: loss applied to ``(logits, float labels)``.
    dataloader: loader yielding ``(images, labels)`` batches.
    save_model: currently unused; kept so existing callers keep working.

  Returns:
    ``(accuracy, loss, f1_score)`` for this pass: accuracy over the dataset,
    mean per-batch loss and F1 of the thresholded predictions, each rounded
    to 4 decimals.

  Raises:
    ValueError: if the dataloader yields no batches.
  """
  if len(dataloader) == 0:
    raise ValueError("train_model: dataloader is empty")

  model.train()

  correct = 0
  loss_sum = 0.0
  logit_batches = []
  label_batches = []

  for images, labels in tqdm(dataloader, total=len(dataloader)):
    images = images.to(device)
    labels = labels.to(device)

    y_hat = model(images).reshape(-1)

    _l = criterion(y_hat, labels.type(dtype=torch.float32))

    optim.zero_grad()
    _l.backward()
    optim.step()

    # .item() / .detach() keep only the numbers; accumulating the tensors
    # themselves would hold on to every batch's autograd graph.
    loss_sum += _l.item()
    correct += get_acc(y_hat.detach(), labels)

    logit_batches.append(y_hat.detach().cpu())
    label_batches.append(labels.cpu())

  loss = round(loss_sum / len(dataloader), 4)
  accuracy = round(correct / len(dataloader.dataset), 4)

  predictions = get_labels_from_probs(torch.cat(logit_batches))
  global_labels = torch.cat(label_batches)
  # sklearn's signature is f1_score(y_true, y_pred).
  f1_score = metrics.f1_score(global_labels.numpy(), predictions.numpy())
  f1_score = round(float(f1_score), 4)

  return accuracy, loss, f1_score

In [None]:
# hyper parameters
# Read by the optimizer/scheduler setup, the DataLoaders and the epoch loop
# below, and handed to wandb as the run configuration.
config = {
    "architecture": "ResNet50",  # descriptive label; not read elsewhere in the visible code
    "learning_rate": 0.01,       # learning rate for the optimizer
    "batch_size": 64,            # batch size of the train/val/test DataLoaders
    "epochs": 50,                # number of passes of the training loop
    "weight_decay": 0,           # weight decay for the optimizer
    "momentum": 0.93,            # momentum for the SGD optimizer
    "init_weights": False,       # forwarded to init_resnet50(init_weights=...)
    "optimizer": 'SGD',          # 'SGD' or 'Adam'
    "scheduler_type": '',        # 'StepLR', 'CyclicLR' or 'CosineAnnealingLR'; anything else -> no scheduler
}

In [None]:
# optimizer, loss, scheduler
model = init_resnet50(init_weights=config["init_weights"])
model.to(device)

if config["optimizer"] == 'SGD':
  # weight_decay is honoured for SGD too (it was previously only passed to Adam).
  optimizer = optim.SGD(
      model.parameters(),
      lr=config["learning_rate"],
      momentum=config["momentum"],
      weight_decay=config["weight_decay"],
  )
elif config["optimizer"] == 'Adam':
  optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"], weight_decay=config["weight_decay"])
else:
  # Same fallback as before (plain SGD, lr=0.01), but no longer silent.
  print(f"Unknown optimizer {config['optimizer']!r}; falling back to plain SGD with lr=0.01")
  optimizer = optim.SGD(model.parameters(), lr=0.01)

# The model outputs a single raw logit per sample, so the loss applies the sigmoid itself.
criterion = nn.BCEWithLogitsLoss()
criterion.to(device)

# lr_scheduler (None means a constant learning rate)
lr_scheduler = None
if config["scheduler_type"] == 'StepLR':
  lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma=0.8)
elif config["scheduler_type"] == 'CyclicLR':
  lr_scheduler = optim.lr_scheduler.CyclicLR(optimizer, base_lr=0.005, max_lr=0.015, step_size_up=5, step_size_down=5, mode='triangular')
elif config["scheduler_type"] == 'CosineAnnealingLR':
  lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5, eta_min = 0.001)
elif config["scheduler_type"]:
  # A non-empty but unrecognised name is most likely a typo; say so.
  print(f"Unknown scheduler_type {config['scheduler_type']!r}; training without a scheduler")

print(lr_scheduler)

In [None]:
# data_loader
# Only the training loader shuffles. The evaluation loaders keep dataset
# order: val_model(plot_mask=True) looks predictions up by dataset index, so
# a shuffled loader would pair each displayed image with another sample's
# prediction. Shuffling has no benefit for evaluation metrics anyway.
train_dataloader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)
val_dataloader = DataLoader(val_dataset, batch_size=config["batch_size"], shuffle=False, num_workers=NUM_WORKERS, pin_memory=False)
test_dataloader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False, num_workers=NUM_WORKERS, pin_memory=False)

In [None]:
# Training and validation code can be written in one function. It's up to your taste!
from copy import deepcopy
import json

# Epoch loop: train, validate, log to wandb, and remember the best weights.
logger = init_wandb(project_name="ResNet50", config=config)

best_f1_score = 0.0
best_f1_model = None
best_model_metadata = None

for epoch in range(config['epochs']):
  train_acc, train_loss, f1_score = train_model(model, optim=optimizer, criterion=criterion, dataloader=train_dataloader)
  val_acc, val_loss, auc, roc = val_model(model, criterion=criterion, dataloader=val_dataloader)

  # The scheduler was created but never advanced, so the learning rate never
  # changed. It is stepped once per epoch, after the optimizer updates.
  if lr_scheduler is not None:
    lr_scheduler.step()

  # One log call per epoch keeps every metric on the same wandb step.
  epoch_metrics = {
      "train_acc": train_acc,
      "train_loss": train_loss,
      "val_acc": val_acc,
      "val_loss": val_loss,
  }
  if epoch % 10 == 0:
    epoch_metrics["roc"] = roc
    epoch_metrics["auc"] = auc
  logger.log(epoch_metrics, step=epoch)

  # NOTE(review): the "best" model is chosen by the *training* F1 score;
  # consider selecting on a validation metric instead.
  if f1_score > best_f1_score:
    best_f1_score = f1_score
    best_f1_model = deepcopy(model.state_dict())
    best_model_metadata = {
        "epoch": epoch
    }

  print("")
  print(f"{'Epoch':<20}:  {epoch+1}")
  print(f"{'Train Accuracy':<20}: {train_acc}")
  print(f"{'Train Loss':<20}: {train_loss}")
  print(f"{'Val Accuracy':<20}: {val_acc}")
  print(f"{'Val Loss':<20}: {val_loss}")
  print("")

wandb.finish()

# If no epoch ever improved on the initial score (or no epoch ran), fall back
# to the current weights so the checkpoint is always a loadable state dict
# rather than None.
if best_f1_model is None:
  best_f1_model = deepcopy(model.state_dict())
  best_model_metadata = {
      "epoch": config['epochs'] - 1 if config['epochs'] > 0 else None
  }

save_model_path = Path("./best_model.pt")

torch.save(best_f1_model, save_model_path)
with open(Path("./best_model_metadata.json"), 'w', encoding="utf-8") as f:
  json.dump(best_model_metadata, f, ensure_ascii=False, indent=4)

In [None]:
# Validate the model as it stands after the last epoch (not the saved best
# checkpoint) and show sample validation images with their predicted labels.
val_acc, val_loss, auc, roc = val_model(model, criterion=criterion, dataloader=val_dataloader, plot_mask=True)

In [None]:
# Evaluation code with our trained model
# ROC curve, AUC (Hint: use sklearn or wandb function, using sklearn to extract fpr, tpr will be bonus score)
# Rebuild the architecture, then load the best checkpoint saved by the training cell.
model = init_resnet50()
# map_location lets a checkpoint saved on the GPU load on a CPU-only runtime too.
model.load_state_dict(torch.load(save_model_path, map_location=device))
model.to(device)
test_acc, test_loss, roc_auc, roc = val_model(model, criterion=criterion, dataloader=test_dataloader, use_sklearn=True, plot_mask=True)

In [None]:
print(test_acc)

# Experiment notes (run id -> settings -> test accuracy):
# 22 -> init_weight+Cos+0.93(22) 0.79
# 23 -> weight_decay = 0.0001 , momentum = 0.93 -> 0.7949
# 25 -> weight_decay = 0.0001 , momentum = 0.93, dropout = 0.5 -> 80.94
# drop_test (cosineannealingLR): 78.9%
# drop_test (stepLR) : 75%
# default_Step : 76%