In [31]:
import os
import cv2
import math
import random
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from scipy.stats import beta

from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

from transformers import get_cosine_schedule_with_warmup

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, sampler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torchvision
import torchvision.models as models
import torchvision.transforms.functional

import warnings
warnings.filterwarnings(action='ignore')

In [32]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Hyperparameter Setting

In [33]:
CFG = {
    'IMG_SIZE_B':384,
    'IMG_SIZE_L':224,
    'EPOCHS':400,
    'LEARNING_RATE':0.00002,
    'BATCH_SIZE':4,
    'SEED':41
}

# Fixed Random Seed

In [34]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Seed 고정

# Data Pre-processing

In [35]:
data_path = '../../data/'

In [69]:
train_df = pd.read_csv(os.path.join(data_path, 'train.csv'))
artists_df = pd.read_csv(os.path.join(data_path, 'artists_info.csv'))
artists_df.loc[artists_df['name'] == 'Albrecht Dürer', 'name'] = 'Albrecht Du rer'
train_df.loc[3896, 'artist'] = 'Titian'
train_df.loc[3986, 'artist'] = 'Alfred Sisley'

In [37]:
train_df = pd.merge(train_df, artists_df.loc[:, ['name', 'genre']], left_on='artist', right_on='name', how='left').drop(columns='name')
train_df.loc[train_df['artist']=='Diego Rivera', 'genre'] = 'Social Realism'

In [38]:
train_df.to_csv(os.path.join(data_path, 'train2.csv'), index=False)

In [39]:
df = pd.read_csv(os.path.join(data_path, 'train2.csv'))
df.head()

Unnamed: 0,id,img_path,artist,genre
0,0,./train/0000.jpg,Diego Velazquez,Baroque
1,1,./train/0001.jpg,Vincent van Gogh,Post-Impressionism
2,2,./train/0002.jpg,Claude Monet,Impressionism
3,3,./train/0003.jpg,Edgar Degas,Impressionism
4,4,./train/0004.jpg,Hieronymus Bosch,Northern Renaissance


In [40]:
def onehot_encoding_smoothing(x):
    if x['High Renaissance,Mannerism'] == 1:
        x['High Renaissance'] = 0.5
        x['Mannerism'] = 0.5
    elif x['Impressionism,Post-Impressionism'] == 1:
        x['Impressionism'] = 0.5
        x['Post-Impressionism'] = 0.5

    return x

def get_data(df, infer=False):
    if infer:
        return df['img_path'].apply(lambda p: os.path.join(data_path, p)).values

    onehot_encoding = pd.get_dummies(df['genre'])
    onehot_encoding = onehot_encoding.apply(lambda x: onehot_encoding_smoothing(x), axis=1)
    onehot_encoding.drop(['High Renaissance,Mannerism', 'Impressionism,Post-Impressionism'], axis=1, inplace=True)
    onehot_encoding = onehot_encoding.values

    return df['img_path'].apply(lambda p: os.path.join(data_path, p)).values, df['artist'].values, onehot_encoding.astype(float)

In [41]:
# Label Encoding
le = preprocessing.LabelEncoder()
df['artist'] = le.fit_transform(df['artist'].values)

# Train/Validation Split

In [42]:
train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['artist'].values, random_state=CFG['SEED'])

train_df = train_df.sort_values(by=['id'])
val_df = val_df.sort_values(by=['id'])

train_img_paths, train_labels, train_genre_labels = get_data(train_df)
val_img_paths, val_labels, val_genre_labels = get_data(val_df)

# CustomDataset

In [43]:
class CustomDataset(Dataset):
    def __init__(self, img_paths, labels, genre_labels, transforms=None):
        self.img_paths = img_paths
        self.labels = labels
        self.genre_labels = genre_labels
        self.transforms = transforms

    def __getitem__(self, index):
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.labels is not None:
            label = self.labels[index]
            genre_label = self.genre_labels[index]
            return (image, label, genre_label)
        else:
            return image

    def __len__(self):
        return len(self.img_paths)

In [44]:
# EfficientNet_V2_M, ViT_B_16을 위한 변환기
train_transform_b = A.Compose([
    A.RandomResizedCrop(CFG['IMG_SIZE_B'], CFG['IMG_SIZE_B'], scale=(0.2, 0.8)),
    A.Transpose(p=0.5),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.ShiftScaleRotate(p=0.5),
    A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=20, val_shift_limit=20, p=0.5),
    A.RandomBrightnessContrast(brightness_limit=(-0.1,0.1), contrast_limit=(-0.1, 0.1), p=0.5),
    A.ChannelShuffle(),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
    A.CoarseDropout(p=0.5),
    ToTensorV2()
])

