# 0. Libarary 불러오기 및 경로설정

In [193]:
#!pip install timm
#!pip install -U albumentations

Collecting timm
  Downloading timm-0.4.12-py3-none-any.whl (376 kB)
[K     |████████████████████████████████| 376 kB 2.5 MB/s eta 0:00:01
Installing collected packages: timm
Successfully installed timm-0.4.12


In [226]:
import os
import sys
from glob import glob
import requests
import random
import numpy as np
import pandas as pd
import cv2
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from tqdm.notebook import tqdm
from time import time

from albumentations import *

import torch
import torch.utils.data as data
import torch.nn as nn 
import torch.nn.functional as F
import torchvision

from pytz import timezone
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, models, transforms
from torchvision.models import resnet18 ,densenet161
from torchvision.transforms import Normalize, Resize, ToTensor

import matplotlib.pyplot as plt
import seaborn as sns
from albumentations import *
from albumentations.pytorch import ToTensorV2

import warnings
warnings.filterwarnings('ignore') #주로 버전 차이에서 오는 경고 무시하기 위함

## EDA

In [227]:
### Configurations
class cfg:
    data_dir = 'input/data/train'
    img_dir = f'{data_dir}/images'
    df_path = f'{data_dir}/train.csv'

In [228]:
num2class = ['incorrect_mask', 'mask1', 'mask2', 'mask3',
             'mask4', 'mask5', 'normal']
class2num = {k: v for v, k in enumerate(num2class)}

In [229]:
df = pd.read_csv(df_path)
df.head()

Unnamed: 0,id,gender,race,age,path
0,1,female,Asian,45,000001_female_Asian_45
1,2,female,Asian,52,000002_female_Asian_52
2,4,male,Asian,54,000004_male_Asian_54
3,5,female,Asian,58,000005_female_Asian_58
4,6,female,Asian,59,000006_female_Asian_59


# 1. Image Statistics

In [230]:
def get_ext(img_dir, img_id):
    filename = os.listdir(os.path.join(img_dir, img_id))[0]
    ext = os.path.splitext(filename)[-1].lower()
    return ext

In [231]:
def get_img_stats(img_dir, img_ids):
    img_info = dict(heights=[], widths=[], means=[], stds=[])
    for img_id in tqdm(img_ids):
        for path in glob(os.path.join(img_dir, img_id, '*')):
            img = np.array(Image.open(path))
            h, w, _ = img.shape
            img_info['heights'].append(h)
            img_info['widths'].append(w)
            img_info['means'].append(img.mean(axis=(0,1)))
            img_info['stds'].append(img.std(axis=(0,1)))
    return img_info

In [None]:
img_info = get_img_stats(img_dir, df.path.values) #데이터셋의 크기, rgb 평균, 표준편차, 

print(f'RGB Mean: {np.mean(img_info["means"], axis=0) / 255.}')
print(f'RGB Standard Deviation: {np.mean(img_info["stds"], axis=0) / 255.}')

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=2700.0), HTML(value='')))


RGB Mean: [0.56019358 0.52410121 0.501457  ]
RGB Standard Deviation: [0.23318603 0.24300033 0.24567522]


# 2. Dataset

In [None]:
def get_transforms(need=('train', 'val'), img_size=(512, 384), mean=(0.56019358, 0.52410121, 0.50145), std=(0.23318603 ,0.24300033, 0.24567522)):

    """
    train 혹은 validation의 augmentation 함수를 정의합니다. train은 데이터에 많은 변형을 주어야하지만, validation에는 최소한의 전처리만 주어져야합니다.
    
    Args:
        need: 'train', 혹은 'val' 혹은 둘 다에 대한 augmentation 함수를 얻을 건지에 대한 옵션입니다.
        img_size: Augmentation 이후 얻을 이미지 사이즈입니다.
        mean: 이미지를 Normalize할 때 사용될 RGB 평균값입니다.
        std: 이미지를 Normalize할 때 사용될 RGB 표준편차입니다.

    Returns:
        transformations: Augmentation 함수들이 저장된 dictionary 입니다. transformations['train']은 train 데이터에 대한 augmentation 함수가 있습니다.
    """
    transformations = {}
    if 'train' in need:
        transformations['train'] = Compose([
            Resize(img_size[0], img_size[1], p=1.0),
            GaussNoise(p=0.5),
            Normalize(mean=mean, std=std, max_pixel_value=255.0, p=1.0),
            ToTensorV2(p=1.0),
        ], p=1.0)
    if 'val' in need:
        transformations['val'] = Compose([
            Resize(img_size[0], img_size[1]),
            Normalize(mean=mean, std=std, max_pixel_value=255.0, p=1.0),
            ToTensorV2(p=1.0),
        ], p=1.0)
    return transformations

In [None]:
### 마스크 여부, 성별, 나이를 mapping할 클래스를 생성합니다.

class MaskLabels:
    mask = 0
    incorrect = 1
    normal = 2

class GenderLabels:
    male = 0
    female = 1

class AgeGroup:
    map_label = lambda x: 0 if int(x) < 30 else 1 if int(x) < 55 else 2

