In [None]:
# drive mount
from google.colab import drive
drive.mount('/content/drive')

## Import Library

In [None]:
!pip install ttach
!pip install timm

In [None]:
import random
import pandas as pd
import numpy as np
import os
import cv2

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import timm

from tqdm.auto import tqdm
from copy import deepcopy

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import ttach as tta

import torchvision.models as models

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore')

In [None]:
# device 할당
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
CFG = {
    'IMG_SIZE_H': 220,
    'IMG_SIZE_W': 275,
    'EPOCHS': 50,
    'LEARNING_RATE': 3e-4,
    'BATCH_SIZE': 64,
    'SEED': 41,
    'PATIENCE' : 3
}

In [None]:
# RandomSeed
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED'])

## Data Preprocessing

In [None]:
train = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Artist_classification/data/train.csv')
train.shape

In [None]:
def img_path_change(img_path):
  return '/content/drive/MyDrive/Colab Notebooks/Artist_classification/data' + str(img_path)[1:]

train['img_path'] = train['img_path'].apply(img_path_change)

In [None]:
# Label Encoding : artist들을 범주형 데이터로 변환
# 화가 이름 50명
le = preprocessing.LabelEncoder()
train['artist'] = le.fit_transform(train['artist'].values)

## Train / Validation Split

In [None]:
train_df, val_df, _, _ = train_test_split(train, train['artist'].values, test_size=0.2, random_state=CFG['SEED'])

In [None]:
train_df = train_df.sort_values(by=['id'])
train_df.head()

In [None]:
val_df = val_df.sort_values(by=['id'])
val_df.head()

## Data Load

In [None]:
# inference=True면 test 데이터라는 뜻.
# 따라서 target에 해당하는 artist를 return할 수 없음.
def get_data(df, infer=False):
  if infer:
    return df['img_path'].values
    
  return df['img_path'].values, df['artist'].values

In [None]:
# 파일 경로, 레이블
train_img_paths, train_labels = get_data(train_df)
val_img_paths, val_labels = get_data(val_df)

In [None]:
# 여기서 9등분하고 train_imgs_labels, val_imgs_labels 만들기
def split_image(paths,labels):
  img_list = []
  label_list = []

  for path, label in tqdm(zip(paths, labels)):
    image = cv2.imread(path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    height,width,c = image.shape

    half_h = height//2
    half_w = width//2
    queter_h = half_h//2
    queter_w = half_w//2

    pos_list = [
        [0,half_w, 0,half_h],
        [half_w,width, 0,half_h],
        [0,half_w, half_h,height],
        [half_w,width, half_h,height],
        [0,half_w, queter_h,height-queter_h],
        [half_w,width, queter_h,height-queter_h],
        [queter_w,width-queter_w, 0,half_h],
        [queter_w,width-queter_w, half_h,height],
        [queter_w,width-queter_w, queter_h,height-queter_h]
    ]

    for poses in pos_list:
      img_list.append(image[poses[2]:poses[3],poses[0]:poses[1]])
      label_list.append(label)

  return img_list,label_list

In [None]:
train_imgs, train_labels = split_image(train_img_paths, train_labels)

In [None]:
val_imgs, val_labels = split_image(val_img_paths, val_labels)

## CustomDataset

In [None]:
# torch.utils.data.Dataset이라는 class를 상속받는 자식 클래스
class CustomDataset(Dataset):

  # 데이터셋을 처음 선언할 때, 자동으로 호출.
  # 몇 가지 인수들을 입력받도록 만들 수 있다.
  def __init__(self, imgs, labels, transforms=None):
    self.imgs = imgs
    self.labels = labels
    self.transforms = transforms

  # 데이터셋에서 특정 1개의 샘플을 가져오기
  # index는 몇 번째 데이터를 가져올건지에 대한 변수.
  def __getitem__(self, index):
    image = self.imgs[index]

    # 아래 dataset 선언을 보면 transform이 사용됨.
    if self.transforms is not None:
      image = self.transforms(image=image)['image']

    if self.labels is not None:
      label = self.labels[index]
      return image, label
    else:
      return image

  # 데이터셋의 길이 (총 샘플의 수)
  # 데이터셋을 선언하고 dataloader를 사용할 때 내부적으로 사용
  ## 데이터셋의 len을 알아야 데이터로더가 미니 배치를 사용할 수 있기 때문
  def __len__(self):
    return len(self.imgs)

In [None]:
class TestDataset(Dataset):
  def __init__(self, img_paths, labels, transforms=None):
    self.img_paths = img_paths
    self.labels = labels
    self.transforms = transforms

  def __getitem__(self, index):
    img_path = self.img_paths[index]

    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    if self.transforms is not None:
      image = self.transforms(image=image)['image']

    if self.labels is not None:
      label = self.labels[index]
      return image, label
    else:
      return image

  def __len__(self):
    return len(self.img_paths)

In [None]:
# Albumentation Augmentation
train_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE_H'],CFG['IMG_SIZE_W']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            A.HorizontalFlip(p=0.5),
                            A.VerticalFlip(p=0.5),
                            ToTensorV2()
                            ])