test_transform_b = A.Compose([
    A.Resize(CFG['IMG_SIZE_B'],CFG['IMG_SIZE_B']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
    ToTensorV2()
])

# ViT_L_16을 위한 변환기
train_transform_l = A.Compose([
    A.RandomResizedCrop(CFG['IMG_SIZE_L'], CFG['IMG_SIZE_L'], scale=(0.2, 0.8)),
    A.Transpose(p=0.5),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.ShiftScaleRotate(p=0.5),
    A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=20, val_shift_limit=20, p=0.5),
    A.RandomBrightnessContrast(brightness_limit=(-0.1,0.1), contrast_limit=(-0.1, 0.1), p=0.5),
    A.ChannelShuffle(),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
    A.CoarseDropout(p=0.5),
    ToTensorV2()
])

test_transform_l = A.Compose([
    A.Resize(CFG['IMG_SIZE_L'],CFG['IMG_SIZE_L']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
    ToTensorV2()
])

# Weighted Random Sampling

In [45]:
def make_weights(labels, nclasses):
    labels = np.array(labels)
    weight_arr = np.zeros_like(labels)

    _, counts = np.unique(labels, return_counts=True)
    for cls in range(nclasses):
        weight_arr = np.where(labels == cls, 1/counts[cls], weight_arr)
        # 각 클래스의의 인덱스를 산출하여 해당 클래스 개수의 역수를 확률로 할당한다.
        # 이를 통해 각 클래스의 전체 가중치를 동일하게 한다.

    return weight_arr

weights = make_weights(train_labels, len(np.unique(train_labels)))
weights = torch.DoubleTensor(weights)

In [46]:
# EfficientNet_V2_M, ViT_B_16을 위한 data loader
train_dataset_b = CustomDataset(train_img_paths, train_labels, train_genre_labels, train_transform_b)
train_loader_b = DataLoader(train_dataset_b, batch_size = CFG['BATCH_SIZE'], num_workers=0,
                          sampler=sampler.WeightedRandomSampler(weights, len(weights)))

val_dataset_b = CustomDataset(val_img_paths, val_labels, val_genre_labels, test_transform_b)
val_loader_b = DataLoader(val_dataset_b, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# ViT_L_16을 위한 data loader
train_dataset_l = CustomDataset(train_img_paths, train_labels, train_genre_labels, train_transform_l)
train_loader_l = DataLoader(train_dataset_l, batch_size = CFG['BATCH_SIZE'], num_workers=0,
                          sampler=sampler.WeightedRandomSampler(weights, len(weights)))

val_dataset_l = CustomDataset(val_img_paths, val_labels, val_genre_labels, test_transform_l)
val_loader_l = DataLoader(val_dataset_l, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# Model Define
* 장르 정보를 활용하지 않은 EfficientNet_V2_M
* 장르 정보를 활용하지 않은 ViT_B_16
* 장르 정보를 활용한 ViT_B_16
* 장르 정보를 활용하지 않은 ViT_L_16
* 장르 정보를 활용한 ViT_L_16

In [47]:
# EfficientNet_V2_M
class BaseModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(BaseModel, self).__init__()
        self.backbone = models.efficientnet_v2_m(weights=models.EfficientNet_V2_M_Weights)
        self.backbone.classifier = nn.Sequential(
            nn.Dropout(p=0.3, inplace=True),
            nn.Linear(in_features=1280, out_features=num_classes)
        )

    def forward(self, x):
        x = self.backbone(x)
        return x

In [48]:
# ViT_B_16 & ViT_L_16
# Attention 기법이 사용된 ViT 모델
# torchvision.models.ViT_B_16_Weights.IMAGENET1K_SWAG_E2E_V1 기본 이미지 크기 = 384 
# torchvision.models.ViT_L_16_Weights.IMAGENET1K_SWAG_LINEAR_V1 기본 이미지 크기 = 224
models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_SWAG_E2E_V1).encoder.pos_embedding.size()
# pos_embedding = (1, seq_length, hidden_dim) = (1, 577, 768)
# 이미지 크기 384, 패치 크기 16
# ... 이미지 패치 개수 = (384//16)**2 = 576
# + 클래스 토큰 = 577

torch.Size([1, 577, 768])

In [49]:
# 이미지 크기 400, 패치크기 16으로 변경시
# 이미지 패치 개수 = (400//16)**2 = 625
# + 클래스 토큰1 = 626
image_resize_test_model = models.vision_transformer.VisionTransformer(
    image_size=400,
    patch_size=16,
    num_layers=12,
    num_heads=12,
    hidden_dim=768,
    mlp_dim=3072,
)
image_resize_test_model.encoder.pos_embedding.size()
# (1, 626, 768)

torch.Size([1, 626, 768])

In [50]:
from collections import OrderedDict

# ViT 논문 - 3.2 FINE-TUNING AND HIGHER RESOLUTION
image_resize_test_model.load_state_dict(
    torchvision.models.vision_transformer.interpolate_embeddings(
        image_size=400,
        patch_size=16,
        model_state=OrderedDict(models.ViT_B_16_Weights.IMAGENET1K_SWAG_E2E_V1.get_state_dict(progress=True))
    )
)

<All keys matched successfully>

In [51]:
class ViTModelB16(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(ViTModelB16, self).__init__()
        self.backbone = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_SWAG_E2E_V1)
        self.backbone.heads = nn.Sequential(
            nn.Linear(in_features=768, out_features=num_classes)
        )

    def forward(self, x):
        x = self.backbone(x)
        return x


class ViTModelL16(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(ViTModelL16, self).__init__()
        self.backbone = models.vit_l_16(weights=models.ViT_L_16_Weights.IMAGENET1K_SWAG_LINEAR_V1)
        self.backbone.heads = nn.Sequential(
            nn.Linear(in_features=1024, out_features=num_classes)
        )

    def forward(self, x):
        x = self.backbone(x)
        return x

# ViT_B_16_Genre & ViT_L_16_Genre

In [52]:
class ViTModelB16Genre(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(ViTModelB16Genre, self).__init__()
        self.backbone = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_SWAG_E2E_V1)
        self.backbone.heads = nn.Identity() # 기존 Classifer를 사용하지 않기 위해 입력값을 그대로 출력해주는 역할

        self.genre_classifier = nn.Linear(in_features=768, out_features=23)

        self.layer_norm = nn.LayerNorm(768 + 23, eps=1e-6)
        self.artist_classifier = nn.Linear(in_features=768 + 23, out_features=num_classes)

    def forward(self, x, genre_label=None):
        x = self.backbone(x)
        genre_pred = self.genre_classifier(x)

        if genre_label is not None:
            x = torch.cat([x, genre_label], dim=1)   # Teacher Forcing
        else:
            x = torch.cat([x, genre_pred], dim=1)

        x = self.layer_norm(x)
        x = self.artist_classifier(x)

        return x, genre_pred


class ViTModelL16Genre(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(ViTModelL16Genre, self).__init__()
        self.backbone = models.vit_l_16(weights=models.ViT_L_16_Weights.IMAGENET1K_SWAG_LINEAR_V1)
        self.backbone.heads = nn.Identity()

        self.genre_classifier = nn.Linear(in_features=1024, out_features=23)

        self.layer_norm = nn.LayerNorm(1024 + 23, eps=1e-6)
        self.artist_classifier = nn.Linear(in_features=1024 + 23, out_features=num_classes)

    def forward(self, x, genre_label=None):
        x = self.backbone(x)
        genre_pred = self.genre_classifier(x)

        if genre_label is not None:
            x = torch.cat([x, genre_label], dim=1)   # Teacher Forcing
        else:
            x = torch.cat([x, genre_pred], dim=1)

        x = self.layer_norm(x)
        x = self.artist_classifier(x)

        return x, genre_pred

# Cutmix, FMix, MixUp
* 데이터 증강 기법
* Cutmix, FMix 참고링크 : https://www.kaggle.com/code/ar2017/pytorch-efficientnet-train-aug-cutmix-fmix
* MixUp 참고링크 : https://dacon.io/competitions/official/235842/codeshare/3665

In [72]:
def rand_bbox(size, lam):
    W = size[2]
    H = size[3]
    cut_rat = np.sqrt(1. - lam)
    cut_w = int(W * cut_rat)
    cut_h = int(H * cut_rat)

    # uniform
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)
    return bbx1, bby1, bbx2, bby2


def fftfreqnd(h, w=None, z=None):
    """ Get bin values for discrete fourier transform of size (h, w, z)
    :param h: Required, first dimension size
    :param w: Optional, second dimension size
    :param z: Optional, third dimension size
    """
    fz = fx = 0
    fy = np.fft.fftfreq(h)

    if w is not None:
        fy = np.expand_dims(fy, -1)

        if w % 2 == 1:
            fx = np.fft.fftfreq(w)[: w // 2 + 2]
        else:
            fx = np.fft.fftfreq(w)[: w // 2 + 1]

    if z is not None:
        fy = np.expand_dims(fy, -1)
        if z % 2 == 1:
            fz = np.fft.fftfreq(z)[:, None]
        else:
            fz = np.fft.fftfreq(z)[:, None]

    return np.sqrt(fx * fx + fy * fy + fz * fz)


def get_spectrum(freqs, decay_power, ch, h, w=0, z=0):
    """ Samples a fourier image with given size and frequencies decayed by decay power
    :param freqs: Bin values for the discrete fourier transform
    :param decay_power: Decay power for frequency decay prop 1/f**d
    :param ch: Number of channels for the resulting mask
    :param h: Required, first dimension size
    :param w: Optional, second dimension size
    :param z: Optional, third dimension size
    """
    scale = np.ones(1) / (np.maximum(freqs, np.array([1. / max(w, h, z)])) ** decay_power)

    param_size = [ch] + list(freqs.shape) + [2]
    param = np.random.randn(*param_size)

    scale = np.expand_dims(scale, -1)[None, :]

    return scale * param


def make_low_freq_image(decay, shape, ch=1):
    """ Sample a low frequency image from fourier space
    :param decay_power: Decay power for frequency decay prop 1/f**d
    :param shape: Shape of desired mask, list up to 3 dims
    :param ch: Number of channels for desired mask
    """
    freqs = fftfreqnd(*shape)
    spectrum = get_spectrum(freqs, decay, ch, *shape)#.reshape((1, *shape[:-1], -1))
    spectrum = spectrum[:, 0] + 1j * spectrum[:, 1]
    mask = np.real(np.fft.irfftn(spectrum, shape))

    if len(shape) == 1:
        mask = mask[:1, :shape[0]]
    if len(shape) == 2:
        mask = mask[:1, :shape[0], :shape[1]]
    if len(shape) == 3:
        mask = mask[:1, :shape[0], :shape[1], :shape[2]]

    mask = mask
    mask = (mask - mask.min())
    mask = mask / mask.max()
    return mask


def sample_lam(alpha, reformulate=False):
    """ Sample a lambda from symmetric beta distribution with given alpha
    :param alpha: Alpha value for beta distribution
    :param reformulate: If True, uses the reformulation of [1].
    """
    if reformulate:
        lam = beta.rvs(alpha+1, alpha)
    else:
        lam = beta.rvs(alpha, alpha)

    return lam


def binarise_mask(mask, lam, in_shape, max_soft=0.0):
    """ Binarises a given low frequency image such that it has mean lambda.
    :param mask: Low frequency image, usually the result of `make_low_freq_image`
    :param lam: Mean value of final mask
    :param in_shape: Shape of inputs
    :param max_soft: Softening value between 0 and 0.5 which smooths hard edges in the mask.
    :return:
    """
    idx = mask.reshape(-1).argsort()[::-1]
    mask = mask.reshape(-1)
    num = math.ceil(lam * mask.size) if random.random() > 0.5 else math.floor(lam * mask.size)

    eff_soft = max_soft
    if max_soft > lam or max_soft > (1-lam):
        eff_soft = min(lam, 1-lam)

    soft = int(mask.size * eff_soft)
    num_low = num - soft
    num_high = num + soft

    mask[idx[:num_high]] = 1
    mask[idx[num_low:]] = 0
    mask[idx[num_low:num_high]] = np.linspace(1, 0, (num_high - num_low))

    mask = mask.reshape((1, *in_shape))
    return mask


def sample_mask(alpha, decay_power, shape, max_soft=0.0, reformulate=False):
    """ Samples a mean lambda from beta distribution parametrised by alpha, creates a low frequency image and binarises
    it based on this lambda
    :param alpha: Alpha value for beta distribution from which to sample mean of mask
    :param decay_power: Decay power for frequency decay prop 1/f**d
    :param shape: Shape of desired mask, list up to 3 dims
    :param max_soft: Softening value between 0 and 0.5 which smooths hard edges in the mask.
    :param reformulate: If True, uses the reformulation of [1].
    """
    if isinstance(shape, int):
        shape = (shape,)

    # Choose lambda
    lam = sample_lam(alpha, reformulate)

    # Make mask, get mean / std
    mask = make_low_freq_image(decay_power, shape)
    mask = binarise_mask(mask, lam, shape, max_soft)

    return lam, mask

In [73]:
def cutmix(data, image_target, genre_target, alpha):
    indices = torch.randperm(data.size(0))
    shuffled_data = data[indices]
    shuffled_image_target = image_target[indices]
    shuffled_genre_target = genre_target[indices]

    lam = np.clip(np.random.beta(alpha, alpha),0.3,0.4)
    bbx1, bby1, bbx2, bby2 = rand_bbox(data.size(), lam)
    new_data = data.clone()
    new_data[:, :, bby1:bby2, bbx1:bbx2] = data[indices, :, bby1:bby2, bbx1:bbx2]
    # adjust lambda to exactly match pixel ratio
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (data.size()[-1] * data.size()[-2]))
    image_targets = (image_target, shuffled_image_target, lam)
    genre_targets = (genre_target, shuffled_genre_target, lam)

    return new_data, image_targets, genre_targets

def fmix(data, image_target, genre_target, alpha, decay_power, shape, max_soft=0.0, reformulate=False):
    indices = torch.randperm(data.size(0))
    shuffled_data = data[indices]
    shuffled_image_target = image_target[indices]
    shuffled_genre_target = genre_target[indices]

    lam, mask = sample_mask(alpha, decay_power, shape, max_soft, reformulate)
    x1 = torch.from_numpy(mask).to(device)*data
    x2 = torch.from_numpy(1-mask).to(device)*shuffled_data
    image_targets=(image_target, shuffled_image_target, lam)
    genre_targets = (genre_target, shuffled_genre_target, lam)

    return (x1+x2).float(), image_targets, genre_targets

def mixup(data, image_target, genre_target, alpha=1.):
    indices = torch.randperm(data.size(0))
    shuffled_image_target = image_target[indices]
    shuffled_genre_target = genre_target[indices]

    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    mixed_data = lam * data + (1 - lam) * data[indices, :]
    image_targets = (image_target, shuffled_image_target, lam)
    genre_targets = (genre_target, shuffled_genre_target, lam)

    return mixed_data, image_targets, genre_targets

# Train

In [74]:
def train(model, model_name, optimizer, train_loader, test_loader, scheduler, device):
    model.to(device)

    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_model = None

    for epoch in range(1,CFG['EPOCHS']+1):
        if model_name != 'StackingModel':
            model.train()
        train_loss = []
        for img, label, genre_label in tqdm(train_loader):
            img, label = img.float().to(device), label.to(device)

            mix_decision = np.random.rand()
            if mix_decision < 0.25:
                img, label, _ = cutmix(img, label, genre_label, 1.)
            elif 0.25 <= mix_decision < 0.5:
                img_size = CFG['IMG_SIZE_L'] if 'l16' in model_name else CFG['IMG_SIZE_B']
                img, label, _ = fmix(img, label, genre_label, alpha=1., decay_power=5., shape=(img_size,img_size))
            elif 0.5 <= mix_decision <0.75:
                img, label, _ = mixup(img, label, genre_label, 1.)
                
                
            print(label)
            print(genre_label)

            optimizer.zero_grad()

            model_pred = model(img)

            if mix_decision < 0.75:
                loss = criterion(model_pred, label[0]) * label[2] + criterion(model_pred, label[1]) * (1. - label[2])
            else:
                loss = criterion(model_pred, label)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        tr_loss = np.mean(train_loss)

        val_loss, val_score = validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step()

        if best_score < val_score:
            torch.save(model.state_dict(), f'{model_name}.pt')
            best_model = model
            best_score = val_score

    return best_model

In [75]:
# genre 정보를 활용한 모델을 위한 train 함수
def genre_train(model, model_name, optimizer, train_loader, test_loader, scheduler, device):
    model.to(device)

    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_model = None

    for epoch in range(1,CFG['EPOCHS']+1):
        model.train()
        train_loss, train_artist_loss, train_genre_loss = [], [], []
        for img, label, genre_label in tqdm(train_loader):
            img, label, genre_label = img.float().to(device), label.to(device), genre_label.float().to(device)

            mix_decision = np.random.rand()
            if mix_decision < 0.25:
                img, label, genre_label = cutmix(img, label, genre_label, 1.)
            elif 0.25 <= mix_decision < 0.5:
                img_size = CFG['IMG_SIZE_L'] if 'l16' in model_name else CFG['IMG_SIZE_B']
                img, label, genre_label = fmix(img, label, genre_label, alpha=1., decay_power=5., shape=(img_size,img_size))
            elif 0.5 <= mix_decision <0.75:
                img, label, genre_label = mixup(img, label, genre_label, 1.)

            optimizer.zero_grad()

            if mix_decision < 0.75:
                model_pred, genre_pred = model(img, genre_label[0] * genre_label[2] + genre_label[1] * (1. - genre_label[2]))
                artist_loss = criterion(model_pred, label[0]) * label[2] + criterion(model_pred, label[1]) * (1. - label[2])
                genre_loss = criterion(genre_pred, genre_label[0]) * genre_label[2] + criterion(genre_pred, genre_label[1]) * (1. - genre_label[2])
            else:
                model_pred, genre_pred = model(img, genre_label)
                artist_loss = criterion(model_pred, label)
                genre_loss = criterion(genre_pred, genre_label)

            loss = artist_loss*0.7 + genre_loss*0.3
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
            train_artist_loss.append(artist_loss.item())
            train_genre_loss.append(genre_loss.item())

        tr_loss, tr_artist_loss, tr_genre_loss = np.mean(train_loss), np.mean(train_artist_loss), np.mean(train_genre_loss)

        val_loss, val_artist_loss, val_genre_loss, val_score = genre_validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss: [{tr_loss:.5f}], Artist Loss: [{tr_artist_loss:.5f}], Genre Loss: [{tr_genre_loss:.5f}]'
              f'Val Loss: [{val_loss:.5f}], Artist Loss: [{val_artist_loss:.5f}], Genre Loss: [{val_genre_loss:.5f}], Val F1 Score: [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step()

        if best_score < val_score:
            torch.save(model.state_dict(), f'{model_name}.pt')
            best_model = model
            best_score = val_score

    return best_model

In [76]:
def competition_metric(true, pred):
    return f1_score(true, pred, average="macro")


def validation(model, criterion, test_loader, device):
    model.eval()

    model_preds = []
    true_labels = []

    val_loss = []

    with torch.no_grad():
        for img, label, _ in tqdm(test_loader):
            img, label = img.float().to(device), label.to(device)

            model_pred = model(img)

            loss = criterion(model_pred, label)

            val_loss.append(loss.item())

            model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += label.detach().cpu().numpy().tolist()

    val_f1 = competition_metric(true_labels, model_preds)
    return np.mean(val_loss), val_f1

# genre 정보를 활용한 모델을 위한 validation 함수
def genre_validation(model, criterion, test_loader, device):
    model.eval()

    model_preds = []
    true_labels = []

    val_loss, val_artist_loss, val_genre_loss = [], [], []

    with torch.no_grad():
        for img, label, genre_label in tqdm(iter(test_loader)):
            img, label, genre_label = img.float().to(device), label.to(device), genre_label.float().to(device)

            model_pred, genre_pred = model(img)

            artist_loss = criterion(model_pred, label)
            genre_loss = criterion(genre_pred, genre_label)
            loss = artist_loss*0.7 + genre_loss*0.3

            val_loss.append(loss.item())
            val_artist_loss.append(artist_loss.item())
            val_genre_loss.append(genre_loss.item())

            model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += label.detach().cpu().numpy().tolist()

    val_f1 = competition_metric(true_labels, model_preds)
    return np.mean(val_loss), np.mean(val_artist_loss), np.mean(val_genre_loss), val_f1

# 모델 훈련 시작

In [77]:
model_list = [BaseModel(), ViTModelB16(), ViTModelL16(), ViTModelB16Genre(), ViTModelL16Genre()]
model_name = ['EfficientNet_v2_m', 'VisionTransformer_b16', 'VisionTransformer_l16', 'VisionTransformer_b16_genre', 'VisionTransformer_l16_genre']

In [78]:
optimizer_list = []
scheduler_list = []
for idx, model in enumerate(model_list):
    optimizer_list.append(torch.optim.AdamW(params = model.parameters(), lr = CFG["LEARNING_RATE"], weight_decay=0.01))
    scheduler_list.append(
        get_cosine_schedule_with_warmup(
            optimizer=optimizer_list[idx],
            num_warmup_steps=len(train_loader_b) * 20,
            num_training_steps=len(train_loader_b) * CFG["EPOCHS"]
        )
    )

In [79]:
for idx, model in enumerate(model_list):
    print('#'*50)
    print(f'{model_name[idx]} Model Train Start')

    if 'l16' in model_name[idx]:
        train_loader = train_loader_l
        val_loader = val_loader_l
    else:
        train_loader = train_loader_b
        val_loader = val_loader_b

    if model_name[idx].endswith('genre'):
        genre_train(model, model_name[idx], optimizer_list[idx], train_loader, val_loader, scheduler_list[idx], device)
    else:
        train(model, model_name[idx], optimizer_list[idx], train_loader, val_loader, scheduler_list[idx], device)

##################################################
EfficientNet_v2_m Model Train Start


  0%|          | 0/1182 [00:00<?, ?it/s]

tensor([ 4, 10, 34, 17], device='cuda:0', dtype=torch.int32)
tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]], dtype=torch.float64)


OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 0 has a total capacty of 8.00 GiB of which 0 bytes is free. Of the allocated memory 7.22 GiB is allocated by PyTorch, and 12.13 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

# stacking ensemble

In [None]:
class StackingModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(StackingModel, self).__init__()
        self.efficientnet = BaseModel()
        self.vit_b16 = ViTModelB16()
        self.vit_b16_genre = ViTModelB16Genre()
        self.vit_l16 = ViTModelL16()
        self.vit_l16_genre = ViTModelL16Genre()

        self.efficientnet.load_state_dict(torch.load('./EfficientNet_v2_m.pt', map_location=device))
        self.vit_b16.load_state_dict(torch.load('./VisionTransformer_b16.pt', map_location=device))
        self.vit_b16_genre.load_state_dict(torch.load('./VisionTransformer_b16_genre.pt', map_location=device))
        self.vit_l16.load_state_dict(torch.load('./VisionTransformer_l16.pt', map_location=device))
        self.vit_l16_genre.load_state_dict(torch.load('./VisionTransformer_l16_genre.pt', map_location=device))

        self.efficientnet.requires_grad_(False)
        self.vit_b16.requires_grad_(False)
        self.vit_b16_genre.requires_grad_(False)
        self.vit_l16.requires_grad_(False)
        self.vit_l16_genre.requires_grad_(False)

        self.layer_norm = nn.LayerNorm(num_classes*5, eps=1e-6)

        self.dense = nn.Linear(in_features=num_classes*5, out_features=num_classes)

    def forward(self, x):
        x_l = torchvision.transforms.functional.resize(x, [CFG['IMG_SIZE_L'], CFG['IMG_SIZE_L']])

        x1 = self.efficientnet(x)
        x2 = self.vit_b16(x)
        x3, _ = self.vit_b16_genre(x)
        x4 = self.vit_l16(x_l)
        x5, _ = self.vit_l16_genre(x_l)

        x = torch.cat([x1, x2, x3, x4, x5], dim=1)
        x = self.layer_norm(x)

        x = self.dense(x)

        return x

In [None]:
model = StackingModel()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=CFG["LEARNING_RATE"], weight_decay=0.01)
scheduler = get_cosine_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=len(train_loader_b) * 20,
    num_training_steps=len(train_loader_b) * CFG["EPOCHS"]
)

infer_model = train(model, 'StackingModel', optimizer, train_loader_b, val_loader_b, scheduler, device)

# inference

In [None]:
test_df = pd.read_csv(os.path.join(data_path, './test.csv'))
test_df.head()

test_img_paths = get_data(test_df, infer=True)

test_dataset = CustomDataset(test_img_paths, None, None, test_transform_b)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

In [None]:
def inference(model, test_loader, device):
    model.to(device)
    model.eval()

    model_preds = []

    with torch.no_grad():
        for i, img in enumerate(tqdm(iter(test_loader))):
            img = img.float().to(device)

            model_pred = model(img)


            model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()

    print('Done.')
    return model_preds

In [None]:
preds = inference(infer_model, test_loader, device)
preds = le.inverse_transform(preds)  # LabelEncoder로 변환 된 Label을 다시 화가 이름으로 변환

# ## Submit
submit = pd.read_csv(os.path.join(data_path, './sample_submission.csv'))
submit['artist'] = preds
submit.to_csv(os.path.join(data_path, './submit.csv'), index=False)