# Image Classification
사람 이미지에서 마스크 착용 여부/성별/나이를 판별한다.

||마스크|성별|나이|
|---:|:---:|:---:|:---:|
|0| Wear | Male | < 30 |
|1| Incorrect | Female | >= 30 and < 60  |
|2| Not wear | - | >= 60 |

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from glob import glob
import PIL
import torchvision
from torchvision import transforms
import torch.utils.data as data
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import StratifiedKFold
import random


from torch.utils.tensorboard import SummaryWriter

from albumentations import *
from albumentations.pytorch import ToTensorV2

import os

from adamp import AdamP
from torch.optim.lr_scheduler import StepLR
from efficientnet_pytorch import EfficientNet

import wandb

초기 설정

In [None]:
def seed_everything(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)
    random.seed(seed)
    
    print(f'이 실험은 seed {seed}로 고정되었습니다.')

In [None]:
class conf:
    seed = 2021
    data_dir = 'input/data/train'
    n_fold = 5
    batch_size = 8
    mask_class = 3
    gender_class = 2
    age_class = 3
    
seed_everything(conf.seed)

In [None]:
data_dir = 'input/data/train'
img_dir = f'{data_dir}/images'
df_path = f'{data_dir}/train.csv'

## Dataset
- 마스크 데이터셋
- 성별 데이터셋
- 나이 데이터셋

분류하여 생성

In [None]:
def get_mask_label(image_path):
    image_name = image_path.split('/')[-1]
    if 'incorrect' in image_name:
        return 1
    elif 'normal' in image_name:
        return 2
    elif 'mask' in image_name:
        return 0
    else:
        raise ValueError(f"No mask class for {image_name}")
        
def get_gender_label(image_path):
    image_name = image_path.split('/')[-1]
    profile = image_path.split('/')[-2]
    image_id, gender, race, age = profile.split("_")
    if 'male' == gender:
        return 0
    elif 'female' == gender:
        return 1
    else:
        raise ValueError(f"No gender class for {image_name}")

def get_age_label(image_path):
    image_name = image_path.split('/')[-1]
    profile = image_path.split('/')[-2]
    image_id, gender, race, age = profile.split("_")
    return 0 if int(age) < 30 else 1 if int(age) < 60 else 2

In [None]:
IMG_EXTENSIONS = [
    ".jpg", ".JPG", ".jpeg", ".JPEG", ".png",
    "PNG", ".ppm", ".PPM", ".bmp", ".BMP"
]

def is_image_file(filepath):
    return any(filepath.endswith(extension) for extension in IMG_EXTENSIONS)

def remove_hidden_file(filepath):
    filename = filepath.split('/')[-1]
    return False if filename.startswith('._') else True

In [None]:
def get_img(path):
    im_bgr = cv2.imread(path)
    im_rgb = im_bgr[:, :, ::-1]
    return im_rgb

#### MaskDataset

In [None]:
class MaskDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        
        self.image_paths = []
        profiles = os.listdir(self.image_dir)
        for profile in profiles:
            if remove_hidden_file(profile):
                for file_name in os.listdir(f'{image_dir}/{profile}'):
                    img_path = os.path.join(image_dir, profile, file_name)
                    if is_image_file(img_path):
                        self.image_paths.append(img_path)
        
        self.image_paths = list(filter(is_image_file, self.image_paths))
        self.image_paths = list(filter(remove_hidden_file, self.image_paths))
        
        self.labels = [get_mask_label(path) for path in self.image_paths]
        
    def __getitem__(self, idx):
        image_path = self.image_paths[idx];
        label = self.labels[idx]
        image = get_img(image_path)
        
        if self.transform:
            image = self.transform(image = image)['image']
        #label = torch.eye(18)[label]
        return image, label
    
    def __len__(self):
        return len(self.image_paths)

#### GenderDataset

