In [None]:
# Mount Google Drive (Colab only) so the files under /content/drive are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# import numpy as np
# import pandas as pd 
# import glob
# from torch.utils.data import DataLoader, Dataset
# from sklearn.model_selection import StratifiedKFold
# from torch.optim.lr_scheduler import ReduceLROnPlateau
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import roc_auc_score

from PIL import Image
import matplotlib.pyplot as plt
# import time
# import copy
# import cv2
# import random
# from tqdm import tqdm

# import torch.nn as nn
# import torch
# from torchvision import models, transforms
# import torch.nn.functional as F

# import warnings
# warnings.filterwarnings('ignore')

# import os


In [None]:
import random
import pandas as pd
import numpy as np
import os
import cv2

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from tqdm.auto import tqdm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torchvision.models as models

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore') 

In [None]:
# Run Python's garbage collector, then ask PyTorch to empty its CUDA cache.
import torch, gc
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Hyperparameter settings read by the cells below.

CFG = dict(
    IMG_SIZE=224,
    EPOCHS=15,           # number of training epochs
    LEARNING_RATE=3e-4,
    BATCH_SIZE=32,
    SEED=41,
)

In [None]:
# fixed randomseed

def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's global RNG and PyTorch (CPU and all
    CUDA devices), records the seed in ``PYTHONHASHSEED`` and puts cuDNN into
    deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # cover every GPU, not only the current one
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick different kernels between runs, which
    # defeats the purpose of fixing the seed, so it has to stay disabled.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # fix the seed

In [None]:
# Data pre-processing: load the training manifest (id, relative image path, artist name).

df = pd.read_csv('/content/drive/MyDrive/artclassification/datas/train.csv')
df.head()

Unnamed: 0,id,img_path,artist
0,0,./train/0000.jpg,Diego Velazquez
1,1,./train/0001.jpg,Vincent van Gogh
2,2,./train/0002.jpg,Claude Monet
3,3,./train/0003.jpg,Edgar Degas
4,4,./train/0004.jpg,Hieronymus Bosch


In [None]:
# Show full cell contents. None means "no truncation"; the legacy value -1 is
# deprecated and rejected by current pandas releases.
pd.set_option('display.max_colwidth', None)

# Display up to 500 DataFrame rows (column limits can be configured the same way).
pd.set_option('display.max_rows', 500)

In [None]:
# Turn the relative './train/xxxx.jpg' entries into absolute Drive paths by
# dropping the leading '.' and prepending the dataset root.
# Paths that already start with the root are left untouched, so re-running this
# cell does not prepend the root a second time.
DATA_ROOT = '/content/drive/MyDrive/artclassification/datas'
df['img_path'] = [
    path if path.startswith(DATA_ROOT) else DATA_ROOT + path[1:]
    for path in df['img_path']
]
df.head()

Unnamed: 0,id,img_path,artist
0,0,/content/drive/MyDrive/artclassification/datas/train/0000.jpg,Diego Velazquez
1,1,/content/drive/MyDrive/artclassification/datas/train/0001.jpg,Vincent van Gogh
2,2,/content/drive/MyDrive/artclassification/datas/train/0002.jpg,Claude Monet
3,3,/content/drive/MyDrive/artclassification/datas/train/0003.jpg,Edgar Degas
4,4,/content/drive/MyDrive/artclassification/datas/train/0004.jpg,Hieronymus Bosch


In [None]:
# Label encoding: replace the artist names with integer class ids.
# NOTE(review): df['artist'] is overwritten in place, so re-running this cell
# refits `le` on the already-encoded integers instead of the artist names.

le = preprocessing.LabelEncoder()
df['artist'] = le.fit_transform(df['artist'].values)  # convert the categorical labels to numeric ids

In [None]:
# Train/validation split: 20% of the rows are held out, seeded with CFG['SEED'].
# The label array is passed as a second input, but its two splits are discarded (`_`).
# NOTE(review): no `stratify=` argument is given, so class proportions are not
# guaranteed to match between the two splits -- consider stratifying.

