In [64]:
import argparse
import glob
import json
import multiprocessing
import os
import parser
import random
import re
from enum import Enum
from pathlib import Path
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from PIL import Image
from torch.optim import SGD
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset, Subset, random_split
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from sklearn.metrics import f1_score


## HyperParameters

In [65]:
class HyperParameter():
    EPOCH = 10
    BATCH_SIZE = 64
    RESIZE = (96,128)
    SAVE_INTERVAL = 10
    LOG_INTERVAL =20
    TRAIN_IMAGE_DIR = '/opt/ml/input/data/train/images'
    TRAIN_CSV_DIR = '/opt/ml/input/data/train/train.csv'
    TEST_IMAGE_DIR = '/opt/ml/input/data/eval/images'
    TEST_CSV_DIR = '/opt/ml/input/data/eval/info.csv'
    SAVE_DIR = '/opt/ml/model/run1'
    SEED = 43
    LEARNING_RATE = 0.001
    VALIDATION_RATIO = 0.2
    NUM_CLASS=18
    NUM_MASK_CLASS=3
    NUM_GENDER_CLASS=2
    NUM_AGE_CLASS=3


## Dataset

In [66]:
class MaskLabels():
    MASK = 0
    INCORRECT = 1
    NORMAL = 2

    @classmethod
    def from_str(self, value: str) -> int:
        value = value.lower()
        if value in ["mask1", "mask2", "mask3", "mask4", "mask5"]:
            return self.MASK
        elif value == "incorrect_mask":
            return self.INCORRECT
        else:
            return self.NORMAL


class GenderLabels():
    MALE = 0
    FEMALE = 1

    @classmethod
    def from_str(self, value: str) -> int:
        value = value.lower()
        if value == "male":
            return self.MALE
        elif value == "female":
            return self.FEMALE
        else:
            raise ValueError(
                f"Gender value should be either 'male' or 'female', {value}")


class AgeLabels():
    YOUNG = 0
    MIDDLE = 1
    OLD = 2

    @classmethod
    def from_number(self, value: int) -> int:
        if value < 30:
            return self.YOUNG
        elif value < 60:
            return self.MIDDLE
        else:
            return self.OLD


In [67]:
class SplitByHumanDataset(Dataset):
    """
        사람을 기준으로 dataset을 나눴음. Test score과 Validation Score간의 격차를 줄일 수 있다
    
    """
    def _get_images(self):
        IMG_EXTENSIONS = [".jpg", ".JPG", ".jpeg", ".JPEG",
                          ".png", ".PNG", ".ppm", ".PPM", ".bmp", ".BMP"]
        MASK_TYPES = ["mask1", "mask2", "mask3",
                      "mask4", "mask5", "incorrect_mask", "normal"]
        df = pd.read_csv(HyperParameter.TRAIN_CSV_DIR)
        for i in self.df_index:
            row = df.iloc[i]
            p = os.path.join(HyperParameter.TRAIN_IMAGE_DIR, row.path)
            files = os.listdir(p)
            for mask_type in MASK_TYPES:
                for ext in IMG_EXTENSIONS:
                    if mask_type+ext in files:
                        self.image_paths.append(os.path.join(p, mask_type+ext))
                        self.mask_labels.append(MaskLabels.from_str(mask_type))
                        self.gender_labels.append(
                            GenderLabels.from_str(row.gender))
                        self.age_labels.append(
                            AgeLabels.from_number(row.age.item()))


    def __init__(self, df_index: list ,transform=None) -> None:
        self.image_paths = []
        self.mask_labels = []
        self.gender_labels = []
        self.age_labels = []
        self.transform = transform
        self.df_index=df_index
        self._get_images()


    def __getitem__(self, index):
        img =Image.open(self.image_paths[index])
        label=[self.mask_labels[index], self.gender_labels[index], self.age_labels[index]]
        if self.transform:
            img=self.transform(img)
            
        return img,label

    def __len__(self):
        return len(self.image_paths)
    
    @staticmethod
    def split_train_val():
        df=pd.read_csv(HyperParameter.TRAIN_CSV_DIR)
        val=int(len(df)*HyperParameter.VALIDATION_RATIO)
        train=len(df)-val
        t,v=random_split(list(range(len(df))),[train,val])
        return t,v

    
    @staticmethod
    def multi_to_single(mask_label:int,gender_label:int,age_label:int) -> int:
        """
            mask, gender, age로 이루어진 label들을 하나의 정수로 변환
        """
        return mask_label*6+gender_label*3+age_label
    
    @staticmethod
    def single_to_multi(label:int):
        """
            mask, gender, age로 이루어진 label들을 하나의 정수로 변환
        """
        mask= label//6
        gender=(label//3)%2
        age=label%3
        return mask,gender,age

class TestDataset(Dataset):


    def __init__(self,df, transform):
        self.transform=transform
        self.image_paths=[os.path.join(HyperParameter.TEST_IMAGE_DIR, img_id) for img_id in df.ImageID]

    def __getitem__(self, index):
        image = Image.open(self.image_paths[index])

        if self.transform:
            image = self.transform(image)
        return image

    def __len__(self):
        return len(self.image_paths)

    

## Loss Function

In [68]:
class FocalLoss(nn.Module):
    def __init__(self, weight=None,
                 gamma=2., reduction='mean'):
        nn.Module.__init__(self)
        self.weight = weight
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, input_tensor, target_tensor):
        log_prob = F.log_softmax(input_tensor, dim=-1)
        prob = torch.exp(log_prob)
        return F.nll_loss(
            ((1 - prob) ** self.gamma) * log_prob,
            target_tensor,
            weight=self.weight,
            reduction=self.reduction
        )

