In [None]:
import pandas as pd
import numpy as np
import os
from glob import glob
import random

from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset, Subset
from efficientnet_pytorch import EfficientNet
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
from adamp import AdamP
from tqdm import tqdm, notebook
from PIL import Image
from glob import glob
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import SummaryWriter

# random seed
seed = 37
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
os.environ["PYTHONHASHSEED"] = str(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True
print(f'seed : {seed}')

# device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'device : {device}')
print(torch.cuda.get_device_properties(device))

# current working directory
cwd = os.getcwd()
print(f' current working direcory : {cwd}')

# Training data will be stored in this folder
name = 'restoration_final_dropout' # 경로 이름 지정
if not os.path.isdir(f'data_file/{name}') :
    os.chdir(os.path.join(cwd, 'data_file'))
    os.mkdir(f'{name}')
    os.chdir(cwd)

In [None]:
path = Path('input/data/train/images')
image_dirs = [str(x) for x in list(path.glob('*')) if '._' not in str(x)]
# 'input/data/train/images/003277_female_Asian_19' 이런 형태가 나온다.
# np.array 형식으로 변환
image_dirs = np.array(image_dirs)

# 나이와 성별 구분이 문제니까 이 두 개를 기준으로 나눠서 stratified_kfold를 사용하면 imbalance를 조금 방지할 수 있지 않을까?
# 나이 성별 정보면 이용해서 데이터 나누기
# 나이가 60세 이상 정보가 너무 부족하니까 학습시에 나이 58을 기준으로 나눠서 진행

def label_fold(image_dirs):
    stratified_kfold_label = []
    for image_dir in image_dirs :
        cnt = 0
        if 'female' in image_dir : cnt += 3
        else : cnt += 0 
        
        age = int(image_dir.split('_')[3][:2])
        if age < 30 : cnt += 0
        elif age < 58 : cnt += 1
        else : cnt += 2
        stratified_kfold_label.append(cnt)
    stratified_kfold_label = np.array(stratified_kfold_label)
    stratified_kfold = StratifiedKFold(n_splits=5, random_state=seed, shuffle=True)
    # Stratified K-Fold는 층화된 folds를 반환하는 기존 K-Fold의 변형된 방식이다. 각 집합에는 전체 집합과 거의 동일하게 클래스의 표본 비율이 포함된다. 
    # 불균형 클래스의 경우 사용을 많이 한다. 이를 통해 데이터별로 가지는 클래스의 분포를 맞춰줄 수 있을 것으로 기대한다.
    # train에 2700개의 id 중에서 4/5가 들어가고 valid에 1/5가 들어간다. 이걸 다른 모양으로 train할 때 다섯번 반복하게 된다.
    fold_list = []
    for train_data, valid_data in stratified_kfold.split(image_dirs, stratified_kfold_label) :
        fold_list.append({'train':train_data, 'valid':valid_data})
    return fold_list

fold_list = label_fold(image_dirs)

In [None]:
# 'Mask까지 포함해서, 다시 라벨링해서 원래 데이터 class로 맞춰준다.
def label_func(image_paths) :
        cnt = 0
        if 'normal' in image_paths : cnt += 12
        elif 'incorrect_mask' in image_paths : cnt += 6
        else : cnt += 0

        if 'female' in image_paths : cnt += 3
        else : cnt += 0

        age = int(image_paths.split('_')[3][:2])
        if age < 30 : cnt += 0
        elif age < 58 : cnt += 1
        else : cnt += 2

        return cnt

In [None]:
# Dataset
class MaskDataset(Dataset) :
    # image_paths는 path input/data/train/images/003277_female_Asian_19/mask3.jpg 이런 식으로 들어온다.
    def __init__(self, image_paths, transform, augment = None, train_TF = False):
        self.image_paths = image_paths
        self.transform = transform
        self.augment = augment
        self.train_TF = train_TF
        
    def __len__(self) :
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        image = np.array(Image.open(self.image_paths[idx]))
        
        if self.augment: # augmentation하는 경우에
            image = self.transform(self.augment(image = image)['image'])
        else:    
            image = self.transform(image)
            
        if self.train_TF : # 트레이닝하는 경우
            label = label_func(self.image_paths[idx])
            return {'image' : image, 'label' : label}
            
        else:
            return {'image' : image}


In [None]:
# Transform
'''
transforms.ToPILImage() - csv 파일로 데이터셋을 받을 경우, PIL image로 바꿔준다.
transforms.CenterCrop(size) - 가운데 부분을 size 크기로 자른다.
transforms.Grayscale(num_output_channels=1) - grayscale로 변환한다.
transforms.RandomAffine(degrees) - 랜덤으로 affine 변형을 한다.
transforms.RandomCrop(size) -이미지를 랜덤으로 아무데나 잘라 size 크기로 출력한다.
transforms.RandomResizedCrop(size) - 이미지 사이즈를 size로 변경한다
transforms.Resize(size) - 이미지 사이즈를 size로 변경한다
transforms.RandomRotation(degrees) 이미지를 랜덤으로 degrees 각도로 회전한다.
transforms.RandomResizedCrop(size, scale=(0.08, 1.0), ratio=(0.75, 1.3333333333333333)) - 이미지를 랜덤으로 변형한다.
transforms.RandomVerticalFlip(p=0.5) - 이미지를 랜덤으로 수직으로 뒤집는다. p =0이면 뒤집지 않는다.
transforms.RandomHorizontalFlip(p=0.5) - 이미지를 랜덤으로 수평으로 뒤집는다.
transforms.ToTensor() - 이미지 데이터를 tensor로 바꿔준다.
transforms.Normalize(mean, std, inplace=False) - 이미지를 정규화한다.
'''
# centercrop을 통해 얼굴에서 벗어나는 부분을 줄여준다. 
# 멘토님이 중요하게 augmentation해야 한다한 부분 중, rotation, flip이 이미지 데이터 생긴 형태와 가장 잘 맞는다고 판단해서 이 두개로 진행

train_transform = T.Compose([
    T.ToPILImage(),
    T.CenterCrop([300,250]),
    T.RandomRotation(10),
    T.RandomHorizontalFlip(0.5),
    T.ToTensor(),
    T.Normalize(mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246))
])

