## Set Environment

In [None]:
# Dependency Env Setting
# IPython shell escapes: each line runs in a subshell when the cell executes.
# System package installed through apt; presumably needed so that `import cv2`
# can load libGL on a headless machine — TODO confirm.
!apt-get install libgl1-mesa-glx -y
# Python packages. efficientnet_pytorch and adamp are imported in the next
# cell; ipywidgets is not imported directly by any visible cell.
!pip install ipywidgets
!pip install efficientnet_pytorch
!pip install adamp

In [None]:
# Import libraries
import pandas as pd
import numpy as np
import cv2
import PIL
import os
import time
import glob
import pickle
import random
from pathlib import Path
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset, Subset
from efficientnet_pytorch import EfficientNet
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
from adamp import AdamP

# Set random seed
# Seed every RNG used in this notebook (python, numpy, torch CPU and CUDA).
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# NOTE: setting PYTHONHASHSEED here only affects child processes started
# afterwards; the hash seed of the running interpreter is already fixed.
os.environ["PYTHONHASHSEED"] = str(seed)
torch.backends.cudnn.deterministic = True
# benchmark mode lets cuDNN auto-tune and pick different kernels from run to
# run, which undoes the deterministic setting above, so it must stay off.
torch.backends.cudnn.benchmark = False
print(f'seed : {seed}')

# Set device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'device : {device}')
# get_device_properties only accepts CUDA devices; skip it on the CPU fallback.
if device.type == 'cuda' :
    print(torch.cuda.get_device_properties(device))

# Set ROOT_PATH
ROOT_PATH = os.getcwd()
print(f'ROOT_PATH : {ROOT_PATH}')

# Set Training Name
'각 실험의 name을 설정하고 Directory형태로 관리하기 위한 Code입니다.'
# Each experiment gets its own directory custom_data/<name> that holds its
# checkpoints and submission file.
name = 'Dropout-B4'
# makedirs creates the missing parent 'custom_data' as well and never changes
# the working directory, so a failure cannot leave the notebook in the wrong cwd.
os.makedirs(os.path.join(ROOT_PATH, 'custom_data', name), exist_ok=True)

## Set Train Data

In [None]:
# Set Image Directory Path
path = Path('input/data/train/images')
# One entry per person directory; entries containing '._' are skipped.
# Sorted so that the fold assignment below depends only on `seed`, not on the
# order in which the filesystem happens to list the directories.
img_dirs = sorted(str(x) for x in path.glob('*') if '._' not in str(x))
img_dirs = np.array(img_dirs)

# Set Semi-label
'Class별 층화추출을 위한 Code입니다.'
# Per-directory label 3*gender + age_group (0-5), used only to stratify folds.
semi_labels = []
for img_dir in img_dirs :
    # Parse the directory name itself rather than the whole path, so that an
    # underscore in any parent directory cannot shift the field positions.
    # The 4th '_'-separated field is read as the age (first two characters).
    person = os.path.basename(os.path.normpath(img_dir))
    g = 1 if 'female' in person else 0

    age = int(person.split('_')[3][:2])
    if age < 30 :
        a = 0
    elif age < 58 :
        a = 1
    else :
        a = 2
    semi_labels.append(3*g + a)
semi_labels = np.array(semi_labels)

# Set Train set and Valid set
'사람(ID)기준으로 층화추출하는 Code입니다.'
# The split is made over person directories, so all images in one directory
# land in the same fold. folds[i] holds index arrays into img_dirs.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
folds = [{'train':t, 'valid':v} for t, v in skf.split(img_dirs, semi_labels)]

In [None]:
def labeling(img_path) :
    """Return the final class label (0-17) encoded in an image file path.

    The label is ``6*mask + 3*gender + age_group`` where
      * mask      : 2 if the file name contains 'normal', 1 if it contains
                    'incorrect_mask', otherwise 0
      * gender    : 1 if the parent directory name contains 'female', else 0
      * age_group : 0 for age < 30, 1 for age < 58, otherwise 2

    Only the file name and its parent directory name are inspected, so
    underscores or keywords in directories higher up the path do not affect
    the result. The age is read from the first two characters of the 4th
    '_'-separated field of the parent directory name.

    Raises ValueError / IndexError if the parent directory name does not have
    a numeric 4th field.
    """
    person_dir, file_name = os.path.split(img_path)
    person = os.path.basename(person_dir)

    if 'normal' in file_name :
        m = 2
    elif 'incorrect_mask' in file_name :
        m = 1
    else :
        m = 0

    g = 1 if 'female' in person else 0

    age = int(person.split('_')[3][:2])
    if age < 30 :
        a = 0
    elif age < 58 :
        a = 1
    else :
        a = 2

    return 6*m + 3*g + a