In [None]:
class GenderDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        
        self.image_paths = []
        profiles = os.listdir(self.image_dir)
        for profile in profiles:
            if remove_hidden_file(profile):
                for file_name in os.listdir(f'{image_dir}/{profile}'):
                    img_path = os.path.join(image_dir, profile, file_name)
                    if is_image_file(img_path):
                        self.image_paths.append(img_path)
        
        self.image_paths = list(filter(is_image_file, self.image_paths))
        self.image_paths = list(filter(remove_hidden_file, self.image_paths))
        
        self.labels = [get_gender_label(path) for path in self.image_paths]
        
    def __getitem__(self, idx):
        image_path = self.image_paths[idx];
        label = self.labels[idx]
        image = get_img(image_path)
        
        if self.transform:
            image = self.transform(image = image)['image']
        #label = torch.eye(18)[label]
        return image, label
    
    def __len__(self):
        return len(self.image_paths)

#### AgeDataset

In [None]:
class AgeDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        
        self.image_paths = []
        profiles = os.listdir(self.image_dir)
        for profile in profiles:
            if remove_hidden_file(profile):
                for file_name in os.listdir(f'{image_dir}/{profile}'):
                    img_path = os.path.join(image_dir, profile, file_name)
                    if is_image_file(img_path):
                        self.image_paths.append(img_path)
        self.image_paths = list(filter(is_image_file, self.image_paths))
        self.image_paths = list(filter(remove_hidden_file, self.image_paths))
        
        self.labels = [get_age_label(path) for path in self.image_paths]
        
    def __getitem__(self, idx):
        image_path = self.image_paths[idx];
        label = self.labels[idx]
        image = get_img(image_path)
        
        if self.transform:
            image = self.transform(image = image)['image']
        #label = torch.eye(18)[label]
        return image, label
    
    def __len__(self):
        return len(self.image_paths)

## Transforms

In [None]:
def get_transforms(need=('train', 'val'), img_size=(512, 384), mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246)):
    transformations = {}
    if 'train' in need:
        transformations['train'] = Compose([
            CenterCrop(448, 336, p=1.0),
            Resize(img_size[0], img_size[1], p=1.0),
            HorizontalFlip(p=0.5),
            ShiftScaleRotate(p=0.3),
            HueSaturationValue(hue_shift_limit=0.2, sat_shift_limit=0.2, val_shift_limit=0.2, p=0.3),
            RandomBrightnessContrast(brightness_limit=(-0.1, 0.1), contrast_limit=(-0.1, 0.1), p=0.3),
            Normalize(mean=mean, std=std, max_pixel_value=255.0, p=1.0),
            CoarseDropout(p=0.3),
            GaussNoise(p=0.3),
            Cutout(p=0.3),
            ToTensorV2(p=1.0),
        ], p=1.0)
    if 'val' in need:
        transformations['val'] = Compose([
            CenterCrop(448, 336, p=1.0),
            Resize(img_size[0], img_size[1], p=1.0),
            Normalize(mean=mean, std=std, max_pixel_value=255.0, p=1.0),
            ToTensorV2(p=1.0),
        ], p=1.0)
    return transformations

In [None]:
custom_transforms = get_transforms()

In [None]:
mean, std = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225)

## 모델
- Mask/Gender/Age 모델을 생성
  - pretrained된 efficientnet-b4
- Loss 함수 설정
- Optimizer 설정

In [None]:
MaskModel = EfficientNet.from_pretrained("efficientnet-b4", num_classes=conf.mask_class)
GenderModel = EfficientNet.from_pretrained("efficientnet-b4", num_classes=conf.gender_class)
AgeModel = EfficientNet.from_pretrained("efficientnet-b4", num_classes=conf.age_class)

In [None]:
criterion = CrossEntropyLoss().cuda()
mask_optimizer = AdamP(MaskModel.parameters(), lr=1e-4)
gender_optimizer = AdamP(GenderModel.parameters(), lr=1e-4)
age_optimizer = AdamP(AgeModel.parameters(), lr=1e-4)

