#### google colab 환경에서 진행하였습니다.

In [None]:
!nvidia-smi

# dataset unzip & library install

In [None]:
# !unzip /content/drive/MyDrive/train.zip

In [None]:
# !unzip /content/drive/MyDrive/test.zip

In [None]:
# !pip install timm
# !pip install albumentations==0.4.6
# !pip install adamp

In [None]:
# !pip install opencv-python
# !apt-get install libgl1-mesa-glx -y
# !pip install seaborn

# EDA (Exploratory Data Analysis)

In [None]:
import os
import sys
from glob import glob
import numpy as np
import pandas as pd
import cv2
from PIL import Image
from tqdm.notebook import tqdm
from time import time

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
class cfg:
    # Filesystem locations used by the EDA cell and the test-set loader.
    # Root folder under which the train/ and test/ directories are expected.
    data_dir = '/content'
    # Training images; the EDA cell lists one sub-directory per class name here.
    tr_dir = f'{data_dir}/train'
    # Test images; the loader cell reads files from f'{te_dir}/0'.
    te_dir = f'{data_dir}/test'

In [None]:
# Class names in label order: the position of a name in this list is its integer label.
classes = ['dog', 'elephant', 'giraffe','guitar','horse','house','person']

# Lookup tables in both directions: index -> name and name -> index.
num2class = dict(enumerate(classes))
class2num = {name: idx for idx, name in enumerate(classes)}

print(num2class)
print(class2num)

In [None]:
# Walk every class folder once and collect:
#   * d        -> rows (label, class_name, path) that become the training table tr_df
#   * img_info -> per-image height, width and per-channel mean / std for the EDA printout
d = dict(label=[], class_name=[], path=[])
img_info = dict(heights=[], widths=[], means=[], stds=[])
# Iterate over the known label indices instead of a hard-coded count.
for num in sorted(num2class):
    class_name = num2class[num]
    class_dir = os.path.join(cfg.tr_dir, class_name)
    # os.listdir returns entries in arbitrary order; sorting keeps the row order
    # of tr_df (and any index-based split of it) reproducible across runs.
    filename = sorted(os.listdir(class_dir))

    for img_name in filename:
        path = os.path.join(class_dir, img_name)
        # Force 3-channel RGB. Grayscale / palette files decode to 2-D arrays,
        # which broke the former `h, w, _ = img.shape` unpacking, and RGBA files
        # would yield 4-element statistics that cannot be averaged with RGB ones.
        # The context manager also closes the underlying file promptly.
        with Image.open(path) as pil_img:
            img = np.array(pil_img.convert('RGB'))
        h, w = img.shape[:2]

        d['label'].append(class2num[class_name])
        d['class_name'].append(class_name)
        d['path'].append(path)

        img_info['heights'].append(h)
        img_info['widths'].append(w)
        # Per-channel statistics over the spatial axes (raw 0-255 scale).
        img_info['means'].append(img.mean(axis=(0, 1)))
        img_info['stds'].append(img.std(axis=(0, 1)))

tr_df = pd.DataFrame(data=d)

In [None]:
# Dataset overview: image count, size extremes / averages and colour statistics.
heights = img_info["heights"]
widths = img_info["widths"]

print(f'Total number of images is {len(tr_df)}')

print(f'Minimum height for dataset is {np.min(heights)}')
print(f'Maximum height for dataset is {np.max(heights)}')
print(f'Average height for dataset is {int(np.mean(heights))}')
print(f'Minimum width for dataset is {np.min(widths)}')
print(f'Maximum width for dataset is {np.max(widths)}')
print(f'Average width for dataset is {int(np.mean(widths))}')

# Per-image channel statistics are averaged over all images and rescaled
# from the 0-255 range to 0-1.
rgb_mean = np.mean(img_info["means"], axis=0) / 255.
rgb_std = np.mean(img_info["stds"], axis=0) / 255.
print(f'RGB Mean: {rgb_mean}')
print(f'RGB Standard Deviation: {rgb_std}')

In [None]:
# Rows per class: count() reports, for each class_name group, the number of
# non-null entries in every remaining column of tr_df.
tr_df.groupby(['class_name']).count()

# Model Training & Inference

In [None]:
from glob import glob
from sklearn.model_selection import StratifiedKFold
import cv2
import torch
from torch import nn
import os
import random
import torchvision
import pandas as pd
import numpy as np
from tqdm import tqdm
import math
import sys
import copy

import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SequentialSampler, RandomSampler
from torch.cuda.amp import autocast, GradScaler
from torch.nn.modules.loss import _WeightedLoss
import torch.nn.functional as F
from torch.optim.lr_scheduler import _LRScheduler

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import timm