# valid_transform에서는 기본적인 것만 가져가고 augmentation한 것을 빼고 검증을 거친다.
valid_transform = T.Compose([
    T.ToPILImage(),
    T.CenterCrop([300,250]),
    T.ToTensor(),
    T.Normalize(mean=(0.56, 0.51, 0.48), std=(0.22, 0.24, 0.25))
])

In [None]:
# Model
# -- model
# 왜 relu를 이걸 만들 때 넣었는지 모르겠지만 생각보다 잘 동작하긴 했다.
# efficientnet 중에서 파라미터 수가 적당하고 성능도 괜찮은 b3, b4로 테스트를 진행했다.
class Myeff4Model(nn.Module) :
    def __init__(self) :
        super().__init__()
        self.model_name = EfficientNet.from_pretrained('efficientnet-b4', 
                                                in_channels=3, 
                                                num_classes=18) # weight가져오고 num_classes(두번째 파라미터로 학습시키는 class 수)
        self.model_name._dropout = nn.Dropout(p=0.7, inplace=False)
        
    def forward(self, x) :
        x = F.relu(self.model_name(x))
        return x

    
class Myeff3Model(nn.Module) :
    def __init__(self) :
        super().__init__()
        self.model_name = EfficientNet.from_pretrained('efficientnet-b3', 
                                                in_channels=3, 
                                                num_classes=18) # weight가져오고 num_classes(두번째 파라미터로 학습시키는 class 수)
    def forward(self, x) :
        x = F.relu(self.model_name(x))
        return x        

In [None]:
# Hyper-parameters
batch_size = 16
lr = 1e-4
epochs = 10

counter = 0
patience = 10
accumulation_steps = 2
best_val_acc = 0
best_val_loss = np.inf
lr_decay_step = 10

In [None]:
# Training and Validating

folds_index = [1, 2, 3, 4, 5] # 총 5개

for fold in folds_index :
    print(f'Fold number {fold}')
    min_loss = 3
    early_stop = 0

