In [None]:
## vscode 

# python: 인터프리터 선택 > conda env 동일하게 선택

## Import

In [None]:
# !pip install torch torchvision numpy matplotlib cv2 scikit-learn tqdm

In [None]:
import os
import cv2
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

from tqdm.auto import tqdm

import torchvision
import torchvision.models as models
import torchvision.transforms as transforms

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore') 

In [None]:
writer = SummaryWriter('runs/artist_experiment')

In [None]:
if torch.cuda.is_available():
    device = torch.device('cuda')
# elif torch.backends.mps.is_available():
#     device = torch.device('mps')
else:
    device = torch.device('cpu')

print("Using PyTorch version:", torch.__version__,' Device:', device)

## Hyperparameter Setting

In [None]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':64,
    'SEED':41
}

## Fixed RandomSeed

In [None]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

## Data Pre-processing

In [None]:
df = pd.read_csv('./data/train.csv')
df['img_path'] = df['img_path'].str.replace('./train/', './data/train/')
df.head()

In [None]:
# Label Encoding
le = preprocessing.LabelEncoder()
df['artist'] = le.fit_transform(df['artist'].values)

## Train / Validation Split

In [None]:
train_df, val_df, _, _ = train_test_split(df, df['artist'].values, test_size=0.2, random_state=CFG['SEED'])

In [None]:
train_df = train_df.sort_values(by=['id'])
train_df.head()

In [None]:
val_df = val_df.sort_values(by=['id'])
val_df.head()

## Data Load

In [None]:
def get_data(df, infer=False):
    if infer:
        return df['img_path'].values
    return df['img_path'].values, df['artist'].values

In [None]:
train_img_paths, train_labels = get_data(train_df)
val_img_paths, val_labels = get_data(val_df)

## CustomDataset

In [None]:
class CustomDataset(Dataset):
    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transforms = transforms

    def __getitem__(self, index):
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        if self.transforms is not None:
            image = self.transforms(image)
        
        if self.labels is not None:
            label = self.labels[index]
            return image, label
        else:
            return image
    
    def __len__(self):
        return len(self.img_paths)

In [None]:
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