import sklearn
import warnings
import joblib
from sklearn import metrics

import albumentations as A
from albumentations.pytorch import ToTensorV2

from adamp import AdamP

def seed_everything(seed):
    """Seed the random number generators used in this notebook.

    Sets ``PYTHONHASHSEED``, seeds Python's ``random``, NumPy and PyTorch
    (default and CUDA generators), and puts cuDNN into deterministic mode
    with benchmark auto-tuning switched off.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Same seed for every generator: Python, NumPy, PyTorch, PyTorch CUDA.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)

    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Hyper-parameter settings shared by the training / inference cells.
class CFG:
    # Seed handed to seed_everything() before the fold loop starts.
    seed = 2021
    # Worker processes for the train / validation DataLoaders.
    num_workers = 4
    
    # NOTE(review): not referenced by the transforms in the visible code;
    # presumably the expected input side length in pixels — confirm.
    img_size = 227
    
    # Number of StratifiedKFold splits; also the divisor when averaging
    # the per-fold test predictions.
    fold_num = 5
    # Maximum number of epochs per fold (early stopping may end a fold sooner).
    epoch = 30
    # Batch size of the train / validation DataLoaders.
    batch_size = 32
    # Initial learning rate given to the AdamP optimizer.
    lr = 1e-4
    
    # T_0 is passed to CosineAnnealingWarmRestarts (scheduler is stepped once per epoch).
    T_0=10
    # NOTE(review): T_max is not read anywhere in the visible code.
    T_max=10
    # Passed as eta_min to the scheduler.
    min_lr = 1e-7
    # Descriptive label only: the visible training loop constructs the scheduler
    # directly and does not read this string.
    scheduler = 'CosineAnnealingWarmRestarts'
    
    # Descriptive label only: the visible training loop constructs AdamP directly.
    optimizer = 'AdamP'
    
    # timm architecture name passed to EffNetClassifier.
    model = 'tf_efficientnet_b3_ns'
    
    # NOTE(review): the visible training loop passes pretrained=True literally
    # instead of reading this flag.
    pretrained = True
    # Per-channel mean / std used by A.Normalize in both transform pipelines
    # (presumably taken from the EDA printout on the 0-1 scale — confirm).
    mean= [0.5556861, 0.50740065, 0.45690217]
    std= [0.22876642, 0.21754766, 0.22090458]
    # CUDA when available, otherwise CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Combine an F1 loss with cross-entropy loss to counter class imbalance.
class CustomLoss(nn.Module):
    """Cross-entropy plus a differentiable (soft) macro-F1 loss.

    Args:
        classes: number of classes; used as the width of the one-hot targets.
        epsilon: small constant that keeps the precision / recall / F1
            divisions finite and bounds the clamped per-class F1.
    """

    def __init__(self, classes=7, epsilon=1e-7):
        super().__init__()
        self.classes = classes
        self.epsilon = epsilon

    def forward(self, y_pred, y_true):
        """Return ``soft_f1_loss + cross_entropy`` as a scalar tensor.

        Args:
            y_pred: 2-D tensor of raw class scores, one row per sample.
            y_true: 1-D tensor of integer class indices.
        """
        assert y_pred.ndim == 2
        assert y_true.ndim == 1

        ce_loss = F.cross_entropy(y_pred, y_true)
        f1_loss = self._f1_loss(y_pred, y_true)

        return f1_loss + ce_loss

    def _f1_loss(self, y_pred, y_true):
        """Return ``1 - mean(per-class soft F1)`` computed over the batch."""
        y_true = F.one_hot(y_true, self.classes).to(torch.float32)
        y_pred = F.softmax(y_pred, dim=1)

        # Soft confusion counts per class: probabilities stand in for hard
        # decisions so the result stays differentiable. True negatives are
        # not needed for precision / recall and are therefore not computed.
        tp = (y_true * y_pred).sum(dim=0).to(torch.float32)
        fp = ((1 - y_true) * y_pred).sum(dim=0).to(torch.float32)
        fn = (y_true * (1 - y_pred)).sum(dim=0).to(torch.float32)

        precision = tp / (tp + fp + self.epsilon)
        recall = tp / (tp + fn + self.epsilon)

        f1 = 2 * (precision * recall) / (precision + recall + self.epsilon)
        # Keep every per-class score strictly inside (0, 1).
        f1 = f1.clamp(min=self.epsilon, max=1 - self.epsilon)

        return 1 - f1.mean()

In [None]:
class MyDataset(Dataset):
    """Image dataset over positional rows of the training table.

    Each row is indexed by position: element 0 is read as the label and
    element 2 as the image path. ``df`` must support ``.copy()`` and integer
    indexing (the loader helper in this file passes ``DataFrame.values`` rows).

    Args:
        df: row container as described above.
        transforms: optional callable invoked as ``transforms(image=img)``
            whose result is read at key ``'image'``.
        output_label: when True, ``__getitem__`` returns ``(img, label)``;
            otherwise only ``img``.
    """

    def __init__(self, df, transforms=None, output_label=True):
        super().__init__()
        self.df = df.copy()
        self.transforms = transforms
        self.output_label = output_label

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index: int):
        """Load, colour-convert and transform the image at ``index``.

        Raises:
            FileNotFoundError: if the image cannot be read from disk.
        """
        row = self.df[index]
        path = row[2]

        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        bgr = cv2.imread(path)
        if bgr is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        if self.transforms:
            img = self.transforms(image=img)['image']

        if self.output_label:
            return img, row[0]
        return img

In [None]:
# Data Augmentation

def get_train_transforms():
    """Build the augmentation pipeline applied to training images."""
    # One of three geometric distortions, chosen by OneOf, applied half the time.
    geometric_distortion = A.OneOf([
        A.OpticalDistortion(p=0.4),   # optical (lens-like) distortion
        A.GridDistortion(p=0.2),      # grid distortion
        A.IAAPiecewiseAffine(p=0.4),  # piecewise affine transform
    ], p=0.5)

    steps = [
        A.HueSaturationValue(),  # jitter hue, saturation and value
        geometric_distortion,
        # brightness / contrast jitter
        A.RandomBrightnessContrast(brightness_limit=(-0.3, 0.3), contrast_limit=(-0.1, 0.1), p=0.5),
        A.Normalize(mean=CFG.mean, std=CFG.std, max_pixel_value=255.0, p=1.0),
        ToTensorV2(p=1.0),
    ]
    return A.Compose(steps, p=1.)

def get_valid_transforms():
    """Build the deterministic pipeline for validation / test images.

    Only normalisation with ``CFG.mean`` / ``CFG.std`` and tensor conversion
    are applied; no augmentation.
    """
    normalize = A.Normalize(mean=CFG.mean, std=CFG.std, max_pixel_value=255.0, p=1.0)
    to_tensor = ToTensorV2(p=1.0)
    return A.Compose([normalize, to_tensor], p=1.)

In [None]:
# data loader setting

def prepare_dataloader(df, train_index, valid_index):
    """Create the train and validation DataLoaders for one fold.

    Args:
        df: table whose ``.values`` rows are handed to ``MyDataset``.
        train_index: positional row indices of the training split.
        valid_index: positional row indices of the validation split.

    Returns:
        ``(train_loader, val_loader)``; the training loader shuffles and
        uses the augmenting transforms, the validation loader does neither.
    """
    rows = df.values

    train_ds = MyDataset(rows[train_index], transforms=get_train_transforms(), output_label=True)
    valid_ds = MyDataset(rows[valid_index], transforms=get_valid_transforms(), output_label=True)

    # Settings common to both loaders.
    shared = dict(
        batch_size=CFG.batch_size,
        num_workers=CFG.num_workers,
        pin_memory=True,
    )

    train_loader = torch.utils.data.DataLoader(train_ds, shuffle=True, drop_last=False, **shared)
    val_loader = torch.utils.data.DataLoader(valid_ds, shuffle=False, **shared)
    return train_loader, val_loader

In [None]:
# Training fine-tunes a pretrained model (tf_efficientnet_b3_ns).

class EffNetClassifier(nn.Module):
    """timm model whose ``classifier`` layer is swapped for an ``n_class``-way linear head.

    Args:
        model_arch: architecture name passed to ``timm.create_model``.
        n_class: number of output classes of the new head.
        pretrained: forwarded to ``timm.create_model``.
    """

    def __init__(self, model_arch, n_class, pretrained=False):
        super().__init__()
        backbone = timm.create_model(model_arch, pretrained=pretrained)
        # Keep the backbone's feature width, replace only the output layer.
        backbone.classifier = nn.Linear(backbone.classifier.in_features, n_class)
        self.model = backbone

    def forward(self, x):
        return self.model(x)

In [None]:
# training

def train_one_epoch(epoch, model, loss_fn, optimizer, train_loader, device, scheduler=None):
    """Run one mixed-precision training pass over ``train_loader``.

    Args:
        epoch: epoch number, used only in the progress-bar text.
        model: network to train (switched to train mode here).
        loss_fn: callable ``loss_fn(predictions, labels)`` returning a scalar loss.
        optimizer: optimizer stepped once per batch.
        train_loader: iterable yielding ``(images, labels)`` batches.
        device: device the batches are moved to.
        scheduler: optional LR scheduler, stepped once at the end of the epoch.

    NOTE: gradient scaling uses the module-level ``scaler`` object (it is not
    a parameter), so one must exist before this function is called.
    """
    model.train()

    running_loss = None
    pbar = tqdm(train_loader, total=len(train_loader), position=0, leave=True)
    for imgs, image_labels in pbar:
        imgs = imgs.to(device).float()
        image_labels = image_labels.to(device).long()

        # Only the forward pass and the loss belong inside autocast; the
        # backward pass and the optimizer step run outside of it.
        with autocast():
            image_preds = model(imgs)
            loss = loss_fn(image_preds, image_labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

        # Exponential moving average of the batch loss for display,
        # initialised with the first batch.
        loss_value = loss.item()
        if running_loss is None:
            running_loss = loss_value
        else:
            running_loss = running_loss * .99 + loss_value * .01

        pbar.set_description(f'epoch {epoch} loss: {running_loss:.4f}')

    if scheduler is not None:
        scheduler.step()

In [None]:
# validation measuring

def valid_one_epoch(epoch, model, loss_fn, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` and report accuracy and macro F1.

    Args:
        epoch: epoch number, used only in the progress-bar text.
        model: network to evaluate (switched to eval mode here).
        loss_fn: callable ``loss_fn(predictions, labels)`` returning a scalar loss.
        val_loader: iterable yielding ``(images, labels)`` batches.
        device: device the batches are moved to.

    Returns:
        ``(accuracy, f1)`` where ``f1`` is the macro-averaged F1 score.
    """
    model.eval()

    loss_sum = 0
    sample_num = 0
    image_preds_all = []
    image_targets_all = []

    pbar = tqdm(val_loader, total=len(val_loader), position=0, leave=True)
    # Disable autograd here so the function is safe even when the caller
    # does not wrap it in no_grad (nesting is harmless).
    with torch.no_grad():
        for imgs, image_labels in pbar:
            imgs = imgs.to(device).float()
            image_labels = image_labels.to(device).long()

            image_preds = model(imgs)

            image_preds_all.append(torch.argmax(image_preds, 1).detach().cpu().numpy())
            image_targets_all.append(image_labels.detach().cpu().numpy())

            loss = loss_fn(image_preds, image_labels)

            # Weight each batch loss by its size so the displayed value is a
            # per-sample average even when the last batch is smaller.
            batch_size = image_labels.shape[0]
            loss_sum += loss.item() * batch_size
            sample_num += batch_size

            pbar.set_description(f'epoch {epoch} loss: {loss_sum/sample_num:.4f}')

    image_preds_all = np.concatenate(image_preds_all)
    image_targets_all = np.concatenate(image_targets_all)
    accuracy = (image_preds_all == image_targets_all).mean()
    # scikit-learn's signature is f1_score(y_true, y_pred, ...).
    f1 = f1_score(image_targets_all, image_preds_all, average='macro')
    print('validation multi-class accuracy = {:.4f}, f1 score = {:.4f}'.format(accuracy, f1))

    return accuracy, f1

