In [1]:
# Mount Google Drive at /content/drive (Colab only); every data and checkpoint
# path used later in this notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Import Library

In [2]:
!pip install ttach
!pip install timm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ttach
  Downloading ttach-0.0.3-py3-none-any.whl (9.8 kB)
Installing collected packages: ttach
Successfully installed ttach-0.0.3
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting timm
  Downloading timm-0.6.11-py3-none-any.whl (548 kB)
[K     |████████████████████████████████| 548 kB 13.3 MB/s 
Collecting huggingface-hub
  Downloading huggingface_hub-0.10.1-py3-none-any.whl (163 kB)
[K     |████████████████████████████████| 163 kB 64.9 MB/s 
Installing collected packages: huggingface-hub, timm
Successfully installed huggingface-hub-0.10.1 timm-0.6.11


In [3]:
import random
import pandas as pd
import numpy as np
import os
import cv2

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import timm

from tqdm.auto import tqdm
from copy import deepcopy

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import ttach as tta

import torchvision.models as models

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore')

In [4]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [5]:
# Global settings shared by the rest of the notebook.
CFG = dict(
    IMG_SIZE=224,        # not referenced by the cells shown here (IMG_SIZE_H / IMG_SIZE_W are used instead)
    IMG_SIZE_H=220,      # target height passed to cv2.resize
    IMG_SIZE_W=275,      # target width passed to cv2.resize
    EPOCHS=100,          # upper bound on training epochs
    LEARNING_RATE=3e-4,  # initial Adam learning rate
    BATCH_SIZE=32,       # batch size of every DataLoader
    SEED=41,             # seed for RNGs and the train/validation split
    PATIENCE=5,          # early-stopping patience, in epochs
)

In [6]:
# Random-seed setup
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and configures cuDNN for repeatable runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick different kernels from run to run, which
    # defeats the deterministic flag above, so it has to be switched off.
    torch.backends.cudnn.benchmark = False

# Apply the configured seed once, before the data split and model creation below.
seed_everything(CFG['SEED'])

## Data Preprocessing

In [7]:
# Load the training metadata CSV from Drive and display its (rows, columns) shape.
train = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Artist_classification/data/train.csv')
train.shape

(5911, 3)

In [8]:
# Manual corrections to the metadata: the row labelled 3986 is written in full,
# and only the artist of the row labelled 3896 is overwritten.
# NOTE(review): presumably these repair a broken/mislabelled entry in train.csv —
# confirm against the data source; the reason is not recorded in this notebook.
train.loc[3986] = [3986,'./train/3986.jpg','Alfred Sisley']
train.loc[3896,'artist'] = 'Titian'

In [9]:
def img_path_change(img_path):
  """Turn a relative image path from the CSV into an absolute Drive path.

  The first character of ``str(img_path)`` (the leading '.') is dropped and the
  remainder is appended to the project folder on Drive.
  """
  project_root = '/content/drive/MyDrive/Colab Notebooks/Artist_classification'
  tail = str(img_path)[1:]
  return project_root + tail

# Rewrite every path in place. Not idempotent: re-running this cell would
# prefix the already-rewritten paths again.
train['img_path'] = train['img_path'].apply(img_path_change)

In [10]:
# Sanity check: show the first rewritten path.
train['img_path'][0]

'/content/drive/MyDrive/Colab Notebooks/Artist_classification/train/0000.jpg'

In [11]:
# Label encoding: convert the artist names into integer class ids.
# There are 50 artist names.
# `le` is kept because the model later sizes its output layer from le.classes_.
le = preprocessing.LabelEncoder()
train['artist'] = le.fit_transform(train['artist'].values)

## Train / Validation Split

In [12]:
# Stratified 80/20 train/validation split on the encoded artist label.
# Only the DataFrame is split; the labels travel inside it as the 'artist' column.
train_df, val_df = train_test_split(
    train,
    test_size=0.2,
    random_state=CFG['SEED'],
    stratify=train['artist'].values,
)

In [None]:
# Put the training rows back into ascending id order and preview them.
train_df = train_df.sort_values('id')
train_df.head()

Unnamed: 0,id,img_path,artist
0,0,/content/drive/MyDrive/Colab Notebooks/Artist_...,9
2,2,/content/drive/MyDrive/Colab Notebooks/Artist_...,7
3,3,/content/drive/MyDrive/Colab Notebooks/Artist_...,10
5,5,/content/drive/MyDrive/Colab Notebooks/Artist_...,38
6,6,/content/drive/MyDrive/Colab Notebooks/Artist_...,43


In [None]:
# Put the validation rows back into ascending id order and preview them.
val_df = val_df.sort_values('id')
val_df.head()

Unnamed: 0,id,img_path,artist
1,1,/content/drive/MyDrive/Colab Notebooks/Artist_...,48
4,4,/content/drive/MyDrive/Colab Notebooks/Artist_...,24
17,17,/content/drive/MyDrive/Colab Notebooks/Artist_...,10
21,21,/content/drive/MyDrive/Colab Notebooks/Artist_...,29
29,29,/content/drive/MyDrive/Colab Notebooks/Artist_...,28


## Data Load

In [13]:
# infer=True means the frame holds test data,
# so there is no 'artist' target column to return.
def get_data(df, infer=False):
  """Extract the image paths (and, unless ``infer``, the labels) from ``df``.

  Args:
    df: DataFrame with an 'img_path' column, plus 'artist' when ``infer`` is False.
    infer: When True only the paths are returned.

  Returns:
    ``df['img_path'].values`` when ``infer`` is True, otherwise the tuple
    ``(df['img_path'].values, df['artist'].values)``.
  """
  paths = df['img_path'].values
  if not infer:
    return paths, df['artist'].values
  return paths

In [14]:
# File paths and labels for the training and validation splits.
train_img_paths, train_labels = get_data(train_df)
val_img_paths, val_labels = get_data(val_df)

In [15]:
# Cut every image into 9 crops here; the results feed the train / validation datasets.
def split_image(paths,labels):
  """Load each image and expand it into 9 resized crops plus their mirrored copies.

  The crops are the four quadrants, four half-size windows centred on each
  edge, and one half-size window in the centre. Every crop is resized to
  (CFG['IMG_SIZE_W'], CFG['IMG_SIZE_H']) and also stored flipped with
  ``cv2.flip(crop, 1)``.

  Args:
    paths: Iterable of image file paths (must support ``len``).
    labels: Iterable of labels, aligned with ``paths``.

  Returns:
    ``(img_list, himg_list, label_list)``: three lists of equal length, with
    9 entries per input image; the label is repeated for each crop.

  Raises:
    FileNotFoundError: If ``cv2.imread`` cannot read one of the paths.

  Note:
    All crops and their flipped copies are kept in memory at once.
  """
  img_list = []
  himg_list = []
  label_list = []

  for path, label in tqdm(zip(paths, labels),total=len(paths)):
    image = cv2.imread(path)
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than with an opaque cvtColor error.
    if image is None:
      raise FileNotFoundError(f'cv2.imread could not read image: {path}')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    height, width = image.shape[:2]
    half_h, half_w = height//2, width//2
    quarter_h, quarter_w = half_h//2, half_w//2

    # Each box is (x_start, x_end, y_start, y_end).
    crop_boxes = [
        (0, half_w, 0, half_h),                                        # top-left quadrant
        (half_w, width, 0, half_h),                                    # top-right quadrant
        (0, half_w, half_h, height),                                   # bottom-left quadrant
        (half_w, width, half_h, height),                               # bottom-right quadrant
        (0, half_w, quarter_h, height-quarter_h),                      # left edge, vertically centred
        (half_w, width, quarter_h, height-quarter_h),                  # right edge, vertically centred
        (quarter_w, width-quarter_w, 0, half_h),                       # top edge, horizontally centred
        (quarter_w, width-quarter_w, half_h, height),                  # bottom edge, horizontally centred
        (quarter_w, width-quarter_w, quarter_h, height-quarter_h),     # centre
    ]

    for x0, x1, y0, y1 in crop_boxes:
      crop = cv2.resize(image[y0:y1, x0:x1], dsize=(CFG['IMG_SIZE_W'], CFG['IMG_SIZE_H']), interpolation=cv2.INTER_LINEAR)
      img_list.append(crop)
      himg_list.append(cv2.flip(crop, 1))
      label_list.append(label)

  return img_list, himg_list, label_list

In [None]:
# Build the in-memory crops for the training split.
# The labels are read from train_df rather than from `train_labels`: this cell
# rebinds `train_labels` to the 9x-expanded list, so passing the variable back in
# would pair paths with already-expanded labels if the cell were run a second time.
train_imgs, train_himgs, train_labels = split_image(train_img_paths, train_df['artist'].values)

  0%|          | 0/4728 [00:00<?, ?it/s]

In [None]:
# Build the in-memory crops for the validation split.
# The labels are read from val_df rather than from `val_labels`: this cell
# rebinds `val_labels` to the 9x-expanded list, so passing the variable back in
# would pair paths with already-expanded labels if the cell were run a second time.
val_imgs, val_himgs, val_labels = split_image(val_img_paths, val_df['artist'].values)

## CustomDataset

In [None]:
# Child class of torch.utils.data.Dataset that serves samples already held in memory.
class CustomDataset(Dataset):
  """Indexable dataset of (image, flipped image[, label]) samples.

  Args:
    imgs: Sequence of images.
    himgs: Sequence of flipped images, aligned with ``imgs``.
    labels: Sequence of labels aligned with ``imgs``, or None for unlabeled data.
    transforms: Optional callable invoked as ``transforms(image=arr)['image']``.
  """

  def __init__(self, imgs, himgs, labels, transforms=None):
    # Called once when the dataset is constructed; just stores the inputs.
    self.imgs, self.himgs = imgs, himgs
    self.labels = labels
    self.transforms = transforms

  def __getitem__(self, index):
    """Return sample ``index`` as (image, himg, label), or (image, himg) when unlabeled."""
    pair = [self.imgs[index], self.himgs[index]]

    # The transform is invoked once per image, so any random augmentation in it
    # is drawn separately for the image and for its flipped copy.
    if self.transforms is not None:
      pair = [self.transforms(image=arr)['image'] for arr in pair]
    image, himg = pair

    if self.labels is None:
      return image, himg
    return image, himg, self.labels[index]

  def __len__(self):
    """Number of samples; the DataLoader needs this to build mini-batches."""
    return len(self.imgs)

In [None]:
class TestDataset(Dataset):
  """Dataset that reads images from disk on demand and pairs each with its mirrored copy.

  Args:
    img_paths: Sequence of image file paths.
    labels: Sequence of labels aligned with ``img_paths``, or None for unlabeled data.
    transforms: Optional callable invoked as ``transforms(image=arr)['image']``.
  """

  def __init__(self, img_paths, labels, transforms=None):
    self.img_paths = img_paths
    self.labels = labels
    self.transforms = transforms

  def __getitem__(self, index):
    """Return (image, himage, label), or (image, himage) when ``labels`` is None.

    The image is read with OpenCV, converted BGR -> RGB and resized to
    (CFG['IMG_SIZE_W'], CFG['IMG_SIZE_H']); ``himage`` is ``cv2.flip(image, 1)``.

    Raises:
      FileNotFoundError: If ``cv2.imread`` cannot read the file.
    """
    img_path = self.img_paths[index]

    image = cv2.imread(img_path)
    # cv2.imread returns None on failure instead of raising; report the path
    # so a bad file can be identified even from inside a DataLoader worker.
    if image is None:
      raise FileNotFoundError(f'cv2.imread could not read image: {img_path}')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, dsize=(CFG['IMG_SIZE_W'], CFG['IMG_SIZE_H']), interpolation=cv2.INTER_LINEAR)

    himage = cv2.flip(image, 1)

    if self.transforms is not None:
      image = self.transforms(image=image)['image']
      himage = self.transforms(image=himage)['image']

    if self.labels is not None:
      return image, himage, self.labels[index]
    return image, himage

  def __len__(self):
    """Number of image paths in the dataset."""
    return len(self.img_paths)

In [None]:
# Albumentations pipelines.
# No resize step is needed here: the images are already resized with cv2 before
# they reach these transforms.
# Channel statistics commonly used with ImageNet-pretrained torchvision models.
NORM_MEAN = (0.485, 0.456, 0.406)
NORM_STD = (0.229, 0.224, 0.225)

# Training: normalise, random vertical flip, then convert to a tensor.
train_transform = A.Compose([
    A.Normalize(mean=NORM_MEAN, std=NORM_STD, max_pixel_value=255.0, always_apply=False, p=1.0),
    A.VerticalFlip(p=0.5),
    ToTensorV2(),
])

# Validation / test: normalise and convert to a tensor, no random augmentation.
test_transform = A.Compose([
    A.Normalize(mean=NORM_MEAN, std=NORM_STD, max_pixel_value=255.0, always_apply=False, p=1.0),
    ToTensorV2(),
])


In [None]:
# Data loaders: the training set is shuffled and uses the augmenting transform,
# the validation set keeps its order and uses the plain test transform.
train_dataset = CustomDataset(train_imgs, train_himgs, train_labels, transforms=train_transform)
train_loader = DataLoader(
    train_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=True,
    num_workers=2,
)

val_dataset = CustomDataset(val_imgs, val_himgs, val_labels, transforms=test_transform)
val_loader = DataLoader(
    val_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=2,
)

## Model Define

In [None]:
class Network_eff(nn.Module):
    """Two-branch classifier: one EfficientNet-B0 per input, joined by a linear head.

    Args:
        num_classes: Size of the output layer. The default is computed from
            ``le.classes_`` once, when this class definition is executed.
    """

    def __init__(self, num_classes=len(le.classes_)):
        super().__init__()
        # Two independent pretrained backbones; other variants (b0 ~ b7) exist.
        self.backbone = models.efficientnet_b0(pretrained=True)
        self.backbone2 = models.efficientnet_b0(pretrained=True)
        # assumes each backbone emits 1000 values per sample (2 x 1000 = 2000) — TODO confirm
        self.classifier = nn.Linear(2000, num_classes)

    def forward(self, x1, x2):
        """Run ``x1`` and ``x2`` through their backbones and classify the concatenation."""
        branch_outputs = [self.backbone(x1), self.backbone2(x2)]
        joined = torch.cat(branch_outputs, dim=1)
        return self.classifier(joined)



## Train

In [None]:
def train(model, optimizer, train_loader, test_loader, scheduler, device):
  """Train ``model`` with early stopping and return the best-scoring snapshot.

  Each epoch runs one pass over ``train_loader``, evaluates on ``test_loader``
  with ``validation`` and steps ``scheduler`` (if given). Whenever the
  validation F1 improves, a copy of the model is kept and the whole model is
  saved to Drive. Training stops once more than ``CFG['PATIENCE']`` consecutive
  epochs pass without improvement, or after ``CFG['EPOCHS']`` epochs.

  Note: defining this function rebinds the name ``train``, which earlier in the
  notebook referred to the training DataFrame.

  Args:
    model: Module called as ``model(img, himg)``.
    optimizer: Optimizer over ``model``'s parameters.
    train_loader: Iterable of ``(img, himg, label)`` training batches.
    test_loader: Iterable of ``(img, himg, label)`` validation batches.
    scheduler: Learning-rate scheduler stepped once per epoch, or None.
    device: Device the model and batches are moved to.

  Returns:
    A copy of the model as it was at the best validation F1, or None if no
    epoch scored above 0.
  """
  # Move the model to the target device.
  model.to(device)

  # Loss function.
  criterion = nn.CrossEntropyLoss().to(device)

  # Best validation score so far and the model snapshot that achieved it.
  best_score = 0
  best_model = None

  # Epochs elapsed since the last improvement (early stopping).
  es_count = 0

  for epoch in range(1, CFG['EPOCHS'] + 1):
    # Switch to training mode (validation() leaves the model in eval mode).
    model.train()

    # Per-batch losses of this epoch.
    train_loss = []

    for img, himg, label in tqdm(iter(train_loader)):
      img, himg, label = img.float().to(device), himg.float().to(device), label.to(device)

      # Clear gradients left over from the previous mini-batch.
      optimizer.zero_grad()

      model_pred = model(img, himg)
      loss = criterion(model_pred, label)

      # Backpropagate and update the weights.
      loss.backward()
      optimizer.step()

      train_loss.append(loss.item())

    # Mean training loss of the epoch.
    tr_loss = np.mean(train_loss)

    val_loss, val_score = validation(model, criterion, test_loader, device)

    print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

    if scheduler is not None:
      scheduler.step()

    es_count += 1

    # Select the best model by validation score.
    if best_score < val_score:
      # Snapshot the weights: keeping a plain reference would make the returned
      # "best" model silently follow every later (possibly worse) epoch.
      best_model = deepcopy(model)
      best_score = val_score
      es_count = 0

      # Checkpoint the whole model object to Drive.
      print("model save!!")
      torch.save(best_model, '/content/drive/MyDrive/Colab Notebooks/Artist_classification/multimodal_model.pt')

    if es_count > CFG['PATIENCE']:
      print('Early Stopping')
      break

  return best_model

In [None]:
# This competition is scored with the F1 score.
def competition_metric(true, pred):
    """Return the macro-averaged F1 score of ``pred`` against ``true``."""
    macro_f1 = f1_score(true, pred, average="macro")
    return macro_f1

def validation(model, criterion, test_loader, device):
  """Evaluate ``model`` on ``test_loader``.

  Args:
    model: Module called as ``model(img, himg)``.
    criterion: Loss called as ``criterion(prediction, label)``.
    test_loader: Iterable of ``(img, himg, label)`` batches.
    device: Device the batches are moved to.

  Returns:
    ``(mean batch loss, macro F1 score)`` over the whole loader.
  """
  # Evaluation mode, so layers such as dropout behave deterministically.
  model.eval()

  batch_losses = []
  predicted = []
  expected = []

  # No gradients are needed while evaluating.
  with torch.no_grad():
    for img, himg, label in tqdm(iter(test_loader)):
      img = img.float().to(device)
      himg = himg.float().to(device)
      label = label.to(device)

      outputs = model(img, himg)
      batch_losses.append(criterion(outputs, label).item())

      # The predicted class is the index of the largest output per sample.
      predicted.extend(outputs.argmax(1).detach().cpu().numpy().tolist())
      expected.extend(label.detach().cpu().numpy().tolist())

  val_f1 = competition_metric(expected, predicted)
  return np.mean(batch_losses), val_f1

In [None]:
# Run 1: build the model, optimizer and scheduler, then train.

model_eff = Network_eff()
# This eval() call has no lasting effect: train() switches the model to
# training mode at the start of every epoch.
model_eff.eval()
optimizer = torch.optim.Adam(params = model_eff.parameters(), lr = CFG["LEARNING_RATE"])

# Scheduler: the learning rate is the initial rate multiplied by 0.65 ** epoch.
lambda1 = lambda epoch: 0.65 ** epoch
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda1)

