# 월간 데이콘 예술 작품 화가 분류 AI 경진대회  
노트북 제목 : [Baseline] 사전학습 모델 Efficientnet B0를 활용한 이미지 분류
링크 : https://dacon.io/competitions/official/236006/codeshare/6675?page=1&dtype=recent

In [1]:
# !unzip -qq /content/drive/MyDrive/open.zip

# import 

In [2]:
import random
import pandas as pd
import numpy as np
import os
import cv2

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from tqdm.auto import tqdm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torchvision.models as models

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action = "ignore")

In [3]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
device

device(type='cuda')

# 하이퍼파라미터

In [5]:
# Hyperparameters read by the data pipeline, the model and the training loop.
CFG = {
    # The best input resolution differs from model to model.
    "IMG_SIZE": 224,
    "EPOCHS": 10,
    "LEARNING_RATE": 3e-4,
    # The batch size has to be tuned as well.
    "BATCH_SIZE": 64,
    "SEED": 41,
}

# 랜덤시드 고정

In [6]:
def seed_everything(seed):
    """Seed every random number generator used here so runs are repeatable.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and every CUDA device),
    exports ``PYTHONHASHSEED`` and configures cuDNN for deterministic
    behaviour.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # NOTE: hash randomization is fixed at interpreter start-up, so this only
    # affects Python processes launched after this point.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all GPUs, not just the current one; this is a no-op without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different convolution
    # algorithms from run to run, which undoes the determinism requested above.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED'])

# 데이터 전처리

In [7]:
# Load the training metadata (one row per image: id, image path, artist name)
# and preview the first rows.
df = pd.read_csv('./train.csv')
df.head()

Unnamed: 0,id,img_path,artist
0,0,./train/0000.jpg,Diego Velazquez
1,1,./train/0001.jpg,Vincent van Gogh
2,2,./train/0002.jpg,Claude Monet
3,3,./train/0003.jpg,Edgar Degas
4,4,./train/0004.jpg,Hieronymus Bosch


In [8]:
# Label encoding: replace each artist name with an integer class id.
le = preprocessing.LabelEncoder()
# Apply the encoder to the artist column only; the other columns are left as they are.
df['artist'] = le.fit_transform(df['artist'].values)

# Split into train / validation sets

In [9]:
# Hold out 20% of the rows for validation; the seed makes the split repeatable.
# Labels travel inside the dataframe, so only the dataframe itself is split.
train_df, val_df = train_test_split(df, test_size=0.2, random_state=CFG['SEED'])

In [10]:
# The random split shuffles the rows; put the training rows back in id order
# and preview them.
train_df = train_df.sort_values(by = ['id'])
train_df.head()

Unnamed: 0,id,img_path,artist
0,0,./train/0000.jpg,9
2,2,./train/0002.jpg,7
3,3,./train/0003.jpg,10
5,5,./train/0005.jpg,38
6,6,./train/0006.jpg,43


In [11]:
# Same for the validation rows: restore id order, then preview.
val_df = val_df.sort_values(by = ['id'])
val_df.head()

Unnamed: 0,id,img_path,artist
1,1,./train/0001.jpg,48
4,4,./train/0004.jpg,24
17,17,./train/0017.jpg,10
21,21,./train/0021.jpg,29
29,29,./train/0029.jpg,28


# 데이터 불러오기

In [12]:
def get_data(df, infer = False):
    """Extract image paths (and labels, unless inferring) from a dataframe.

    Args:
        df: DataFrame with an ``img_path`` column and, when ``infer`` is
            False, an ``artist`` column.
        infer: When True only the image paths are returned, because no
            labels are needed at inference time.

    Returns:
        ``(img_paths, labels)`` as two arrays for training, or just
        ``img_paths`` when ``infer`` is True.
    """
    image_paths = df['img_path'].values
    if not infer:
        return image_paths, df['artist'].values
    return image_paths

In [13]:
# Unpack image paths and encoded artist labels for the train and validation splits.
train_img_paths, train_labels = get_data(train_df)
val_img_paths, val_labels = get_data(val_df)

# Custom Dataset

In [14]:
class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV and optionally labels them.

    Args:
        img_paths: Indexable collection of image file paths.
        labels: Indexable collection of labels aligned with ``img_paths``,
            or None when no labels exist (inference).
        transforms: Optional callable invoked as ``transforms(image=...)``
            that returns a mapping with the result under ``'image'``.
    """

    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)``, or just ``image`` when labels are None.

        Raises:
            FileNotFoundError: If the image at ``img_paths[index]`` cannot be
                read.
        """
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        # cv2.imread does not raise on a missing or unreadable file; it returns
        # None, which would otherwise fail inside cvtColor with an opaque error.
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {img_path}")
        # cv2.imread yields BGR channel order; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.labels is None:
            return image
        return image, self.labels[index]

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.img_paths)

In [15]:
# albumentations is used because it preprocesses faster than other libraries.
# Both pipelines are currently identical: resize to a square, normalize with
# the widely used ImageNet mean/std, then convert to a torch tensor.
train_transform = A.Compose(
    [
        A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
        A.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
            max_pixel_value=255.0,
            always_apply=False,
            p=1.0,
        ),
        ToTensorV2(),
    ]
)

test_transform = A.Compose(
    [
        A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
        A.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
            max_pixel_value=255.0,
            always_apply=False,
            p=1.0,
        ),
        ToTensorV2(),
    ]
)

In [16]:
# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_dataset = CustomDataset(train_img_paths, train_labels, train_transform)
train_loader = DataLoader(
    train_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=True,
    num_workers=0,
)

val_dataset = CustomDataset(val_img_paths, val_labels, test_transform)
val_loader = DataLoader(
    val_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=0,
)

In [30]:
# Sanity check 1 for the `image = self.transforms(image=image)['image']` line in CustomDataset.
pathh = train_img_paths[0]
pathh = cv2.imread(pathh)
# The image comes back as a 3-dimensional array (see the output below).
pathh = cv2.cvtColor(pathh, cv2.COLOR_BGR2RGB)

array([[[42, 34, 31],
        [46, 38, 35],
        [55, 47, 44],
        ...,
        [98, 76, 53],
        [96, 74, 51],
        [93, 71, 48]],

       [[68, 60, 57],
        [55, 47, 44],
        [48, 40, 37],
        ...,
        [98, 76, 53],
        [97, 75, 52],
        [95, 73, 50]],

       [[82, 74, 71],
        [62, 54, 51],
        [49, 39, 37],
        ...,
        [95, 73, 50],
        [96, 74, 51],
        [97, 75, 52]],

       ...,

       [[38, 33, 29],
        [40, 35, 31],
        [43, 38, 34],
        ...,
        [48, 34, 31],
        [47, 36, 32],
        [46, 35, 31]],

       [[41, 36, 32],
        [44, 39, 35],
        [49, 44, 40],
        ...,
        [48, 34, 31],
        [47, 36, 32],
        [47, 36, 32]],

       [[44, 39, 35],
        [47, 42, 38],
        [51, 46, 42],
        ...,
        [49, 35, 32],
        [47, 36, 32],
        [47, 36, 32]]], dtype=uint8)

In [37]:
# Sanity check 2 for the `image = self.transforms(image=image)['image']` line in CustomDataset.
# The transform returns a dict; the ["image"] key holds the augmented image.
# A ["mask"] key can be read as well, in which case it gives the augmented mask.
imagee = train_transform(image= pathh)["image"]
print(imagee.shape)
print("-"*10)
print(imagee)

torch.Size([3, 224, 224])
----------
tensor([[[-1.2274, -1.4843, -1.1075,  ..., -0.4226, -0.5938, -0.5253],
         [-1.0904, -1.1932, -1.2445,  ..., -0.4054, -0.4568, -0.5253],
         [-1.0562, -0.9877, -0.9534,  ..., -0.5082, -0.5424, -0.4568],
         ...,
         [-1.4843, -1.5185, -1.5870,  ..., -1.1760, -1.3815, -1.0904],
         [-1.4843, -1.5528, -1.5357,  ..., -1.2959, -1.2788, -1.2103],
         [-1.4158, -1.2445, -1.5014,  ..., -1.0219, -1.3130, -1.2959]],

        [[-1.2829, -1.5805, -1.2479,  ..., -0.6702, -0.8627, -0.7752],
         [-1.1779, -1.3354, -1.4055,  ..., -0.6527, -0.7227, -0.7927],
         [-1.1779, -1.1429, -1.1604,  ..., -0.7577, -0.8102, -0.7402],
         ...,
         [-1.5280, -1.5980, -1.6506,  ..., -1.4055, -1.5980, -1.3004],
         [-1.5455, -1.6155, -1.5980,  ..., -1.5105, -1.5105, -1.4055],
         [-1.4055, -1.2304, -1.5105,  ..., -1.2829, -1.5805, -1.4405]],

        [[-1.0898, -1.3861, -1.0376,  ..., -0.8981, -1.0550, -0.9504],
        

# 모델 정의

In [17]:
class BaseModel(nn.Module):
    """Pretrained EfficientNet-B0 followed by a linear layer over the artist classes.

    Args:
        num_classes: Number of output classes. The default is the number of
            classes known to the label encoder ``le`` when the class is defined.
    """

    def __init__(self, num_classes = len(le.classes_)):
        super().__init__()
        self.backbone = models.efficientnet_b0(pretrained=True)
        # The backbone output is fed as 1000 features into the final layer.
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.classifier(self.backbone(x))

# 학습

In [18]:
def train(model, optimizer, train_loader, test_loader, scheduler, device):
    """Train ``model`` and return it with the best-validation weights loaded.

    Runs ``CFG["EPOCHS"]`` epochs of cross-entropy training, validating after
    each epoch and snapshotting the weights whenever the validation F1 score
    improves. After the last epoch the best snapshot is restored.

    Args:
        model: Network to train; it is moved to ``device`` and updated in place.
        optimizer: Optimizer built over ``model``'s parameters.
        train_loader: Iterable of ``(img, label)`` training batches.
        test_loader: Iterable of ``(img, label)`` validation batches.
        scheduler: Optional learning-rate scheduler stepped once per epoch,
            or None.
        device: Device on which to run training and validation.

    Returns:
        ``model`` with the weights of the best epoch loaded, or None when no
        epoch reached a validation score above 0.
    """
    # Local import keeps this function self-contained.
    import copy

    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_state = None

    for epoch in range(1, CFG["EPOCHS"] + 1):
        model.train()
        train_loss = []
        for img, label in tqdm(iter(train_loader)):
            img, label = img.float().to(device), label.to(device)

            optimizer.zero_grad()
            model_pred = model(img)
            loss = criterion(model_pred, label)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())

        tr_loss = np.mean(train_loss)

        val_loss, val_score = validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step()

        if best_score < val_score:
            best_score = val_score
            # Keeping a reference to `model` would not freeze anything: later
            # epochs keep mutating the same object. Copy the weights instead.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is None:
        return None

    model.load_state_dict(best_state)
    return model

In [19]:
def competition_metric(true, pred):
    """Return the macro-averaged F1 score of ``pred`` against ``true``."""
    macro_f1 = f1_score(true, pred, average="macro")
    return macro_f1

def validation(model, criterion, test_loader, device):
    """Evaluate ``model`` on a labelled loader.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        criterion: Loss function called as ``criterion(predictions, labels)``.
        test_loader: Iterable of ``(img, label)`` batches.
        device: Device on which to run the forward passes.

    Returns:
        ``(mean of the per-batch losses, macro F1 over all samples)``.
    """
    model.eval()

    batch_losses = []
    predicted = []
    targets = []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for img, label in tqdm(iter(test_loader)):
            img = img.float().to(device)
            label = label.to(device)

            logits = model(img)
            batch_losses.append(criterion(logits, label).item())

            # The highest logit is the predicted class id.
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            targets.extend(label.detach().cpu().numpy().tolist())

    val_f1 = competition_metric(targets, predicted)
    return np.mean(batch_losses), val_f1

# 시작

In [20]:
# Build the model and its optimizer, then train. No model.eval() call is made
# here: train() switches the model to training mode at the start of every
# epoch, so setting eval mode beforehand had no effect.
model = BaseModel()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = None  # no learning-rate schedule is used

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [1], Train Loss : [2.29841] Val Loss : [1.47752] Val F1 Score : [0.50580]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [2], Train Loss : [0.78184] Val Loss : [1.17181] Val F1 Score : [0.61499]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [3], Train Loss : [0.29980] Val Loss : [1.11390] Val F1 Score : [0.62354]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [4], Train Loss : [0.13787] Val Loss : [1.13421] Val F1 Score : [0.65414]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [5], Train Loss : [0.09095] Val Loss : [1.16459] Val F1 Score : [0.64698]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [6], Train Loss : [0.07354] Val Loss : [1.16702] Val F1 Score : [0.65977]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [7], Train Loss : [0.05357] Val Loss : [1.18831] Val F1 Score : [0.67239]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [8], Train Loss : [0.04812] Val Loss : [1.28416] Val F1 Score : [0.66217]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [9], Train Loss : [0.04633] Val Loss : [1.34674] Val F1 Score : [0.65388]


  0%|          | 0/74 [00:00<?, ?it/s]

  0%|          | 0/19 [00:00<?, ?it/s]

Epoch [10], Train Loss : [0.04538] Val Loss : [1.35756] Val F1 Score : [0.64960]


# 추론

In [21]:
# Load the test metadata (id and image path only, no artist column) and preview it.
test_df = pd.read_csv("./test.csv")
test_df.head()

Unnamed: 0,id,img_path
0,TEST_00000,./test/TEST_00000.jpg
1,TEST_00001,./test/TEST_00001.jpg
2,TEST_00002,./test/TEST_00002.jpg
3,TEST_00003,./test/TEST_00003.jpg
4,TEST_00004,./test/TEST_00004.jpg


In [22]:
# Test images have no labels, so the dataset receives None and yields images only.
# Order is preserved (no shuffling) so predictions line up with the test rows.
test_img_paths = get_data(test_df, infer=True)

test_dataset = CustomDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=CFG["BATCH_SIZE"],
    shuffle=False,
    num_workers=0,
)

In [23]:
def inference(model, test_loader, device):
    """Predict a class id for every image produced by ``test_loader``.

    Args:
        model: Trained network; it is moved to ``device`` and put in eval mode.
        test_loader: Iterable of image batches (no labels).
        device: Device on which to run the forward passes.

    Returns:
        A flat list of predicted class ids in loader order.
    """
    model.to(device)
    model.eval()

    predictions = []

    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            logits = model(batch.float().to(device))
            predictions.extend(logits.argmax(1).detach().cpu().numpy().tolist())

    print("finish")
    return predictions

In [24]:
# Predict integer class ids for every test image with the trained model.
preds= inference(infer_model, test_loader, device)

  0%|          | 0/198 [00:00<?, ?it/s]

finish


In [26]:
# Map the integer class ids back to the original artist names.
preds= le.inverse_transform(preds)

# 제출

In [27]:
# Load the submission template (id plus a placeholder artist column) and preview it.
submit = pd.read_csv("./sample_submission.csv")
submit.head()

Unnamed: 0,id,artist
0,TEST_00000,Edgar Degas
1,TEST_00001,Edgar Degas
2,TEST_00002,Edgar Degas
3,TEST_00003,Edgar Degas
4,TEST_00004,Edgar Degas


In [28]:
# Overwrite the placeholder column with the predicted artist names.
# NOTE(review): assumes sample_submission.csv lists ids in the same order as test.csv — confirm.
submit["artist"] = preds
submit.head()

Unnamed: 0,id,artist
0,TEST_00000,Edgar Degas
1,TEST_00001,Edgar Degas
2,TEST_00002,Edgar Degas
3,TEST_00003,Albrecht Du rer
4,TEST_00004,Vincent van Gogh


In [29]:
# Write the submission file without the dataframe index column.
submit.to_csv('./submit.csv', index=False)