In [None]:
class TestDataset(Dataset):
    """Unlabelled image dataset over a sequence of file paths.

    Args:
        img_paths: indexable sequence of image file paths.
        transform: optional callable invoked as ``transform(image=img)``
            whose result is read at key ``'image'``.
    """

    def __init__(self, img_paths, transform):
        self.img_paths = img_paths
        self.transform = transform

    def __getitem__(self, index):
        """Load, colour-convert and transform the image at ``index``.

        Raises:
            FileNotFoundError: if the image cannot be read from disk.
        """
        path = self.img_paths[index]

        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        bgr = cv2.imread(path)
        if bgr is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        if self.transform:
            img = self.transform(image=img)['image']
        return img

    def __len__(self):
        return len(self.img_paths)

In [None]:
# test dataset setting

# Sample answer file whose rows will receive the predictions.
submission = pd.read_csv('/content/drive/MyDrive/test_answer_sample_.csv')

# Folder that holds the test images.
test_img_root = f'{cfg.te_dir}/0'

# Build the full path of every entry in the test folder (os.listdir order),
# then wrap the paths in a TestDataset and a DataLoader.
test_file_names = os.listdir(test_img_root)
image_paths = [os.path.join(test_img_root, file_name) for file_name in test_file_names]

test_dataset = TestDataset(image_paths, transform=get_valid_transforms())