infer_model_eff = train(model_eff, optimizer, train_loader, val_loader, scheduler, device)
# Alternative run without a scheduler:
#infer_model_eff = train(model_eff, optimizer, train_loader, val_loader, None, device)

## Inference

In [None]:
# Reload the checkpoint written during training. map_location lets a checkpoint
# saved on a GPU runtime be loaded on whichever device is active now.
# NOTE(security): torch.load unpickles the file and so can execute arbitrary
# code; only load checkpoints you created yourself.
infer_model_eff = torch.load(
    '/content/drive/MyDrive/Colab Notebooks/Artist_classification/multimodal_model.pt',
    map_location=device,
)

In [None]:
# TTA (test time augmentation): wrap the model with horizontal- and vertical-flip
# augmentations from ttach.
# NOTE(review): the model takes two inputs (img, himg). The ttach wrapper appears
# to augment only its first positional input and pass any further ones through
# unchanged, so himg would not be flipped together with img — confirm against
# the ttach source before relying on these TTA predictions.

tta_transforms = tta.Compose(
    [
        tta.HorizontalFlip(),
        tta.VerticalFlip(), 
    ]
)

tta_model = tta.ClassificationTTAWrapper(infer_model_eff, tta_transforms)


In [None]:
# Load the test metadata CSV from Drive and preview the first rows.
test = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Artist_classification/data/test.csv')
test.head()