In [None]:
mean, std =(0.56019358, 0.52410121, 0.50145), (0.23318603 ,0.24300033, 0.24567522)

class MaskBaseDataset(data.Dataset):
    num_classes = 3 * 2 * 3

    _file_names = {
        "mask1.jpg": MaskLabels.mask,
        "mask2.jpg": MaskLabels.mask,
        "mask3.jpg": MaskLabels.mask,
        "mask4.jpg": MaskLabels.mask,
        "mask5.jpg": MaskLabels.mask,
        "incorrect_mask.jpg": MaskLabels.incorrect,
        "normal.jpg": MaskLabels.normal
    }

    image_paths = []
    mask_labels = []
    gender_labels = []
    age_labels = []

    def __init__(self, img_dir, transform=None):
        """
        MaskBaseDataset을 initialize 합니다.

        Args:
            img_dir: 학습 이미지 폴더의 root directory 입니다.
            transform: Augmentation을 하는 함수입니다.
        """
        self.img_dir = img_dir
        self.mean = mean
        self.std = std
        self.transform = transform

        self.setup()

    def set_transform(self, transform):
        """
        transform 함수를 설정하는 함수입니다.
        """
        self.transform = transform
        
    def setup(self):
        """
        image의 경로와 각 이미지들의 label을 계산하여 저장해두는 함수입니다.
        """
        profiles = os.listdir(self.img_dir)
        for profile in profiles:
            for file_name, label in self._file_names.items():
                img_path = os.path.join(self.img_dir, profile, file_name)  # (resized_data, 000004_male_Asian_54, mask1.jpg)
                if os.path.exists(img_path):
                    self.image_paths.append(img_path)
                    self.mask_labels.append(label)

                    id, gender, race, age = profile.split("_")
                    gender_label = getattr(GenderLabels, gender)
                    age_label = AgeGroup.map_label(age)

                    self.gender_labels.append(gender_label)
                    self.age_labels.append(age_label)

    def __getitem__(self, index):
        """
        데이터를 불러오는 함수입니다. 
        데이터셋 class에 데이터 정보가 저장되어 있고, index를 통해 해당 위치에 있는 데이터 정보를 불러옵니다.
        
        Args:
            index: 불러올 데이터의 인덱스값입니다.
        """
        # 이미지를 불러옵니다.
        image_path = self.image_paths[index]
        image = Image.open(image_path)
        
        # 레이블을 불러옵니다.
        mask_label = self.mask_labels[index]
        gender_label = self.gender_labels[index]
        age_label = self.age_labels[index]
        multi_class_label = mask_label * 6 + gender_label * 3 + age_label
        
        # 이미지를 Augmentation 시킵니다.
        image_transform = self.transform(image=np.array(image))['image']
        return image_transform, multi_class_label

    def __len__(self):
        return len(self.image_paths)

In [None]:
# 정의한 Augmentation 함수와 Dataset 클래스 객체를 생성합니다.
transform = get_transforms()

dataset = MaskBaseDataset(img_dir=img_dir)

# train dataset과 validation dataset을 8:2 비율로 나눕니다.
n_val = int(len(dataset) * 0.2)
n_train = len(dataset) - n_val
train_dataset, val_dataset = data.random_split(dataset, [n_train, n_val])

# 각 dataset에 augmentation 함수를 설정합니다.
train_dataset.dataset.set_transform(transform['train'])
val_dataset.dataset.set_transform(transform['val'])

In [None]:
# training dataloader은 데이터를 섞어주어야 합니다. (shuffle=True)
train_loader = data.DataLoader(
    train_dataset,
    batch_size=32,
    num_workers=4,
    shuffle=True
)

val_loader = data.DataLoader(
    val_dataset,
    batch_size=32,
    num_workers=4,
    shuffle=False
)

dataloaders = {"train": train_loader, "test": val_loader}

# 3. 학습 파라미터 및 모델 정의

class MyModel(nn.Module):
    def __init(self,num_classes: int = 18):
        super(MyModel, self).__init()
        self.features = nn.Sequential(
            nn.Conv2d(3,128, kernel_size=3,padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3,stride=2),
            nn.Conv2d(128,64,kernel_size=3,padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3,stride=2),
        )
        self.avgpool =nn.AdaptiveAvgPool2d((3,3))
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(64*3*3,  2048),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(2048,2048),
            nn.ReLU(inplace=True),
            nn.Linear(2048, 18),
        )
    
    def forward(self,x:torch.Tensor) -> torch.Tensor:
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x,1)
        x = self.classifier(x)
        return x

In [None]:
from torchvision.models import vgg19_bn

num_classes = 18
model = vgg19_bn(pretrained=True)
model.classifier = nn.Sequential(
    nn.Linear(512 * 7 * 7, 4096),
    nn.ReLU(True),
    nn.Dropout(),
    nn.Linear(4096, 4096),
    nn.ReLU(True),
    nn.Dropout(),
    nn.Linear(4096, num_classes),
)

#model

In [None]:
#하이퍼파라미터 지정 