# Default batch size, original order preserved.
test_loader = DataLoader(test_dataset, shuffle=False)

In [None]:
seed_everything(CFG.seed)

# Running average, over folds, of the model outputs on the test set.
# (Despite the name these are test predictions, not out-of-fold predictions.)
oof_pred = None

# Train on CFG.fold_num stratified folds so each split keeps the label distribution.
kfold = StratifiedKFold(n_splits=CFG.fold_num)
for fold, (train_index, valid_index) in enumerate(kfold.split(tr_df, tr_df["label"])):
    print('Training with {} started'.format(fold))

    train_loader, val_loader = prepare_dataloader(tr_df, train_index, valid_index)

    device = CFG.device
    # Use one class count for both the model head and the loss so they cannot disagree.
    n_classes = tr_df.label.nunique()
    model = EffNetClassifier(CFG.model, n_classes, pretrained=True).to(device)

    optimizer = AdamP(model.parameters(), lr=CFG.lr)

    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer, T_0=CFG.T_0, T_mult=1, eta_min=CFG.min_lr, last_epoch=-1
    )

    loss_fn = CustomLoss(classes=n_classes).to(device)

    # Module-level GradScaler; train_one_epoch reads it as a global.
    scaler = GradScaler()

    best_accuracy = 0
    best_f1 = 0
    best_epoch = 0
    stop_count = 0
    # Snapshot the initial weights so the load below never sees an undefined
    # name or a snapshot left over from the previous fold when no epoch
    # improves on best_f1.
    best_state_dict = copy.deepcopy(model.state_dict())
    for epoch in range(CFG.epoch):
        train_one_epoch(epoch, model, loss_fn, optimizer, train_loader, device, scheduler=scheduler)

        with torch.no_grad():
            epoch_accuracy, epoch_f1 = valid_one_epoch(epoch, model, loss_fn, val_loader, device)

        if epoch_f1 > best_f1:
            stop_count = 0
            best_state_dict = copy.deepcopy(model.state_dict())

            best_f1 = epoch_f1
            best_accuracy = epoch_accuracy
            best_epoch = epoch
            print('The model is saved!')
        else:
            # Early stopping: give up after more than 5 epochs without a better F1.
            stop_count += 1
            if stop_count > 5:
                break

    # Restore the best weights recorded for this fold.
    model.load_state_dict(best_state_dict)
    # Make inference mode explicit instead of relying on the last validation call.
    model.eval()

    # Predict the test data with the model trained on this fold.
    all_predictions = []
    with torch.no_grad():
        for images in test_loader:
            images = images.to(device)

            # Test-time augmentation: average the outputs for the original
            # image and for the image flipped along both spatial axes.
            pred = model(images) / 2
            pred += model(torch.flip(images, dims=(2, 3))) / 2
            all_predictions.extend(pred.cpu().numpy())

    fold_pred = np.array(all_predictions)

    # Accumulate the equally weighted average over folds.
    if oof_pred is None:
        oof_pred = fold_pred / CFG.fold_num
    else:
        oof_pred += fold_pred / CFG.fold_num

    # Drop the references (including the weight snapshot) before the next fold
    # so the cached GPU memory can be released.
    del model, optimizer, train_loader, val_loader, scaler, scheduler, best_state_dict
    torch.cuda.empty_cache()
    print('Best F1: {} (accuracy {}) in epoch {}'.format(best_f1, best_accuracy, best_epoch))

In [None]:
# Collapse the fold-averaged test scores to one predicted class index per image.
oof_pred = np.argmax(oof_pred, axis=1)

In [None]:
# test dataset 정렬

def path2id(image_path):
    """Return the integer id encoded in an image file name.

    The id is the file name without its directory and extension, e.g.
    ``'/content/test/0/12.jpg' -> 12``. The extension may have any length
    (``.jpg``, ``.jpeg``, ...), not only three characters.

    Raises:
        ValueError: if the file-name stem is not an integer.
    """
    file_name = os.path.basename(image_path)
    stem, _ext = os.path.splitext(file_name)
    return int(stem)

# Numeric id of every test image, in the same order as the predictions.
id_list = list(map(path2id,image_paths))

# Pair each prediction with its image id, order the pairs by id (stable sort),
# then keep only the predictions.
predictions = sorted(zip(id_list, oof_pred), key=lambda pair: pair[0])
predictions = np.array([pred for _, pred in predictions])

# Write the id-ordered predictions into the answer column and save the file.
submission['answer value'] = predictions
submission.to_csv('/content/drive/MyDrive/final_submission.csv', index=False)
print('test inference is done!')