# -- kfold를 이용해서 dataset을 만들기 위한 경로 리스트
    train_image_paths, valid_image_paths = [], []

    # -- train_data
    for train_dir in image_dirs[fold_list[fold-1]['train']] :
        train_image_paths.extend(glob(train_dir+'/*'))
    train_dataset = MaskDataset(train_image_paths, train_transform, train_TF=True)
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=3)
    
    # -- valid_data
    for valid_dir in image_dirs[fold_list[fold-1]['valid']] :
        valid_image_paths.extend(glob(valid_dir+'/*'))
    valid_dataset = MaskDataset(valid_image_paths, valid_transform, train_TF=True)
    valid_loader = DataLoader(dataset=valid_dataset, batch_size=batch_size//4, shuffle=True, num_workers=3)
    
    # -- model
    model = MyModel()
    model = model.to(device)
    
    # -- loss & metric
    optimizer = AdamP(model.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()
    scheduler = StepLR(optimizer, lr_decay_step, gamma=0.5)

    # -- logging
    logger = SummaryWriter(log_dir=f"data_file/cv{fold}_{name}")
    for epoch in range(epochs) :
        
        # -- Train start
        with tqdm(train_loader, total=train_loader.__len__(), unit='batch') as train_depth :
            train_f1 = []
            train_loss = []
            for file_ in train_depth :
                train_depth.set_description(f'Epoch {epoch+1} / {epochs}')
                images = file_['image'].float().to(device)
                labels = file_['label'].long().to(device)
                
                model.train()
                optimizer.zero_grad()
                pred = model(images)
                loss = criterion(pred, labels)
                loss.backward()
                optimizer.step()
                
                # print f1 score and loss
                train_f1.append(f1_score(labels.cpu().detach().float(), torch.argmax(pred.cpu().detach(), 1), average='macro'))
                train_loss.append(loss.item())
                train_depth.set_postfix(f1=np.mean(train_f1), loss=np.mean(train_loss), Train=epoch+1)
        
        
        # --  Validation start
        with tqdm(valid_loader, total=valid_loader.__len__(), unit='batch') as valid_depth :
            
            valid_f1 = []
            valid_loss = []
            for file_ in valid_depth :
                valid_depth.set_description(f'Epoch {epoch+1} / {epochs}')
                imgs = file_['image'].float().to(device)
                labels = file_['label'].long().to(device)
                
                model.eval()
                optimizer.zero_grad()
                with torch.no_grad() : 
                    pred = model(imgs)
                    loss = criterion(pred, labels)

                # print f1 score and loss
                valid_f1.append(f1_score(labels.cpu().detach().float(), torch.argmax(pred.cpu().detach(), 1), average='macro'))
                valid_loss.append(loss.item())
                valid_depth.set_postfix(f1=np.mean(valid_f1), loss=np.mean(valid_loss), Valid=epoch+1)
        
        # 조기종료 조건 : 학습에서 Loss가 5번 이상 줄지 않으면 조기종료, 그렇지 않으면 저장
        if np.mean(valid_loss) < min_loss :
            min_loss = np.mean(valid_loss)
            early_stop = 0
            for f in glob(f'data_file/{name}/{fold}fold_*{name}.ckpt') :
                open(f, 'w').close()
                os.remove(f)
            torch.save(model.state_dict(), f'data_file/{name}/{fold}fold_{epoch+1}epoch_{np.mean(valid_loss):2.4f}_{name}.ckpt')
        else :
            early_stop += 1
            if early_stop >= 5 : break

In [None]:
# Inference
# test는 validation과 동일한 구조를 가지게 진행하고 불러오는 이미지만 변경
test_dir = '/opt/ml/input/data/eval'
submission = pd.read_csv(os.path.join(test_dir, 'info.csv'))
image_dir = os.path.join(test_dir, 'images')

test_image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]
test_dataset = MaskDataset(test_image_paths, valid_transform, train_TF = False)
test_loader = DataLoader(dataset=test_dataset, batch_size = batch_size, shuffle = False)

all_predictions = []
for best_model in glob(f'data_file/{name}/*{name}.ckpt') :
    
    model = MyModel()
    model.load_state_dict(torch.load(best_model))
    model.to(device)
    model.eval()
    prediction_array=[]
    
    with tqdm(test_loader, total=test_loader.__len__(), unit='batch') as test_depth :
        
        for file_ in test_depth :
            imgs = file_['image'].float().to(device)
            pred = model(imgs)
            pred = pred.cpu().detach().numpy()
            prediction_array.extend(pred)
    
    all_predictions.append(np.array(prediction_array)[...,np.newaxis])
submission['ans'] = np.argmax(np.mean(np.concatenate(all_predictions, axis=2), axis=2), axis=1)

submission.to_csv(f'data_file/{name}/{name}.csv', index=False)
print('test inference is done!')