# 학습 때 GPU 사용여부 결정
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# 학습률
LEARNING_RATE = 0.001
NUM_EPOCH = 1

# 모델 할당
model.to(device)

# weight 업데이트를 위한 optimizer를 Adam으로 사용함
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)  

### f1_loss

In [None]:
def f1_loss(y_true:torch.Tensor, y_pred:torch.Tensor, is_training=False) -> torch.Tensor:
    '''Calculate F1 score. Can work with gpu tensors
    
    The original implmentation is written by Michal Haltuf on Kaggle.
    
    Returns
    -------
    torch.Tensor
        `ndim` == 1. 0 <= val <= 1
    
    Reference
    ---------
    - https://www.kaggle.com/rejpalcz/best-loss-function-for-f1-score-metric
    - https://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html#sklearn.metrics.f1_score
    - https://discuss.pytorch.org/t/calculating-precision-recall-and-f1-score-in-case-of-multi-label-classification/28265/6
    
    '''
    assert y_true.ndim == 1
    assert y_pred.ndim == 1 or y_pred.ndim == 2
    
    if y_pred.ndim == 2:
        y_pred = y_pred.argmax(dim=1)
        
    
    tp = (y_true * y_pred).sum().to(torch.float32)
    tn = ((1 - y_true) * (1 - y_pred)).sum().to(torch.float32)
    fp = ((1 - y_true) * y_pred).sum().to(torch.float32)
    fn = (y_true * (1 - y_pred)).sum().to(torch.float32)
    
    epsilon = 1e-7
    
    precision = tp / (tp + fp + epsilon)
    recall = tp / (tp + fn + epsilon)
    
    f1 = 2* (precision*recall) / (precision + recall + epsilon)
    f1.requires_grad = is_training
    return f1

# 4. train 

In [248]:
loss_fn = (torch.nn.CrossEntropyLoss())
### 학습 코드 시작
for epoch in range(NUM_EPOCH):
	for phase in ["train", "test"]:
		running_loss = 0.0
		running_acc = 0.0
		if phase == "train":
			my_resnet.train()
		elif phase == "test":
			my_resnet.eval()
		
		# 배치 단위로 data load하여서 작업 -> 이때 transpose 및 여러 함수가 적용된다.
		for ind, (images, labels) in enumerate(tqdm.tqdm(dataloaders[phase], leave=False)):
			images = torch.stack(list(images), dim=0).to(device)
			labels = torch.tensor(list(labels)).to(device)

			optimizer.zero_grad()  # parameter gradient를 업데이트 전 초기화함

			with torch.set_grad_enabled(
				phase == "train"
			):  
				logits = my_resnet(images)
				_, preds = torch.max(
					logits, 1
				)
				loss = loss_fn(logits, labels)

				if phase == "train":
					loss.backward()  # 모델의 예측 값과 실제 값의 CrossEntropy 차이를 통해 gradient
					optimizer.step()  # 계산된 gradient를 가지고 모델 업데이트

			running_loss += loss.item() * images.size(0)
			running_acc += torch.sum(
				preds == labels.data
			)

		# 한 epoch이 모두 종료되었을 때,
		epoch_loss = running_loss / len(dataloaders[phase].dataset)
		epoch_acc = running_acc / len(dataloaders[phase].dataset)

		print(f"현재 epoch-{epoch}의 {phase}-데이터 셋에서 평균 Loss : {epoch_loss:.3f}, 평균 Accuracy : {epoch_acc:.3f}")
print("학습 종료!")

  0%|          | 0/113 [00:00<?, ?it/s]          

현재 epoch-0의 train-데이터 셋에서 평균 Loss : 1.067, 평균 Accuracy : 0.704


                                                 

현재 epoch-0의 test-데이터 셋에서 평균 Loss : 0.945, 평균 Accuracy : 0.721
학습 종료!




# 5. Test Dataset 정의

In [251]:
class TestDataset(Dataset):
    def __init__(self, img_paths, transform):
        self.img_paths = img_paths
        self.transform = transform

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])

        if self.transform:
            image = self.transform(image)
        return image

    def __len__(self):
        return len(self.img_paths)

# Test image path 설정
test_dir = "input/data/eval"
submission = pd.read_csv(os.path.join(test_dir, "info.csv"))
image_dir = os.path.join(test_dir, "images")

# Test Dataset 클래스 객체를 생성하고 DataLoader를 만듭니다.
image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]

# Test Dataset을 위한 준비 작업
transform = Compose([ToTensor()])
dataset = TestDataset(image_paths, transform)
loader = data.DataLoader(dataset, shuffle=False)

# 6. Inference

In [252]:
all_predictions = []
my_resnet.eval()
for images in loader:
    with torch.no_grad():
        images = images.to(device)
        pred = my_resnet(images)
        pred = pred.argmax(dim=-1)
        all_predictions.extend(pred.cpu().numpy())
submission['ans'] = all_predictions
print("test inference is done!")
submission.to_csv('.')
submission

KeyError: 'You have to pass data to augmentations as named arguments, for example: aug(image=image)'