In [None]:
train_dataset = CustomDataset(train_img_paths, train_labels, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = CustomDataset(val_img_paths, val_labels, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

## Model Define

In [None]:
class BaseModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(BaseModel, self).__init__()
        self.backbone = models.efficientnet_b0(pretrained=True)
        self.classifier = nn.Linear(1000, num_classes)
        
    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [None]:
def matplotlib_imshow(img, one_channel=False):
    if one_channel:
        img = img.mean(dim=0)
    img = img / 2 + 0.5
    npimg = img.numpy()
    if one_channel:
        plt.imshow(npimg, cmap="Greys")
    else:
        plt.imshow(np.transpose(npimg, (1, 2, 0)))

def images_to_probs(net, images):
    '''
    학습된 신경망과 이미지 목록으로부터 예측 결과 및 확률을 생성합니다
    '''
    output = net(images)
    _, preds_tensor = torch.max(output, 1)
    preds = np.squeeze(preds_tensor.numpy())
    return preds, [F.softmax(el, dim=0)[i].item() for i, el in zip(preds, output)]

def plot_classes_preds(net, images, labels, classes):
    '''
    학습된 신경망과 배치로부터 가져온 이미지 / 라벨을 사용하여 matplotlib
    Figure를 생성합니다. 이는 신경망의 예측 결과 / 확률과 함께 정답을 보여주며,
    예측 결과가 맞았는지 여부에 따라 색을 다르게 표시합니다. "images_to_probs"
    함수를 사용합니다.
    '''
    preds, probs = images_to_probs(net, images)
    fig = plt.figure(figsize=(12, 48))
    for idx in np.arange(4):
        ax = fig.add_subplot(1, 4, idx+1, xticks=[], yticks=[])
        matplotlib_imshow(images[idx], one_channel=True)
        ax.set_title("{0}, {1:.1f}%\n(label: {2})".format(
            classes[preds[idx]],
            probs[idx] * 100.0,
            classes[labels[idx]]),
                    color=("green" if preds[idx]==labels[idx].item() else "red"))
    return fig

## Train

In [None]:
def train(model, optimizer, train_loader, device, writer, epoch, classes):
    model.train()
    train_loss = []
    criterion = nn.CrossEntropyLoss().to(device)
    
    for img, label in tqdm(iter(train_loader)):
        img, label = img.float().to(device), label.to(device)

        optimizer.zero_grad()

        model_pred = model(img)

        loss = criterion(model_pred, label)

        loss.backward()
        optimizer.step()

        train_loss.append(loss.item())

    tr_loss = np.mean(train_loss)
    
    writer.add_scalar('Loss/train', tr_loss, epoch)    
    writer.add_figure('predictions vs. actuals', plot_classes_preds(model, images, labels, classes), global_step=epoch)
    
    return tr_loss

def competition_metric(true, pred):
    return f1_score(true, pred, average="macro")

def validation(model, test_loader, device, writer, epoch):
    model.eval()
    
    model_preds = []
    true_labels = []
    
    val_loss = []
    criterion = nn.CrossEntropyLoss().to(device)
    
    with torch.no_grad():
        for img, label in tqdm(iter(test_loader)):
            img, label = img.float().to(device), label.to(device)

            model_pred = model(img)

            loss = criterion(model_pred, label)

            val_loss.append(loss.item())

            model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += label.detach().cpu().numpy().tolist()

    val_f1 = competition_metric(true_labels, model_preds)
    val_loss = np.mean(val_loss)
    
    writer.add_scalar('Loss/val', val_loss, epoch)
    writer.add_scalar('F1/val', val_f1, epoch)
    return val_loss, val_f1

def train_and_evaluate(model, optimizer, train_loader, test_loader, scheduler, device, epochs, writer, classes):
    model.to(device)

    best_score = 0
    best_model = None
    
    for epoch in range(1, epochs+1):
        tr_loss = train(model, optimizer, train_loader, device, writer, epoch, classes)
        val_loss, val_score = validation(model, test_loader, device, writer, epoch)
        
        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step()
        
        if best_score < val_score:
            best_model = model.state_dict()
            best_score = val_score

    return best_model

## Run!!

In [None]:
model = BaseModel().to(device)
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)

In [None]:
dataiter = iter(train_loader)
images, labels = next(dataiter)

img_grid = torchvision.utils.make_grid(images)
matplotlib_imshow(img_grid, one_channel=True)

writer.add_image('four_artist_images', img_grid)
writer.add_graph(model, images)

# Log image grid and model graph
writer.add_graph(model, images)

In [None]:
infer_model = train_and_evaluate(model, optimizer, train_loader, val_loader, scheduler, device, CFG["EPOCHS"], writer, le.classes_)

## Inference

In [None]:
test_df = pd.read_csv('./data/test.csv')
test_df['img_path'] = test_df['img_path'].str.replace('./train/', './data/train/')
test_df.head()

In [None]:
test_img_paths = get_data(test_df, infer=True)

In [None]:
test_dataset = CustomDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    model.to(device)
    model.eval()
    
    model_preds = []
    
    with torch.no_grad():
        for img in tqdm(iter(test_loader)):
            img = img.float().to(device)
            
            model_pred = model(img)
            model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()
    
    print('Done.')
    return model_preds

In [None]:
preds = inference(infer_model, test_loader, device)

In [None]:
preds = le.inverse_transform(preds) # LabelEncoder로 변환 된 Label을 다시 화가이름으로 변환

## Submit

In [None]:
submit = pd.read_csv('./data/sample_submission.csv')
submit.head()

In [None]:
submit['artist'] = preds
submit.head()

In [None]:
submit.to_csv('./data/submit.csv', index=False)