In [None]:
# Define Dataset
class FaceDataset(Dataset) :
    '''
    Dataset that reads face images from a list of file paths.

    Each item is a dict: {'image': ..., 'label': ...} when `training` is True,
    otherwise {'image': ...} only.

    Args:
        img_paths : sequence of image file paths.
        trsf      : callable applied to every image (after `aug`, if any).
        aug       : optional callable invoked as aug(image=img)['image']
                    before `trsf`.
        training  : when True, the label derived from the path by `labeling`
                    is returned as well.

    Images are converted from OpenCV's BGR order to RGB before any transform,
    because the normalisation constants used in this notebook and the
    pretrained backbone are in RGB order. Checkpoints trained on the earlier
    BGR input should be retrained.
    '''
    def __init__(self, img_paths, trsf, aug=None, training=False) :
        self.img_paths = img_paths
        self.trsf = trsf
        self.aug = aug
        self.training = training

    def __len__(self) :
        return len(self.img_paths)

    def __getitem__(self, idx) :
        img_path = self.img_paths[idx]
        img = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than deep inside a transform.
        if img is None :
            raise FileNotFoundError(f'cannot read image: {img_path}')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.aug is not None :
            img = self.aug(image=img)['image']
        img = self.trsf(img)

        if self.training :
            return {'image' : img, 'label' : labeling(img_path)}
        return {'image' : img}

## Define Model 

In [None]:
# Define Model
class MyEfficientNet(nn.Module) :
    '''
    EfficientNet-b4 (pretrained) with its output layer sized for 18 classes,
    predicting all 18 classes with a single head.

    forward() returns softmax probabilities of shape (batch, 18), not logits.
    NOTE(review): torch.nn.CrossEntropyLoss expects unnormalized logits, so
    feeding these outputs to it directly applies softmax twice.
    '''
    def __init__(self) :
        super().__init__()
        self.EFF = EfficientNet.from_pretrained('efficientnet-b4', in_channels=3, num_classes=18)

    def forward(self, x) :
        logits = self.EFF(x)
        return F.softmax(logits, dim=1)

## Define Transform and Augs

In [None]:
class ToTensor(object) :
    """Scale an H x W x C image by 1/255 and return it as a C x H x W FloatTensor."""
    def __call__(self, img) :
        scaled = np.array(img) / 255
        # channels-last -> channels-first
        channels_first = np.transpose(scaled, (2, 0, 1))
        return torch.FloatTensor(channels_first)

In [None]:
# Set Transform
'Train에 경우에만, 일부 Augmentation을 추가합니다.'
'다른 실험의 경우 CenterCrop의 Size를 변경합니다.'
# Both pipelines share the same head (PIL conversion + centre crop) and tail
# (tensor conversion + normalisation); only the training pipeline inserts the
# random flip / rotation in between. Change the crop size here for other
# experiments.
_transform_head = [
    T.ToPILImage(),
    T.CenterCrop([300,256]),
]
_transform_tail = [
    ToTensor(),
    T.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
]
_train_only_augs = [
    T.RandomHorizontalFlip(0.5),
    T.RandomRotation(10),
]

train_transform = T.Compose(_transform_head + _train_only_augs + _transform_tail)

valid_transform = T.Compose(_transform_head + _transform_tail)

## Set Train Options

In [None]:
# Set Hyper-params
'Batch Size는 Center Crop의 크기에 따라 변경될 수 있습니다.'
# batch_size may need adjusting when the centre-crop size changes.
# Order: mini-batch size, learning rate, maximum number of epochs per fold.
batch_size, lr, epochs = 32, 1e-4, 20

In [None]:
# Set Weight
'전체 Class의 분포에 따라 가중치를 구합니다.'
# Class weights derived from the label distribution over all training images:
# weight_c = 1 - count_c / total, so rarer classes get a larger weight.
img_paths = [str(x) for x in path.glob('*/*') if '._' not in str(x)]
img_labels = list(map(labeling, img_paths))
if not img_labels :
    raise FileNotFoundError(f'no training images found under {path}')

# bincount with minlength=18 keeps exactly one weight per class even when a
# class has no sample; a value_counts()-based vector would silently be shorter
# than the 18 outputs of the model and break the weighted loss.
class_counts = np.bincount(np.asarray(img_labels, dtype=np.int64), minlength=18)
label_weights = torch.FloatTensor(1 - class_counts / class_counts.sum())
label_weights = label_weights.to(device)