train_df, val_df, _, _ = train_test_split(df, df['artist'].values, test_size=0.2, random_state=CFG['SEED'])

In [None]:
# Order the training rows by id and preview the first few.
train_df = train_df.sort_values(by=['id'])
train_df.head()

Unnamed: 0,id,img_path,artist
0,0,/content/drive/MyDrive/artclassification/datas/train/0000.jpg,9
2,2,/content/drive/MyDrive/artclassification/datas/train/0002.jpg,7
3,3,/content/drive/MyDrive/artclassification/datas/train/0003.jpg,10
5,5,/content/drive/MyDrive/artclassification/datas/train/0005.jpg,38
6,6,/content/drive/MyDrive/artclassification/datas/train/0006.jpg,43


In [None]:
# Order the validation rows by id and preview the first few.
val_df = val_df.sort_values(by=['id'])
val_df.head()

Unnamed: 0,id,img_path,artist
1,1,/content/drive/MyDrive/artclassification/datas/train/0001.jpg,48
4,4,/content/drive/MyDrive/artclassification/datas/train/0004.jpg,24
17,17,/content/drive/MyDrive/artclassification/datas/train/0017.jpg,10
21,21,/content/drive/MyDrive/artclassification/datas/train/0021.jpg,29
29,29,/content/drive/MyDrive/artclassification/datas/train/0029.jpg,28


In [None]:
# data load

def get_data(df, infer=False):
    """Pull the model inputs out of a manifest DataFrame.

    Args:
        df: DataFrame with an 'img_path' column and, unless ``infer`` is set,
            an 'artist' column.
        infer: When True only the image paths are returned (inference data
            has no labels).

    Returns:
        The 'img_path' values alone when ``infer`` is True, otherwise a
        ``(img_path values, artist values)`` tuple.
    """
    paths = df['img_path'].values
    if not infer:
        return paths, df['artist'].values
    return paths

In [None]:
# Extract the image-path and label arrays for each split.
train_img_paths, train_labels = get_data(train_df)
val_img_paths, val_labels = get_data(val_df)

In [None]:
# Display the training split.
train_df

Unnamed: 0,id,img_path,artist
0,0,/content/drive/MyDrive/artclassification/datas/train/0000.jpg,9
2,2,/content/drive/MyDrive/artclassification/datas/train/0002.jpg,7
3,3,/content/drive/MyDrive/artclassification/datas/train/0003.jpg,10
5,5,/content/drive/MyDrive/artclassification/datas/train/0005.jpg,38
6,6,/content/drive/MyDrive/artclassification/datas/train/0006.jpg,43
...,...,...,...
5906,5906,/content/drive/MyDrive/artclassification/datas/train/5906.jpg,40
5907,5907,/content/drive/MyDrive/artclassification/datas/train/5907.jpg,37
5908,5908,/content/drive/MyDrive/artclassification/datas/train/5908.jpg,35
5909,5909,/content/drive/MyDrive/artclassification/datas/train/5909.jpg,35


In [None]:
# Inspect a sample image

import matplotlib.image as mpimg
# # # for img in os.listdir('/content/drive/MyDrive/artclassification/datas/train'):
# image=mpimg.imread('/content/drive/MyDrive/artclassification/datas/train/0002.jpg')
# # plt.imshow(image)
# # plt.show()
# image.shape


In [None]:
# customdataset

