In [1]:
import os
import random
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
#import cv2

import torch
import torch.utils.data as data
import torch.nn as nn

import torchvision
from torchvision import datasets, models, transforms
from torchvision.transforms import Resize, ToTensor, Normalize, CenterCrop

import timm

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
# Pin every random number generator used in this notebook
def seed_everything(seed):
    """Seed Python, NumPy and torch (CPU and CUDA) and force deterministic cuDNN.

    Args:
        seed: integer seed handed unchanged to every generator.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,  # covers the multi-GPU case
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning for run-to-run repeatability.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


seed_everything(3033)

class AgeGroup:
    """Buckets an age into the three age classes used as training labels."""

    @staticmethod
    def map_label(x):
        """Return the age-class index for ``x``.

        Declared as a staticmethod so that it can be called both on the class
        (``AgeGroup.map_label(age)``) and on an instance; a bare lambda stored
        on the class would receive ``self`` as ``x`` when called on an instance.

        Args:
            x: age as an int or anything ``int()`` accepts (e.g. ``"25"``).

        Returns:
            0 for ages below 30, 1 for ages 30-57, 2 for ages 58 and above.

        Raises:
            ValueError: if ``x`` cannot be converted to ``int``.
        """
        age = int(x)
        if age < 30:
            return 0
        if age < 58:
            return 1
        return 2

class MaskBaseDataset(data.Dataset):
    """Image dataset that derives mask / gender / age labels from the folder layout.

    Expected layout: ``img_dir/<id>_<gender>_<area>_<age>/<name>.<ext>`` where
    ``<name>`` is ``mask*``, ``incorrect_mask`` or ``normal``. Mislabelled samples
    are assumed to have been fixed by renaming their folders, so the folder name
    is the single source of truth for gender and age.

    Each item is ``(image, mask_label, gender_label, age_label)``.
    """

    num_classes = 3 * 2 * 3

    def __init__(self, img_dir, transform=None):
        """Index every image below ``img_dir``.

        Args:
            img_dir: directory containing one sub-directory per person.
            transform: optional callable applied to the PIL image in
                ``__getitem__``; can be replaced later via ``set_transform``.
        """
        self.img_dir = img_dir
        self.transform = transform
        self.setup()

    def set_transform(self, transform):
        """Replace the transform applied to every image."""
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    @staticmethod
    def _mask_label(stem):
        """Map a file name without extension to its mask class.

        Raises:
            ValueError: if ``stem`` is not one of the known file names.
        """
        if stem.startswith('mask'):
            return 0
        return ['mask', 'incorrect_mask', 'normal'].index(stem)

    def setup(self):
        """(Re)build the sample lists by scanning ``img_dir``.

        The lists are created here as instance attributes, so a second dataset
        instance (or a second call to ``setup``) starts from empty lists instead
        of appending to state shared by the whole class.

        Raises:
            ValueError: if a folder or file name does not follow the expected
                naming scheme.
        """
        self.image_paths = []
        self.mask_labels = []
        self.gender_labels = []
        self.age_labels = []

        # Sorted so that sample order (and therefore any seeded split) does not
        # depend on the file system's listing order.
        for profile in sorted(os.listdir(self.img_dir)):
            profile_dir = os.path.join(self.img_dir, profile)
            # Skip hidden entries (.ipynb_checkpoints, ._* ...) and stray files.
            if profile.startswith('.') or not os.path.isdir(profile_dir):
                continue

            # Folder name example: 000001_male_Asian_01
            user_id, gender, area, age = profile.split("_")
            gender_label = ['male', 'female'].index(gender)
            age_label = AgeGroup.map_label(age)

            for file_name in sorted(os.listdir(profile_dir)):
                if file_name.startswith('.'):
                    continue
                stem, _ext = os.path.splitext(file_name)

                self.image_paths.append(os.path.join(profile_dir, file_name))
                self.mask_labels.append(self._mask_label(stem))
                self.gender_labels.append(gender_label)
                self.age_labels.append(age_label)

    def __getitem__(self, index):
        """Return ``(image, mask_label, gender_label, age_label)`` for ``index``.

        The image is passed through ``self.transform`` when one is set;
        otherwise the PIL image is returned as opened.
        """
        image = Image.open(self.image_paths[index])
        if self.transform is not None:
            image = self.transform(image)

        mask_label = self.mask_labels[index]
        gender_label = self.gender_labels[index]
        age_label = self.age_labels[index]
        return image, mask_label, gender_label, age_label

In [3]:
# Custom CosineAnnealingWarmRestarts (adds a linear warm-up phase and a per-cycle decay of the maximum LR)
# 출처: https://gaussian37.github.io/dl-pytorch-lr_scheduler/#custom-cosineannealingwarmrestarts-1

import math
from torch.optim.lr_scheduler import _LRScheduler

class CosineAnnealingWarmUpRestarts(_LRScheduler):
    """Cosine annealing with restarts, a linear warm-up and a decaying peak LR.

    Within one cycle of length ``T_i`` the learning rate ramps linearly from
    each group's base LR up to ``eta_max`` during the first ``T_up`` steps and
    then follows a half cosine from ``eta_max`` back towards the base LR.
    After every cycle the peak is multiplied by ``gamma`` and the cycle length
    is stretched by ``T_mult``.
    """

    def __init__(self, optimizer, T_0, T_mult=1, eta_max=0.1, T_up=0, gamma=1., last_epoch=-1):
        """Validate the schedule parameters and initialise the cycle state.

        Args:
            optimizer: wrapped optimizer; its per-group LR is the floor of the schedule.
            T_0: length of the first cycle (positive int).
            T_mult: factor applied to the cycle length at each restart (int >= 1).
            eta_max: peak learning rate of the first cycle.
            T_up: number of warm-up steps at the start of each cycle (int >= 0).
            gamma: factor applied to the peak learning rate at each restart.
            last_epoch: forwarded to ``_LRScheduler``; also the initial ``T_cur``.

        Raises:
            ValueError: if ``T_0``, ``T_mult`` or ``T_up`` is out of range or not an int.
        """
        if T_0 <= 0 or not isinstance(T_0, int):
            raise ValueError("Expected positive integer T_0, but got {}".format(T_0))
        if T_mult < 1 or not isinstance(T_mult, int):
            raise ValueError("Expected integer T_mult >= 1, but got {}".format(T_mult))
        if T_up < 0 or not isinstance(T_up, int):
            raise ValueError("Expected positive integer T_up, but got {}".format(T_up))
        self.T_0 = T_0
        self.T_mult = T_mult
        self.base_eta_max = eta_max  # peak LR of cycle 0; never modified
        self.eta_max = eta_max       # peak LR of the current cycle
        self.T_up = T_up
        self.T_i = T_0               # length of the current cycle
        self.gamma = gamma
        self.cycle = 0               # index of the current cycle
        self.T_cur = last_epoch      # position inside the current cycle
        # Called last, after every attribute read by get_lr()/step() exists
        # (presumably because the base constructor performs an initial step()).
        super(CosineAnnealingWarmUpRestarts, self).__init__(optimizer, last_epoch)
    
    def get_lr(self):
        """Return one learning rate per parameter group for the current ``T_cur``."""
        if self.T_cur == -1:
            return self.base_lrs
        elif self.T_cur < self.T_up:
            # Warm-up: linear interpolation from base_lr (T_cur == 0) towards eta_max.
            return [(self.eta_max - base_lr)*self.T_cur / self.T_up + base_lr for base_lr in self.base_lrs]
        else:
            # Annealing: half cosine that equals eta_max at T_cur == T_up and
            # approaches base_lr as T_cur approaches T_i.
            # NOTE(review): divides by (T_i - T_up); T_up == T_i would raise ZeroDivisionError.
            return [base_lr + (self.eta_max - base_lr) * (1 + math.cos(math.pi * (self.T_cur-self.T_up) / (self.T_i - self.T_up))) / 2
                    for base_lr in self.base_lrs]

    def step(self, epoch=None):
        """Advance the schedule and write the new learning rates into the optimizer.

        Args:
            epoch: when ``None`` the schedule advances by one step from its
                current state; otherwise the cycle index and the position in
                the cycle are recomputed directly from ``epoch``.
        """
        if epoch is None:
            epoch = self.last_epoch + 1
            self.T_cur = self.T_cur + 1
            if self.T_cur >= self.T_i:
                # Restart: wrap the position and stretch only the annealing
                # part of the cycle (the warm-up length stays T_up).
                self.cycle += 1
                self.T_cur = self.T_cur - self.T_i
                self.T_i = (self.T_i - self.T_up) * self.T_mult + self.T_up
        else:
            if epoch >= self.T_0:
                if self.T_mult == 1:
                    # All cycles have length T_0.
                    self.T_cur = epoch % self.T_0
                    self.cycle = epoch // self.T_0
                else:
                    # Cycle lengths form a geometric series T_0 * T_mult**k;
                    # n is the number of complete cycles before `epoch`.
                    # NOTE(review): unlike the epoch=None branch, T_i here does
                    # not account for T_up - confirm this is intended.
                    n = int(math.log((epoch / self.T_0 * (self.T_mult - 1) + 1), self.T_mult))
                    self.cycle = n
                    self.T_cur = epoch - self.T_0 * (self.T_mult ** n - 1) / (self.T_mult - 1)
                    self.T_i = self.T_0 * self.T_mult ** (n)
            else:
                self.T_i = self.T_0
                self.T_cur = epoch
                
        # Peak LR shrinks geometrically with the cycle index.
        self.eta_max = self.base_eta_max * (self.gamma**self.cycle)
        self.last_epoch = math.floor(epoch)
        for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            param_group['lr'] = lr

In [4]:
class cfg:
    """Notebook-wide settings: dataset locations and training hyper-parameters."""

    # Dataset locations (every directory string keeps its trailing slash)
    path = '../input/data/'
    train_path = f'{path}train/'
    train_img_path = f'{train_path}images/'
    test_path = f'{path}eval/'
    test_img_path = f'{test_path}images/'

    # Training hyper-parameters
    epochs = 5
    batch_size = 16

In [5]:
# 아키텍쳐 선택
# 출처: https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/efficientnet.py
class ModelEfficientNet(nn.Module):
    """timm EfficientNet backbone whose classifier is swapped for a new linear head.

    Author's note: the b4 variant gave better results than the larger models.
    """

    def __init__(self, model_arch='tf_efficientnet_b4_ns', n_class=18, pretrained=True):
        """Build the backbone and attach a fresh ``n_class``-way head.

        Args:
            model_arch: timm model name passed to ``timm.create_model``.
            n_class: number of output logits of the new head.
            pretrained: forwarded to ``timm.create_model``.
        """
        super().__init__()
        backbone = timm.create_model(model_arch, pretrained=pretrained)
        # Reuse the input width of the stock classifier
        # (per the author's note this is 1792 -> 18 with the defaults).
        head_in = backbone.classifier.in_features
        backbone.classifier = nn.Linear(head_in, n_class)
        self.model = backbone

    def forward(self, x):
        """Return the raw logits produced by the backbone and the new head."""
        logits = self.model(x)
        return logits
    
class MaskViTClassifier(nn.Module):
    """timm model exposing a ``head`` layer, with that head replaced by a new linear layer."""

    def __init__(self, model_arch, n_classes, pretrained=False):
        """Create the timm model and swap its ``head``.

        Args:
            model_arch: timm model name passed to ``timm.create_model``.
            n_classes: number of output logits of the new head.
            pretrained: forwarded to ``timm.create_model``.
        """
        super().__init__()
        backbone = timm.create_model(model_arch, pretrained=pretrained)
        head_in = backbone.head.in_features
        backbone.head = nn.Linear(head_in, n_classes)
        self.model = backbone

    def forward(self, x):
        """Return the logits of the wrapped model."""
        return self.model(x)

In [6]:
# loss 설정

# https://discuss.pytorch.org/t/is-this-a-correct-implementation-for-focal-loss-in-pytorch/43327/8
class FocalLoss(nn.Module):
    """Multi-class focal loss: cross-entropy scaled by ``(1 - p) ** gamma``.

    With ``gamma == 0`` this reduces to ordinary cross-entropy; larger values
    reduce the contribution of samples the model already classifies confidently.
    """

    def __init__(self, weight=None, gamma=5., reduction='mean'):
        """Store the loss configuration.

        Args:
            weight: optional per-class weight tensor forwarded to ``nll_loss``.
            gamma: focusing exponent applied to ``1 - p``.
            reduction: reduction mode forwarded to ``nll_loss``.
        """
        super().__init__()
        self.weight = weight
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, input_tensor, target_tensor):
        """Compute the focal loss.

        Args:
            input_tensor: raw logits; the softmax is taken over the last dimension.
            target_tensor: class indices accepted by ``nll_loss``.

        Returns:
            The loss tensor, reduced according to ``self.reduction``.
        """
        # The notebook never imports torch.nn.functional under the alias `F`,
        # so reach it through the `nn` alias that is already in scope.
        log_prob = nn.functional.log_softmax(input_tensor, dim=-1)
        prob = torch.exp(log_prob)
        return nn.functional.nll_loss(
            ((1 - prob) ** self.gamma) * log_prob,
            target_tensor,
            weight=self.weight,
            reduction=self.reduction
        )
    
    
class LabelSmoothingLoss(nn.Module):
    """Cross-entropy against a smoothed one-hot target distribution.

    The true class receives ``1 - smoothing``; every other entry receives
    ``smoothing / (classes - 1)``.
    """

    def __init__(self, classes=18, smoothing=0.0, dim=-1):
        """Store the loss configuration.

        Args:
            classes: number of classes used to spread the smoothing mass.
            smoothing: probability mass moved away from the true class.
            dim: dimension over which log-softmax and the final sum are taken.
        """
        super().__init__()
        self.cls = classes
        self.dim = dim
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing

    def forward(self, pred, target):
        """Return the mean smoothed cross-entropy of logits ``pred`` w.r.t. ``target``."""
        log_probs = pred.log_softmax(dim=self.dim)
        with torch.no_grad():
            # Start from the uniform off-target value, then overwrite the
            # target column of each row with the confidence.
            off_value = self.smoothing / (self.cls - 1)
            soft_target = torch.full_like(log_probs, off_value)
            soft_target.scatter_(1, target.detach().unsqueeze(1), self.confidence)
        per_sample = (-soft_target * log_probs).sum(dim=self.dim)
        return per_sample.mean()

In [7]:
# Preprocessing: tensor conversion + per-channel normalisation only
# (no CenterCrop / Resize is applied).
# https://pytorch.org/vision/stable/transforms.html
transform = transforms.Compose([ToTensor(), 
                                Normalize(mean=(0.558, 0.524, 0.499), 
                                          std=(0.234, 0.243, 0.246))
                                ])

# Dataset
dataset = MaskBaseDataset(img_dir=cfg.train_img_path, transform=transform)


# train : val -> 8 : 2
n_val = int(len(dataset) * 0.2)
n_train = len(dataset) - n_val
train_dataset, val_dataset = data.random_split(dataset, [n_train, n_val])


# DataLoader
# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
train_loader = data.DataLoader(
    train_dataset,
    batch_size=cfg.batch_size,
    num_workers=4,
    shuffle=True
)

val_loader = data.DataLoader(
    val_dataset,
    batch_size=cfg.batch_size,
    num_workers=4,
    shuffle=False
)


# Loader over the whole dataset (training without a held-out split)
full_loader = data.DataLoader(
    dataset,
    batch_size=cfg.batch_size,
    num_workers=4,
    shuffle=True
)


# Model
# NOTE: the default head has 18 logits, but the losses below only train the
# first 8 of them: [0:3] mask, [3:5] gender, [5:8] age.
model = ModelEfficientNet().to(device)



# Losses
# https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
ce_loss = nn.CrossEntropyLoss()
# The age head has 3 classes, so the smoothing mass must be spread over 3
# classes (the class default of 18 belongs to the single-head formulation).
ls_loss = LabelSmoothingLoss(classes=3)


# Optimizer
# https://pytorch.org/docs/stable/optim.html#constructing-it
optimizer = torch.optim.Adam(model.parameters(), lr = 1e-4, weight_decay = 1e-6)
#optimizer = torch.optim.SGD(model.parameters(), lr=1e-4, momentum=0.9)

# LR scheduler
#scheduler = CosineAnnealingWarmUpRestarts(optimizer, T_0=cfg.epochs, T_mult=1, eta_max=1e-5, T_up=1, gamma=0.5)
scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=cfg.epochs, T_mult=1, eta_min=1e-7)

for epoch in range(cfg.epochs):
    model.train()
    
    for inputs, mask_label,gender_label,age_label in tqdm(full_loader):
        inputs = inputs.to(device)
        mask_label = mask_label.to(device)
        gender_label = gender_label.to(device)
        age_label = age_label.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        # One loss per task, each computed on that task's slice of the logits.
        mask_loss = ce_loss(outputs[:,:3],mask_label)
        gender_loss = ce_loss(outputs[:,3:5],gender_label)
        age_loss = ls_loss(outputs[:,5:8],age_label)        
        
        loss = (mask_loss+gender_loss+age_loss)/3
        loss.backward()
        optimizer.step()

    # The scheduler advances once per epoch.
    scheduler.step()

100%|██████████| 1182/1182 [07:06<00:00,  2.77it/s]
100%|██████████| 1182/1182 [07:06<00:00,  2.77it/s]
100%|██████████| 1182/1182 [07:05<00:00,  2.78it/s]
100%|██████████| 1182/1182 [07:05<00:00,  2.78it/s]
100%|██████████| 1182/1182 [07:04<00:00,  2.78it/s]


In [8]:
from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize,CenterCrop
class TestDataset(data.Dataset):
    """Label-free dataset that yields one image per entry of ``img_paths``."""

    def __init__(self, img_paths, transform):
        """Keep the list of image paths and the (possibly falsy) transform."""
        self.img_paths, self.transform = img_paths, transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        """Open image ``index`` and run it through the transform when one is set."""
        opened = Image.open(self.img_paths[index])
        return self.transform(opened) if self.transform else opened


# Load the submission metadata and locate the evaluation images.
submission = pd.read_csv(cfg.test_path + 'info.csv')
image_dir = cfg.test_img_path


# Build the test dataset and its DataLoader.
image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]

# Same preprocessing as used for training.
transform = transforms.Compose([ToTensor(), 
                                Normalize(mean=(0.558, 0.524, 0.499), 
                                          std=(0.234, 0.243, 0.246))
                                ])

dataset = TestDataset(image_paths, transform)

loader = data.DataLoader(
    dataset,
    shuffle=False
)
# Model: reuses the `model` trained above (load a checkpoint with torch.load
# here instead if running inference on its own).


model.eval()


# Predict every test image and collect the combined class id.
all_predictions = []
with torch.no_grad():
    for images in tqdm(loader):
        images = images.to(device)
        pred = model(images)
        mask_pred = pred[:, :3].argmax(dim=-1)     # mask classification
        gender_pred = pred[:, 3:5].argmax(dim=-1)  # gender classification
        # Age uses logits 5..7 only, exactly the slice the age loss was trained
        # on. An open-ended `5:` slice would also argmax over the untrained
        # logits 8..17 of the 18-wide head and could yield an age index > 2.
        age_pred = pred[:, 5:8].argmax(dim=-1)     # age classification
        # Combined label: 6 * mask + 3 * gender + age  -> 0..17
        pred = (mask_pred * 6 + gender_pred * 3 + age_pred).cpu().numpy()
        all_predictions.extend(pred)
submission['ans'] = all_predictions



# Write the submission file. / submission(033_2_5_0303).csv
submission.to_csv(cfg.test_path + 'submission(full_effb4_05_16).csv', index=False)

100%|██████████| 12600/12600 [10:11<00:00, 20.60it/s]


In [9]:
# Persist the trained network. The whole module object is passed to torch.save
# (not just model.state_dict()), so loading this file presumably requires the
# ModelEfficientNet class definition to be available - TODO confirm with whoever
# consumes the checkpoint before switching formats.
torch.save(model,'full_effb4_05_16.pth')