In [None]:
MaskModel.cuda()
GenderModel.cuda()
AgeModel.cuda()

## DataLoader
Imbalance한 데이터에 CrossEntropyLoss를 사용하기 위해 WeightedRandomSampler를 사용하기로 결정

Counter를 이용해 각 라벨의 분포를 확인하고 weight 값을 계산

In [None]:
from collections import Counter
import seaborn as sns

#### Mask Train dataset
Mask의 경우 WeightedRandomSampler 사용이 필요

In [None]:
mask_train_dataset = MaskDataset(img_dir, custom_transforms['train'])

In [None]:
mask_counter = Counter(mask_train_dataset.labels)

In [None]:
plt.figure(figsize=(8,8))
sns.barplot(data = pd.DataFrame.from_dict([mask_counter]).melt(), x = "variable", y="value", hue="variable").set_title('Natural Images Class Distribution')

In [None]:
mask_class_count = [i for i in mask_counter.values()]
mask_class_weights = 1./torch.tensor(mask_class_count, dtype=torch.float)
mask_class_weights

In [None]:
mask_class_weights_all = mask_class_weights[mask_train_dataset.labels]
mask_class_weights_all

In [None]:
mask_weighted_sampler = WeightedRandomSampler(
    weights=mask_class_weights_all,
    num_samples=len(mask_class_weights_all),
    replacement=True
)

In [None]:
mask_train_loader = DataLoader(mask_train_dataset, batch_size = conf.batch_size, shuffle=False, num_workers=3, sampler=mask_weighted_sampler)

#### Gender Train dataset
Gender의 경우 분포가 거의 비슷하기 때문에 WeightedRandomSampler가 필요하지 않음

In [None]:
gender_train_dataset = GenderDataset(img_dir, custom_transforms['train'])
gender_train_loader = DataLoader(gender_train_dataset, batch_size = conf.batch_size, shuffle=True)

#### Age Train dataset
Age의 경우 WeightedRandomSampler 사용이 필요

In [None]:
age_train_dataset = AgeDataset(img_dir, custom_transforms['train'])

In [None]:
age_counter = Counter(age_train_dataset.labels)

In [None]:
plt.figure(figsize=(24,24))
sns.barplot(data = pd.DataFrame.from_dict([age_counter]).melt(), x = "variable", y="value", hue="variable").set_title('Natural Images Class Distribution')

In [None]:
age_class_count = [i for i in age_counter.values()]
age_class_weights = 1./torch.tensor(age_class_count, dtype=torch.float)

In [None]:
age_class_weights_all = age_class_weights[age_train_dataset.labels]
age_class_weights_all

In [None]:
age_weighted_sampler = WeightedRandomSampler(
    weights=age_class_weights_all,
    num_samples=len(age_class_weights_all),
    replacement=True
)

In [None]:
age_train_loader = DataLoader(age_train_dataset, batch_size = conf.batch_size, shuffle=False, num_workers=3, sampler=age_weighted_sampler)

## 학습

#### 학습을 위한 초기 설정

In [None]:
num_epochs = 5  # 학습할 epoch의 수
lr = 1e-4
lr_decay_step = 10
train_log_interval = 20  # logging할 iteration의 주기

# train_log_interval = 20  # logging할 iteration의 주기
mask_name = "mask_model"  # 결과를 저장하는 폴더의 이름
gender_name = "gender_model"
age_name = "age_model"

### MaskModel 학습

In [None]:
mask_scheduler = StepLR(mask_optimizer, lr_decay_step, gamma=0.5)

In [None]:
os.makedirs(os.path.join(os.getcwd(), 'results', mask_name), exist_ok=True)