class CustomDataset(Dataset):
    """Dataset that loads images from disk and optionally pairs them with labels.

    Args:
        img_paths: Indexable collection of image file paths.
        labels: Indexable collection of labels aligned with ``img_paths``, or
            None for unlabeled (inference) data.
        transforms: Optional callable invoked as ``transforms(image=image)``
            that returns a dict with an 'image' entry (albumentations style).
    """

    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``, or just ``image`` when unlabeled.

        Raises:
            FileNotFoundError: If the image at ``index`` cannot be read.
        """
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)  # read the image from disk (BGR channel order)
        if image is None:
            # cv2.imread signals failure by returning None; fail here with the
            # offending path rather than with an opaque error in cvtColor.
            raise FileNotFoundError(f'Could not read image: {img_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # convert BGR to RGB
        if self.transforms is not None:
            # The pipeline is called with the keyword `image` and returns a
            # dict; the transformed image is stored under the 'image' key.
            image = self.transforms(image=image)['image']

        if self.labels is not None:
            label = self.labels[index]
            return image, label
        else:
            return image

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.img_paths)

In [None]:
# Training pipeline: resize to twice IMG_SIZE, take a random IMG_SIZE crop,
# apply coarse dropout (p=0.5) and colour jitter, normalise, convert to a tensor.
train_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE']*2,CFG['IMG_SIZE']*2,), 
                            A.RandomCrop(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.CoarseDropout(always_apply=False, p=0.5, max_holes=8, max_height=16, max_width=16, min_holes=1, min_height=8, min_width=8),
                            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2,),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()])

# Evaluation pipeline: no random augmentation, only resize, the same
# normalisation as training, and tensor conversion.
test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

In [None]:
# A PyTorch Dataset (used through a subclass) is a container that pairs each training sample x with its label y.
# Passing it to a DataLoader wraps the dataset in an iterable so the samples are easy to access.
# CustomDataset is used here.

# The DataLoader serves the whole dataset in slices of batch_size: it takes the dataset as input,
# builds batches according to its options, and is consumed as an iterator (e.g. in a for loop).
# batch_size and shuffling are configurable.
train_dataset = CustomDataset(train_img_paths, train_labels, train_transform) # training image paths, labels and the albumentations pipeline
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)
# num_workers is the multiprocessing parameter: how many worker processes load data during training
val_dataset = CustomDataset(val_img_paths, val_labels, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
# model define

class BaseModel(nn.Module):
    """Pretrained EfficientNet-B0 followed by a linear classification layer.

    The backbone keeps its own 1000-way output; ``classifier`` maps those
    1000 values to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes. The default is the number of
            classes known to the label encoder ``le`` when this class is defined.
    """

    def __init__(self, num_classes=len(le.classes_)):
        super().__init__()
        # Alternative backbones tried earlier: models.convnext_large / models.convnext_base.
        self.backbone = models.efficientnet_b0(pretrained=True)
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        """Run the backbone, then the classification layer, and return the logits."""
        return self.classifier(self.backbone(x))

In [None]:
# class BaseModel(nn.Module):    # 신경망 모델을 nn.Module의 하위 클래스로 정의
#     def __init__(self, num_classes=50):  # __init__로 산경망 계층 초기화
#         super(BaseModel, self).__init__() # super()함수를 통해 부모의 __init__메소드를 자식이 사용가능
#         self.backbone = models.efficientnet_b0(pretrained=True)
#         # self.backbone = models.convnext_base(pretrained=True)
#         self.backbone.classifier[2] = nn.Linear(1024, num_classes) # nn.Linear(input_dim, output_dim)
#         # forward메소드의 입력 데이터에 대한 연산을 구현

#     def forward(self,x): 
#         x = self.backbone(x)
#         return x

In [None]:
# def cutmix(batch, alpha):
#     data, targets = batch

#     indices = torch.randperm(data.size(0))
#     shuffled_data = data[indices]
#     shuffled_targets = targets[indices]

#     lam = np.random.beta(alpha, alpha)

#     image_h, image_w = data.shape[2:]
#     cx = np.random.uniform(0, image_w)
#     cy = np.random.uniform(0, image_h)
#     w = image_w * np.sqrt(1 - lam)
#     h = image_h * np.sqrt(1 - lam)
#     x0 = int(np.round(max(cx - w / 2, 0)))
#     x1 = int(np.round(min(cx + w / 2, image_w)))
#     y0 = int(np.round(max(cy - h / 2, 0)))
#     y1 = int(np.round(min(cy + h / 2, image_h)))

#     data[:, :, y0:y1, x0:x1] = shuffled_data[:, :, y0:y1, x0:x1]
#     targets = (targets, shuffled_targets, lam)

#     return data, targets

In [None]:
# cutmix

def rand_bbox(size, lam):
    """Sample a random rectangular patch for CutMix.

    Args:
        size: Shape sequence of the batch; only ``size[2]`` and ``size[3]``
            (the two spatial extents, called W and H here) are used.
        lam: Mixing ratio. Each side of the patch is ``sqrt(1 - lam)`` times
            the corresponding extent, before clipping to the image.

    Returns:
        ``(bbx1, bby1, bbx2, bby2)``: patch corners, clipped to ``[0, W]``
        along the first axis and ``[0, H]`` along the second.
    """
    W = size[2]
    H = size[3]
    cut_rat = np.sqrt(1. - lam)  # side-length ratio of the patch
    # Builtin int() replaces np.int, which was removed from NumPy and now
    # raises AttributeError.
    cut_w = int(W * cut_rat)  # patch extent along the first axis
    cut_h = int(H * cut_rat)  # patch extent along the second axis

    # Patch centre, drawn uniformly over the image.
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    # Corner coordinates; np.clip keeps them inside the image bounds.
    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    return bbx1, bby1, bbx2, bby2

In [None]:
# train
def train(model, optimizer, train_loader, test_loader, scheduler, device):
    """Train ``model`` for CFG["EPOCHS"] epochs and return it with its best weights.

    After every epoch the model is evaluated with ``validation``; the weights
    of the epoch with the highest validation F1 score are snapshotted and
    loaded back into the model before returning.

    Args:
        model: Network to train; moved to ``device``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(img, label)`` training batches.
        test_loader: Iterable of ``(img, label)`` batches used for validation.
        scheduler: Optional LR scheduler stepped once per epoch, or None.
        device: Device on which the computation runs.

    Returns:
        ``model``, holding the weights of the best-scoring epoch. If no epoch
        scored above 0 the model is returned with its final weights.
    """
    import copy  # local import so the cell does not depend on a new top-level import

    model.to(device)  # run the model on the selected device

    # Cross-entropy loss for multi-class classification (applies softmax internally).
    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_state = None

    for epoch in range(1, CFG["EPOCHS"] + 1):
        model.train()  # training mode (validation() switches to eval mode)
        train_loss = []
        for img, label in tqdm(iter(train_loader)):  # tqdm shows a progress bar
            img, label = img.float().to(device), label.to(device)

            optimizer.zero_grad()  # clear gradients left over from the previous step

            model_pred = model(img)

            loss = criterion(model_pred, label)  # (predictions, targets)

            loss.backward()   # back-propagate from the loss
            optimizer.step()  # update the parameters from the computed gradients

            train_loss.append(loss.item())

        tr_loss = np.mean(train_loss)  # mean training loss over the epoch

        val_loss, val_score = validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step()

        if best_score < val_score:
            # Keeping a reference to `model` would keep following later
            # updates, so the weights themselves are copied.
            best_state = copy.deepcopy(model.state_dict())
            best_score = val_score

    if best_state is not None:
        model.load_state_dict(best_state)

    return model

In [None]:
def competition_metric(true, pred):
    """Return the macro-averaged F1 score of ``pred`` against ``true``."""
    macro_f1 = f1_score(true, pred, average="macro")
    return macro_f1

def validation(model, criterion, test_loader, device):
    """Evaluate ``model`` on ``test_loader``.

    Args:
        model: Network to evaluate; switched to eval mode.
        criterion: Loss called as ``criterion(predictions, labels)``.
        test_loader: Iterable of ``(img, label)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``(mean batch loss, macro F1 score)`` over the loader.
    """
    model.eval()  # evaluation mode

    batch_losses = []
    predicted = []
    expected = []

    # Gradients are not needed for evaluation; disabling autograd saves
    # memory and time.
    with torch.no_grad():
        for img, label in tqdm(iter(test_loader)):  # tqdm shows a progress bar
            img = img.float().to(device)
            label = label.to(device)

            logits = model(img)
            batch_losses.append(criterion(logits, label).item())

            # detach -> cpu -> numpy -> list: cut the tensor from the autograd
            # graph, copy it to host memory, then convert to Python values.
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            expected.extend(label.detach().cpu().numpy().tolist())

    score = competition_metric(expected, predicted)  # macro F1
    return np.mean(batch_losses), score

In [None]:
# run
# Increasing the number of epochs may be worth trying.
model = BaseModel()
model.eval()  # NOTE(review): train() calls model.train() every epoch, so this has no lasting effect
optimizer = torch.optim.AdamW(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = None  # no learning-rate scheduler is used

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-3dd342df.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-3dd342df.pth


  0%|          | 0.00/20.5M [00:00<?, ?B/s]

  0%|          | 0/148 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

In [None]:
# Load the test manifest and make its image paths absolute: the first character
# of each path is dropped and the dataset root is prepended.
test_df = pd.read_csv('/content/drive/MyDrive/artclassification/datas/test.csv')
test_df['img_path']=['/content/drive/MyDrive/artclassification/datas'+path[1:]for path in test_df['img_path']]
test_df.head()

Unnamed: 0,id,img_path
0,TEST_00000,/content/drive/MyDrive/artclassification/datas/test/TEST_00000.jpg
1,TEST_00001,/content/drive/MyDrive/artclassification/datas/test/TEST_00001.jpg
2,TEST_00002,/content/drive/MyDrive/artclassification/datas/test/TEST_00002.jpg
3,TEST_00003,/content/drive/MyDrive/artclassification/datas/test/TEST_00003.jpg
4,TEST_00004,/content/drive/MyDrive/artclassification/datas/test/TEST_00004.jpg


In [None]:
# Test data has no labels, so only the image paths are requested.
test_img_paths = get_data(test_df, infer=True)

In [None]:
# Unlabeled dataset (labels=None) with the evaluation transform; order is kept (shuffle=False).
test_dataset = CustomDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    """Predict a class index for every image served by ``test_loader``.

    Args:
        model: Trained network; moved to ``device`` and put in eval mode.
        test_loader: Iterable of image batches (no labels).
        device: Device on which the computation runs.

    Returns:
        List of predicted class indices, in loader order.
    """
    model.to(device)
    model.eval()

    predictions = []

    with torch.no_grad():
        for img in tqdm(iter(test_loader)):
            logits = model(img.float().to(device))
            # The index of the largest logit is the predicted class.
            predictions.extend(logits.argmax(1).detach().cpu().numpy().tolist())

    print('Done.')
    return predictions

In [None]:
# Predict class indices for the test images with the model returned by train().
preds = inference(infer_model, test_loader, device)

  0%|          | 0/198 [00:00<?, ?it/s]

Done.


In [None]:
preds = le.inverse_transform(preds) # map the LabelEncoder ids back to artist names

In [None]:
# Display the decoded predictions.
preds

array(['Edgar Degas', 'Amedeo Modigliani', 'Caravaggio', ...,
       'Amedeo Modigliani', 'Titian', 'Vincent van Gogh'], dtype=object)

In [None]:
# NOTE: this path has to be adjusted again for your environment!
submit = pd.read_csv('/content/drive/MyDrive/artclassification/datas/sample_submission.csv')
submit.head()

Unnamed: 0,id,artist
0,TEST_00000,Edgar Degas
1,TEST_00001,Edgar Degas
2,TEST_00002,Edgar Degas
3,TEST_00003,Edgar Degas
4,TEST_00004,Edgar Degas


In [None]:
# Replace the placeholder artist column with the predictions.
submit['artist'] = preds
submit.head()

Unnamed: 0,id,artist
0,TEST_00000,Edgar Degas
1,TEST_00001,Amedeo Modigliani
2,TEST_00002,Caravaggio
3,TEST_00003,Albrecht Du rer
4,TEST_00004,Amedeo Modigliani


In [None]:
# Write the submission file without the DataFrame index column.
submit.to_csv('/content/drive/MyDrive/artclassification/datas/submit5.csv', index=False)