## Config

In [69]:
def seed_config(seed):
    # https://hoya012.github.io/blog/reproducible_pytorch/
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)
    random.seed(seed)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
seed_config(HyperParameter.SEED)

In [70]:
model=torchvision.models.resnet50(pretrained=True)
model.fc=nn.Linear(model.fc.in_features,HyperParameter.NUM_CLASS)
model.to(device)

transform=transforms.Compose([
    transforms.Resize(HyperParameter.RESIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
])


train_idx,val_idx=SplitByHumanDataset.split_train_val()
train_dataset=SplitByHumanDataset(train_idx, transform)
val_dataset=SplitByHumanDataset(val_idx, transform)

criterion=FocalLoss()
optimizer=torch.optim.Adam(params=model.parameters(),lr=HyperParameter.LEARNING_RATE)

train_loader = DataLoader(
    train_dataset,
    batch_size=HyperParameter.BATCH_SIZE,
    num_workers=multiprocessing.cpu_count() // 2,
    shuffle=True,
    pin_memory=use_cuda,
    drop_last=True,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=HyperParameter.BATCH_SIZE,
    num_workers=multiprocessing.cpu_count() // 2,
    shuffle=False,
    pin_memory=use_cuda,
    drop_last=True,
)
logger = SummaryWriter(log_dir=HyperParameter.SAVE_DIR)

## Train

In [71]:
best_val_acc = 0
best_val_loss = float('inf')
best_val_f1=0
for epoch in range(HyperParameter.EPOCH):
    # train loop
    model.train()
    loss_value = 0
    matches = 0
    label_list=[]
    pred_list=[]
    for idx, train_batch in enumerate(train_loader):
        inputs, labels = train_batch
        inputs = inputs.to(device)
        labels = SplitByHumanDataset.multi_to_single(*labels).to(device)

        optimizer.zero_grad()

        outs = model(inputs)
        preds = torch.argmax(outs, dim=-1)
        loss = criterion(outs, labels)

        label_list+=labels.detach().cpu()
        pred_list+=preds.detach().cpu()

        loss.backward()
        optimizer.step()

        loss_value += loss.item()
        matches += (preds == labels).sum().item()
        if (idx + 1) % HyperParameter.LOG_INTERVAL == 0:
            train_loss = loss_value / HyperParameter.LOG_INTERVAL
            train_acc = matches / HyperParameter.BATCH_SIZE / HyperParameter.LOG_INTERVAL
            train_f1=f1_score(label_list,pred_list,average="macro")
            print(
                f"Epoch[{epoch}/{HyperParameter.EPOCH}]({idx + 1}/{len(train_loader)}) || "
                f"training F1 Score {train_f1:4.4} || training loss {train_loss:4.4} || training accuracy {train_acc:4.2%}"
            )
            logger.add_scalar("Train/Loss", train_loss, epoch * len(train_loader) + idx)
            logger.add_scalar("Train/Accuracy", train_acc, epoch * len(train_loader) + idx)
            logger.add_scalar("Train/F1_Score", train_acc, epoch * len(train_loader) + idx)
            loss_value = 0
            matches = 0
            label_list=[]
            pred_list=[]
    # evaluation session
    with torch.no_grad():
        model.eval()
        val_loss=0
        val_acc=0
        label_list=[]
        pred_list=[]
        for val_batch in val_loader:
            inputs, labels = val_batch
            inputs = inputs.to(device)
            labels = SplitByHumanDataset.multi_to_single(*labels).to(device)

            outs = model(inputs)
            preds = torch.argmax(outs, dim=-1)
            label_list+=labels.detach().cpu()
            pred_list+=preds.detach().cpu()
            val_acc+=(labels==preds).sum().item()
            val_loss+= criterion(outs, labels).item()


        val_loss = val_loss / len(val_loader)
        val_acc = val_acc / len(label_list)
        val_f1= f1_score(label_list,pred_list,average="macro")
        best_val_loss = min(best_val_loss, val_loss)
        best_val_acc = max(best_val_acc, val_acc)
        if val_f1 > best_val_f1:
            print(f"New best model for f1 score : {val_f1:4.2%}! saving the best model..")
            torch.save(model.state_dict(), f"{HyperParameter.SAVE_DIR}/best_model.pth")
            best_val_f1 = val_f1
        torch.save(model.state_dict(), f"{HyperParameter.SAVE_DIR}/last_model.pth")
        print(f"[Val] f1 : {val_f1:4.2%}, acc : {val_acc:4.2%}, loss: {val_loss:4.2} || best f1 : {best_val_f1:4.2%}, best acc : {best_val_acc:4.2%}, best loss: {best_val_loss:4.2}")
        logger.add_scalar("Val/Loss", val_loss, epoch)
        logger.add_scalar("Val/Accuracy", val_acc, epoch)
        logger.add_scalar("Val/F1_Score",val_f1,epoch)
        print("-"*60)

Epoch[0/10](20/236) || training F1 Score 0.279 || training loss 1.169 || training accuracy 51.64%
Epoch[0/10](40/236) || training F1 Score 0.4348 || training loss 0.5959 || training accuracy 66.95%
Epoch[0/10](60/236) || training F1 Score 0.5341 || training loss 0.457 || training accuracy 72.66%
Epoch[0/10](80/236) || training F1 Score 0.6135 || training loss 0.3489 || training accuracy 77.97%
Epoch[0/10](100/236) || training F1 Score 0.5662 || training loss 0.3616 || training accuracy 76.88%
Epoch[0/10](120/236) || training F1 Score 0.5933 || training loss 0.3393 || training accuracy 78.20%
Epoch[0/10](140/236) || training F1 Score 0.622 || training loss 0.268 || training accuracy 81.09%
Epoch[0/10](160/236) || training F1 Score 0.6356 || training loss 0.225 || training accuracy 83.83%
Epoch[0/10](180/236) || training F1 Score 0.677 || training loss 0.229 || training accuracy 83.44%
Epoch[0/10](200/236) || training F1 Score 0.6462 || training loss 0.2588 || training accuracy 82.73%
Ep

## Inference

In [72]:
model.load_state_dict(torch.load(os.path.join(HyperParameter.SAVE_DIR,"best_model.pth"),map_location=device))
model.eval()
test_df=pd.read_csv(HyperParameter.TEST_CSV_DIR)
test_dataset=TestDataset(df=test_df,transform=transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=HyperParameter.BATCH_SIZE,
    num_workers=multiprocessing.cpu_count() // 2,
    shuffle=False,
    pin_memory=use_cuda,
    drop_last=False,
)
preds = []
with torch.no_grad():
    for idx, images in enumerate(test_loader):
        images = images.to(device)
        pred = model(images)
        pred = pred.argmax(dim=-1)
        preds.extend(pred.cpu().numpy())
test_df['ans'] = preds
save_path = os.path.join(HyperParameter.SAVE_DIR,'output.csv')
test_df.to_csv(save_path, index=False)
print(f"Inference Done! Inference result saved at {save_path}")

Inference Done! Inference result saved at /opt/ml/model/run1/output.csv