## Training

In [None]:
# Training and Validating
'일부 Fold만을 사용해 Testing할 경우를 대비한 Code 설계입니다.'
# Folds trained in this run; list a subset to experiment on fewer folds.
folds_index = [0,1,2,3,4] # 0 ~ 4
'Loss Function 실험 시 쉽게 변경할 수 있도록 한 Code 설계입니다.'
# MyEfficientNet.forward returns softmax probabilities. CrossEntropyLoss would
# apply log-softmax on top of them (softmax twice), which squashes the loss and
# the gradients. NLLLoss on log-probabilities is the same weighted
# cross-entropy computed once. Swap the loss here for other experiments.
loss_fn = torch.nn.NLLLoss(weight=label_weights)
# Consecutive epochs without validation improvement before stopping a fold.
patience = 5

for fold in folds_index :
    print(f'Now Training Fold is {fold}')
    # The loss is no longer bounded, so start from +inf rather than a constant.
    valid_loss_min = float('inf')
    early_stop_cnt = 0
    
    '각 Fold에 해당하는 Dataset과 Dataloader가 정의되는 Code입니다.'
    # Expand the person directories of this fold into image file paths.
    train_img_paths, valid_img_paths = [], []
    for train_dir in img_dirs[folds[fold]['train']] :
        train_img_paths.extend(glob.glob(train_dir+'/*'))
    for valid_dir in img_dirs[folds[fold]['valid']] :
        valid_img_paths.extend(glob.glob(valid_dir+'/*'))
    
    train_dataset = FaceDataset(train_img_paths, train_transform, training=True)
    valid_dataset = FaceDataset(valid_img_paths, valid_transform, training=True)
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=3)
    # Validation order does not affect the averaged metrics, so no shuffling.
    valid_loader = DataLoader(dataset=valid_dataset, batch_size=batch_size//4, shuffle=False, num_workers=3)
    
    # Re-training Code
    '학습이 도중에 종료될 경우에도, 재학습이 가능하도록 설계한 Code입니다.'
    # Checkpoints are named '<fold>fold_<epoch>epoch_<loss>_<name>.pth'. The
    # epoch and loss are parsed from the file name only (prefix and suffix
    # stripped), so underscores in the directory or in `name` cannot shift the
    # fields.
    ckpt_prefix, ckpt_suffix = f'{fold}fold_', f'_{name}.pth'
    retrain_model = sorted(glob.glob(f'custom_data/{name}/{fold}fold_*_{name}.pth'))
    
    model = MyEfficientNet()
    model.EFF._dropout = torch.nn.Dropout(p=0.7, inplace=False)
    ep = 0
    if retrain_model :
        ckpt_file = os.path.basename(retrain_model[0])
        ep_str, loss_str = ckpt_file[len(ckpt_prefix):-len(ckpt_suffix)].split('_')
        ep = int(ep_str.replace('epoch', ''))
        # NOTE: losses stored by runs that used CrossEntropyLoss on the softmax
        # output are on a different scale; such a checkpoint is simply
        # replaced by the first better epoch.
        valid_loss_min = float(loss_str)
        # map_location lets a checkpoint saved on GPU be resumed on any device.
        model.load_state_dict(torch.load(retrain_model[0], map_location=device))
    model = model.to(device)
    
    'Optimizer 실험 시 변경이 쉽도록 설계한 Code입니다.'
    # Swap the optimizer here for other experiments. Optimizer state is not
    # stored in checkpoints, so a resumed run starts with a fresh optimizer.
    optim = AdamP(model.parameters(), lr=lr)
    
    # When resuming, epochs up to the checkpointed one are skipped.
    for epoch in range(ep, epochs) :
        # Training
        '학습의 진행 상황을 tqdm으로 표시합니다.'
        model.train()
        with tqdm(train_loader,
                 total=len(train_loader),
                 unit='batch') as train_bar :
            train_f1 = []
            train_loss = []
            for sample in train_bar :
                train_bar.set_description(f'Epoch {epoch+1} / {epochs}')
                imgs = sample['image'].float().to(device)
                labels = sample['label'].long().to(device)
                
                optim.zero_grad()
                pred = model(imgs)
                # clamp avoids log(0) = -inf when a probability underflows.
                loss = loss_fn(torch.log(pred.clamp_min(1e-12)), labels)
                loss.backward()
                optim.step()
                
                'f1 score와 loss가 표시됩니다.'
                # Running means of the per-batch macro F1 and loss.
                train_f1.append(f1_score(labels.cpu().numpy(), pred.detach().argmax(dim=1).cpu().numpy(), average='macro'))
                train_loss.append(loss.item())
                train_bar.set_postfix(f1=np.mean(train_f1), loss=np.mean(train_loss), Train=epoch+1)
        
        # Validating
        '검증의 진행 상황을 tqdm으로 표시합니다.'
        model.eval()
        with torch.no_grad(), tqdm(valid_loader,
                                   total=len(valid_loader),
                                   unit='batch') as valid_bar :
            valid_f1 = []
            valid_loss = []
            for sample in valid_bar :
                valid_bar.set_description(f'Epoch {epoch+1} / {epochs}')
                imgs = sample['image'].float().to(device)
                labels = sample['label'].long().to(device)
                
                pred = model(imgs)
                loss = loss_fn(torch.log(pred.clamp_min(1e-12)), labels)

                'f1 score와 loss가 표시됩니다.'
                valid_f1.append(f1_score(labels.cpu().numpy(), pred.argmax(dim=1).cpu().numpy(), average='macro'))
                valid_loss.append(loss.item())
                valid_bar.set_postfix(f1=np.mean(valid_f1), loss=np.mean(valid_loss), Valid=epoch+1)
        
        '검증 단계에서 Loss가 더 낮아지는 경우, 해당 Model을 저장하는 Code입니다.'
        epoch_valid_loss = np.mean(valid_loss)
        if epoch_valid_loss < valid_loss_min :
            valid_loss_min = epoch_valid_loss
            early_stop_cnt = 0
            # Save the new best checkpoint first and only then delete the
            # previous one(s), so a failed save never leaves the fold without
            # a checkpoint.
            new_ckpt = f'custom_data/{name}/{fold}fold_{epoch+1}epoch_{epoch_valid_loss:2.4f}_{name}.pth'
            torch.save(model.state_dict(), new_ckpt)
            for f in glob.glob(f'custom_data/{name}/{fold}fold_*{name}.pth') :
                if os.path.abspath(f) == os.path.abspath(new_ckpt) :
                    continue
                # NOTE(review): the file is truncated before removal as in the
                # original code, presumably to release disk space right away
                # — confirm whether this is still needed.
                open(f, 'w').close()
                os.remove(f)
        else :
            '학습이 진전되지 않는 경우 조기종료하는 Code입니다.'
            # Early stopping after `patience` epochs without improvement.
            early_stop_cnt += 1
            if early_stop_cnt >= patience :
                break

## Inference

In [None]:
# Ensemble Inference
'해당 실험에서 학습한 Model을 Ensemble하는 Code입니다.'
# Averages the class probabilities of every checkpoint saved for this
# experiment and writes the arg-max class per image to the submission file.
os.chdir(ROOT_PATH)
submission = pd.read_csv('input/data/eval/info.csv')
test_img_paths = [os.path.join('input/data/eval/images', img_file) for img_file in submission.ImageID]

test_dataset = FaceDataset(test_img_paths, valid_transform, training=False)
# shuffle=False keeps predictions aligned with the rows of `submission`.
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

checkpoints = sorted(glob.glob(f'custom_data/{name}/*{name}.pth'))
if not checkpoints :
    raise FileNotFoundError(f'no checkpoints found in custom_data/{name}')

prediction_lst = []
for best_model in checkpoints :
    model = MyEfficientNet()
    # map_location lets checkpoints saved on GPU be loaded on any device.
    model.load_state_dict(torch.load(best_model, map_location=device))
    model.to(device)
    model.eval()
    prediction_array = []
    
    # no_grad: inference needs no autograd graph, which saves memory.
    with torch.no_grad(), tqdm(test_loader,
                               total=len(test_loader),
                               unit='batch') as test_bar :
        for sample in test_bar :
            imgs = sample['image'].float().to(device)
            probs = model(imgs)
            prediction_array.append(probs.cpu().numpy())
    
    # One (num_images, num_classes) probability matrix per checkpoint.
    prediction_lst.append(np.concatenate(prediction_array, axis=0))

# Mean over checkpoints, then arg-max over classes.
submission['ans'] = np.argmax(np.mean(np.stack(prediction_lst, axis=0), axis=0), axis=1)
submission.to_csv(f'custom_data/{name}/{name}.csv', index=False)