test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE_H'],CFG['IMG_SIZE_W']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

In [None]:
# Data Loader
train_dataset = CustomDataset(train_imgs, train_labels, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = CustomDataset(val_imgs, val_labels, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

## Model Define

In [None]:
class Network_swin(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(Network_swin, self).__init__()
        self.backbone = models.swin_t(weights='IMAGENET1K_V1')
        self.classifier = nn.Linear(1000, num_classes)
        
    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

## Train

In [None]:
def train(model, optimizer, train_loader, test_loader, scheduler, device):
  # 모델을 device에 할당
  model.to(device)

  # early stopping
  es_count = 0

  # Loss 정의
  criterion = nn.CrossEntropyLoss().to(device)

  # Scheduler에서 사용할 변수 선언
  best_score = 0
  best_model = None

  for epoch in range(1, CFG['EPOCHS'] + 1):
    # model을 train 모드로 전환
    model.train()

    # loss값을 넣을 리스트 생성
    train_loss = []

    # Epoch 진행
    for img, label in tqdm(iter(train_loader)):
      img, label = img.float().to(device), label.to(device)

      # 과거에 이용한 mini batch 내 이미지, 레이블을 바탕으로 계산된 Loss의 Gradient값이 optimizer에 할당되어 있는 것을 방지.
      optimizer.zero_grad()

      # pred값 
      model_pred = model(img)

      # 선언한 Loss에 pred값과 정답을 넣기 
      loss = criterion(model_pred, label)

      # backpropagation
      loss.backward()

      # optimizer
      optimizer.step()

      # loss값 추가
      train_loss.append(loss.item())

    # 최종 loss값 생성
    tr_loss = np.mean(train_loss)

    val_loss, val_score = validation(model, criterion, test_loader, device)

    print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

    # Scheduler
    if scheduler is not None:
      scheduler.step()

    es_count += 1

    # val_score을 기준으로 best model 선정
    if best_score < val_score:
      best_model = model
      best_score = val_score
      es_count = 0

      # checkpoint
      best_acc_model = deepcopy(model.state_dict())
      print("model save!!")
      torch.save(model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/Artist_classification/model/best_swin.pt')
      
    if es_count > CFG['PATIENCE']:
      print('Early Stopping')
      break

  return best_model

In [None]:
# 이번 대회에서는 F1 score를 사용
def competition_metric(true, pred):
    return f1_score(true, pred, average="macro")

def validation(model, criterion, test_loader, device):

  # 모델을 평가용으로 전환 (dropout 등의 규제가 들어가지 않게 조절)
  model.eval()
  
  model_preds = []
  true_labels = []

  val_loss = []

  # 평가 단계에서 Gradient를 통해 파라미터 값이 업데이트되는 현상을 방지
  with torch.no_grad():
    for img, label in tqdm(iter(test_loader)):
      img, label = img.float().to(device), label.to(device)
      model_pred = model(img)
      loss = criterion(model_pred, label)
      val_loss.append(loss.item())

      model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()
      true_labels += label.detach().cpu().numpy().tolist()

  val_f1 = competition_metric(true_labels, model_preds)
  return np.mean(val_loss), val_f1

In [None]:
# Run1

model_swin = Network_swin()
model_swin.eval()
optimizer = torch.optim.Adam(params = model_swin.parameters(), lr = CFG["LEARNING_RATE"])

# scheduler
lambda1 = lambda epoch: 0.65 ** epoch
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda1)

infer_model_swin = train(model_swin, optimizer, train_loader, val_loader, scheduler, device)

## Inference

In [None]:
# 모델 로딩이 필요하면 쓰세요
#infer_model_swin = Network_swin()
#infer_model_swin.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/Artist_classification/model/swin.pt', map_location=device))

In [None]:
# TTA (test time augmentation)
tta_transforms = tta.Compose(
    [
        tta.HorizontalFlip(),
        tta.VerticalFlip(), 
    ]
)

tta_model = tta.ClassificationTTAWrapper(infer_model_swin, tta_transforms)

In [None]:
test = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Artist_classification/data/test.csv')
test.head()

In [None]:
test['img_path'] = test['img_path'].apply(lambda x : '/content/drive/MyDrive/Colab Notebooks/Artist_classification/data' + x[1:] )

In [None]:
# Test에는 artist 정보가 없으니 infer=True
test_img_paths = get_data(test, infer=True)

In [None]:
test_dataset = TestDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=2)

In [None]:
def inference(model, test_loader, device):
    model.to(device)
    model.eval()
    
    model_preds = []
    
    with torch.no_grad():
        for img,himg in tqdm(iter(test_loader)):
            img, himg = img.float().to(device), himg.float().to(device)
            
            model_pred = model(img,himg)
            model_preds += model_pred.detach().cpu().numpy().tolist()
    
    print('Done.')
    return model_preds

In [None]:
# eff inference
preds_swin = inference(tta_model, test_loader, device)

In [None]:
np.save('/content/drive/MyDrive/Colab Notebooks/Artist_classification/pred/pred_swin', preds_swin) 