counter = 0
patience = 10
accumulation_steps = 2
best_val_acc = 0
best_val_loss = np.inf
for epoch in range(num_epochs):
    # train loop
    MaskModel.train()
    loss_value = 0
    matches = 0
    for idx, train_batch in enumerate(mask_train_loader):
        inputs, labels = train_batch
        inputs = inputs.cuda()
        labels = labels.cuda()

        outs = MaskModel(inputs)
        pred = outs.data.cpu().numpy()
        preds = torch.argmax(outs, dim=-1)
        loss = criterion(outs, labels)

        loss.backward()
        
        # -- Gradient Accumulation
        if (idx+1) % accumulation_steps == 0:
            mask_optimizer.step()
            mask_optimizer.zero_grad()

        loss_value += loss.item()
        matches += (preds == labels).sum().item()
        if (idx + 1) % train_log_interval == 0:
            train_loss = loss_value / train_log_interval
            train_acc = matches / conf.batch_size / train_log_interval
            current_lr = mask_scheduler.get_last_lr()
            print(
                f"Epoch[{epoch}/{num_epochs}]({idx + 1}/{len(mask_train_loader)}) || "
                f"training loss {train_loss:4.4} || training accuracy {train_acc:4.2%} || lr {current_lr}"
            )
            
            if train_acc > best_val_acc:
                print("New best model for val accuracy! saving the model..")
                torch.save(MaskModel.state_dict(), f"results/{mask_name}/{epoch:03}_accuracy_{train_acc:4.2%}.ckpt")
                best_val_acc = train_acc
                counter = 0

            loss_value = 0
            matches = 0

    mask_scheduler.step()

### GenderModel 학습

In [None]:
gender_scheduler = StepLR(gender_optimizer, lr_decay_step, gamma=0.5)

In [None]:
os.makedirs(os.path.join(os.getcwd(), 'results', gender_name), exist_ok=True)

counter = 0
patience = 10
accumulation_steps = 2
best_val_acc = 0
best_val_loss = np.inf
for epoch in range(num_epochs):
    # train loop
    GenderModel.train()
    loss_value = 0
    matches = 0
    for idx, train_batch in enumerate(gender_train_loader):
        inputs, labels = train_batch
        inputs = inputs.cuda()
        labels = labels.cuda()

        outs = GenderModel(inputs)
        pred = outs.data.cpu().numpy()
        preds = torch.argmax(outs, dim=-1)
        loss = criterion(outs, labels)

        loss.backward()
        
        # -- Gradient Accumulation
        if (idx+1) % accumulation_steps == 0:
            gender_optimizer.step()
            gender_optimizer.zero_grad()

        loss_value += loss.item()
        matches += (preds == labels).sum().item()
        if (idx + 1) % train_log_interval == 0:
            train_loss = loss_value / train_log_interval
            train_acc = matches / conf.batch_size / train_log_interval
            current_lr = gender_scheduler.get_last_lr()
            print(
                f"Epoch[{epoch}/{num_epochs}]({idx + 1}/{len(gender_train_loader)}) || "
                f"training loss {train_loss:4.4} || training accuracy {train_acc:4.2%} || lr {current_lr}"
            )
            
            if train_acc > best_val_acc:
                print("New best model for val accuracy! saving the model..")
                torch.save(GenderModel.state_dict(), f"results/{gender_name}/{epoch:03}_accuracy_{train_acc:4.2%}.ckpt")
                best_val_acc = train_acc
                counter = 0

            loss_value = 0
            matches = 0

    gender_scheduler.step()

### AgeModel 학습

In [None]:
age_scheduler = StepLR(age_optimizer, lr_decay_step, gamma=0.5)

In [None]:
os.makedirs(os.path.join(os.getcwd(), 'results', age_name), exist_ok=True)