Unnamed: 0,id,img_path
0,TEST_00000,./test/TEST_00000.jpg
1,TEST_00001,./test/TEST_00001.jpg
2,TEST_00002,./test/TEST_00002.jpg
3,TEST_00003,./test/TEST_00003.jpg
4,TEST_00004,./test/TEST_00004.jpg


In [None]:
# Turn the relative test paths into absolute Drive paths by dropping the leading
# character and prefixing the data folder. Not idempotent: re-running this cell
# would prefix the paths again.
# NOTE(review): this prefix ends in '/data', whereas img_path_change (used for
# the training paths) does not — confirm that both match the folder layout on Drive.
test['img_path'] = test['img_path'].apply(lambda x : '/content/drive/MyDrive/Colab Notebooks/Artist_classification/data' + x[1:] )

In [None]:
# Display the (rows, columns) shape of the test metadata.
test.shape

(12670, 2)

In [None]:
# The test set has no artist column, so request the paths only (infer=True).
test_img_paths = get_data(test, infer=True)

In [None]:
# Unlabeled test dataset (labels=None) read from disk on demand; shuffle=False
# keeps the predictions in the same order as the rows of `test`.
test_dataset = TestDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=2)

In [None]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect its raw outputs.

    Args:
        model: Module called as ``model(img, himg)``.
        test_loader: Iterable of ``(img, himg)`` batches.
        device: Device the model and batches are moved to.

    Returns:
        A list with one entry per sample, each the model's output row converted
        to a plain Python list (no argmax is applied here).
    """
    model.to(device)
    model.eval()

    collected = []

    with torch.no_grad():
        for img, himg in tqdm(iter(test_loader)):
            img = img.float().to(device)
            himg = himg.float().to(device)

            batch_output = model(img, himg)
            collected.extend(batch_output.detach().cpu().numpy().tolist())

    print('Done.')
    return collected

In [None]:
# Run inference on the test set with the TTA-wrapped EfficientNet model.
preds_eff = inference(tta_model, test_loader, device)

  0%|          | 0/396 [00:00<?, ?it/s]

In [None]:
# Persist the raw per-sample model outputs to Drive; np.save appends the
# '.npy' extension because the given file name has none.
np.save('/content/drive/MyDrive/Colab Notebooks/Artist_classification/pred_multimodal_h', preds_eff) 