counter = 0
patience = 10
accumulation_steps = 2
best_val_acc = 0
best_val_loss = np.inf
for epoch in range(num_epochs):
    # train loop
    AgeModel.train()
    loss_value = 0
    matches = 0
    for idx, train_batch in enumerate(age_train_loader):
        inputs, labels = train_batch
        inputs = inputs.cuda()
        labels = labels.cuda()

        outs = AgeModel(inputs)
        pred = outs.data.cpu().numpy()
        preds = torch.argmax(outs, dim=-1)
        
        loss = criterion(outs, labels)

        loss.backward()
        
        # -- Gradient Accumulation
        if (idx+1) % accumulation_steps == 0:
            age_optimizer.step()
            age_optimizer.zero_grad()

        loss_value += loss.item()
        matches += (preds == labels).sum().item()
        if (idx + 1) % train_log_interval == 0:
            train_loss = loss_value / train_log_interval
            train_acc = matches / conf.batch_size / train_log_interval
            current_lr = age_scheduler.get_last_lr()
            print(
                f"Epoch[{epoch}/{num_epochs}]({idx + 1}/{len(age_train_loader)}) || "
                f"training loss {train_loss:4.4} || training accuracy {train_acc:4.2%} || lr {current_lr}"
            )
            
            if train_acc > best_val_acc:
                print("New best model for val accuracy! saving the model..")
                torch.save(AgeModel.state_dict(), f"results/{age_name}/{epoch:03}_accuracy_{train_acc:4.2%}.ckpt")
                best_val_acc = train_acc
                counter = 0

            loss_value = 0
            matches = 0

    age_scheduler.step()

### Inference를 위한 모델
Image를 입력받아 위에 정의한 MaskModel, GenderModel, AgeModel를 통해 출력한 각 결과들을 하나의 라벨로 합쳐 반환

In [None]:
class MyEnsemble(nn.Module):
    def __init__(self, modelA, modelB, modelC, nb_classes=18):
        super(MyEnsemble, self).__init__()
        self.modelA = modelA
        self.modelB = modelB
        self.modelC = modelC
        
    def forward(self, x):
        x1 = self.modelA(x.clone())  # clone to make sure x is not changed by inplace methods
        x2 = self.modelB(x.clone())
        x3 = self.modelC(x.clone())
        
        return x1.argmax(dim=-1).item() * 6 + x2.argmax(dim=-1).item() * 3 + x3.argmax(dim=-1).item()

In [None]:
ensemble = MyEnsemble(MaskModel, GenderModel, AgeModel)

### Inference

In [None]:
eval_dir = 'input/data/eval'
eval_img_dir = f'{eval_dir}/images'
eval_df_path = f'{eval_dir}/info.csv'

In [None]:
class TestDataset(Dataset):
    def __init__(self, img_paths, transform):
        self.img_paths = img_paths
        self.transform = transform

    def __getitem__(self, index):
        image_path = self.img_paths[index];
        image = get_img(image_path)
        
        if self.transform:
            image = self.transform(image = image)['image']
        return image

    def __len__(self):
        return len(self.img_paths)

In [None]:
import torchvision

submission = pd.read_csv(os.path.join(eval_dir, 'info.csv'))
image_dir = os.path.join(eval_dir, 'images')

# Test Dataset 클래스 객체를 생성하고 DataLoader를 만듭니다.
image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]

tf = torchvision.transforms.Compose([
    CenterCrop(448, 336, p=1.0),
    Resize(512, 384, p=1.0),
    Normalize(mean=mean, std=std, max_pixel_value=255.0, p=1.0),
    ToTensorV2(p=1.0),
])

test_dataset = TestDataset(image_paths, custom_transforms['val'])

test_loader = DataLoader(
    test_dataset,
    shuffle=False
)

ensemble.eval()

# 모델이 테스트 데이터셋을 예측하고 결과를 저장합니다.
all_predictions = []
for images in test_loader:
    with torch.no_grad():
        images = images.cuda()
        pred = ensemble(images)
        all_predictions.append(pred)
submission['ans'] = all_predictions

# 제출할 파일을 저장합니다.
submission.to_csv(os.path.join(eval_dir, 'submission_modelsplit.csv'), index=False)
print('test inference is done!')