In [1]:
import numpy as np
import pandas as pd
import random
from PIL import Image

In [2]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split

import torchvision
from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize

In [3]:
import warnings
warnings.filterwarnings("ignore")

In [4]:
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [5]:
print ("PyTorch version:[%s]."%(torch.__version__))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #'cuda:0'
print ("device:[%s]."%(device))

PyTorch version:[1.7.1].
device:[cuda].


In [6]:
# seed 고정
def set_seed(random_seed):
    torch.manual_seed(random_seed)
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)  # if use multi-GPU
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)
    
set_seed(42)

In [7]:
TRAIN_PATH = '/opt/ml/input/data/train/data.csv'

In [8]:
class Custom_Dataset(Dataset):
    def __init__(self, path, target, transform, train=True):
        super().__init__()
        self.train = train
        self.path = path
        self.transforms = transform
        self.target = target
        self.data = pd.read_csv(path, index_col=0).reset_index(drop=True).copy()
        self.img_paths = self.data['path']
        self.labels = list(self.data[target])
        for i in range(len(self.labels)):
            self.labels[i] = int(self.labels[i])

    def __len__(self):
        return len(self.data)
        

    def __getitem__(self, idx):
        X, y = None, None
        im = Image.open(self.img_paths[idx])

        if self.transforms is not None:
            y = self.labels[idx]
            X = self.transforms(im)

        if self.train:
            return X, y
        
        return X

In [9]:
transform = transforms.Compose([
    transforms.Resize((512, 384), Image.BILINEAR),
    transforms.CenterCrop(224),
    transforms.RandomHorizontalFlip(0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.2, 0.2, 0.2)),
])

In [10]:
train_dataset = Custom_Dataset(path=TRAIN_PATH, target='class', transform=transform, train=True)

In [11]:
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)

In [12]:
import timm

model = timm.create_model('vit_tiny_patch16_224',num_classes = 18,pretrained=True)

In [13]:
model

VisionTransformer(
  (patch_embed): PatchEmbed(
    (proj): Conv2d(3, 192, kernel_size=(16, 16), stride=(16, 16))
    (norm): Identity()
  )
  (pos_drop): Dropout(p=0.0, inplace=False)
  (blocks): Sequential(
    (0): Block(
      (norm1): LayerNorm((192,), eps=1e-06, elementwise_affine=True)
      (attn): Attention(
        (qkv): Linear(in_features=192, out_features=576, bias=True)
        (attn_drop): Dropout(p=0.0, inplace=False)
        (proj): Linear(in_features=192, out_features=192, bias=True)
        (proj_drop): Dropout(p=0.0, inplace=False)
      )
      (drop_path): Identity()
      (norm2): LayerNorm((192,), eps=1e-06, elementwise_affine=True)
      (mlp): Mlp(
        (fc1): Linear(in_features=192, out_features=768, bias=True)
        (act): GELU()
        (fc2): Linear(in_features=768, out_features=192, bias=True)
        (drop): Dropout(p=0.0, inplace=False)
      )
    )
    (1): Block(
      (norm1): LayerNorm((192,), eps=1e-06, elementwise_affine=True)
      (attn): 

In [14]:
class F1_Loss(nn.Module):
    def __init__(self, classes=18, epsilon=1e-7):
        super().__init__()
        self.classes = classes
        self.epsilon = epsilon

    def forward(self, y_pred, y_true):
        assert y_pred.ndim == 2
        assert y_true.ndim == 1

        y_true = F.one_hot(y_true, self.classes).to(torch.float32)
        y_pred = F.softmax(y_pred, dim=1)

        tp = (y_true * y_pred).sum(dim=0).to(torch.float32)
        tn = ((1 - y_true) * (1 - y_pred)).sum(dim=0).to(torch.float32)
        fp = ((1 - y_true) * y_pred).sum(dim=0).to(torch.float32)
        fn = (y_true * (1 - y_pred)).sum(dim=0).to(torch.float32)

        precision = tp / (tp + fp + self.epsilon)
        recall = tp / (tp + fn + self.epsilon)

        f1 = 2 * (precision * recall) / (precision + recall + self.epsilon)
        f1 = f1.clamp(min=self.epsilon, max=1 - self.epsilon)
        return (1 - f1.mean())

In [15]:
model.to(device)

lr_rate = 0.002
epochs = 50
alpha = 0.8

loss_fn = F1_Loss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr_rate)

In [16]:
from sklearn.metrics import f1_score
torch.cuda.empty_cache()

best_loss = 1e9

model.train()

for epoch in range(epochs):
    running_loss = 0.0
    running_acc = 0.0
    n_iter = 0
    epoch_f1 = 0.0

    for index, (images, labels) in enumerate(train_dataloader):
        images = torch.stack(list(images), dim=0).to(device)
        labels = torch.tensor(list(labels)).to(device)


        optimizer.zero_grad()
        logits = model(images)
        _, preds = torch.max(logits, 1)
        loss1 = loss_fn(logits, labels)
        loss2 = F.cross_entropy(logits, labels)
        loss = loss1 * (1 - alpha) + loss2 * alpha
        

        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        running_acc += torch.sum(preds == labels.data)
        epoch_f1 += f1_score(labels.cpu().numpy(), preds.cpu().numpy(), average='macro')
        n_iter += 1
    
    epoch_loss = running_loss / len(train_dataloader.dataset)
    epoch_acc = running_acc / len(train_dataloader.dataset)
    epoch_f1 = epoch_f1/n_iter

    print(f"Epoch: {epoch} -  Loss : {epoch_loss:.3f},  Accuracy : {epoch_acc:.3f},  F1-Score : {epoch_f1:.4f}")
    if epoch_loss < best_loss:
        PATH = './checkpoint/' +"model_saved.pt"
        torch.save({'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': loss,
                    }, PATH)
        best_loss = epoch_loss

Epoch: 0 -  Loss : 1.949,  Accuracy : 0.260,  F1-Score : 0.1025
Epoch: 1 -  Loss : 1.625,  Accuracy : 0.374,  F1-Score : 0.2121
Epoch: 2 -  Loss : 1.526,  Accuracy : 0.419,  F1-Score : 0.2552
Epoch: 3 -  Loss : 1.553,  Accuracy : 0.415,  F1-Score : 0.2482
Epoch: 4 -  Loss : 1.460,  Accuracy : 0.448,  F1-Score : 0.2814
Epoch: 5 -  Loss : 1.322,  Accuracy : 0.497,  F1-Score : 0.3302
Epoch: 6 -  Loss : 1.277,  Accuracy : 0.519,  F1-Score : 0.3583
Epoch: 7 -  Loss : 1.249,  Accuracy : 0.531,  F1-Score : 0.3658
Epoch: 8 -  Loss : 1.340,  Accuracy : 0.501,  F1-Score : 0.3334
Epoch: 9 -  Loss : 1.202,  Accuracy : 0.549,  F1-Score : 0.3829
Epoch: 10 -  Loss : 1.108,  Accuracy : 0.586,  F1-Score : 0.4223
Epoch: 11 -  Loss : 1.069,  Accuracy : 0.600,  F1-Score : 0.4372
Epoch: 12 -  Loss : 0.991,  Accuracy : 0.632,  F1-Score : 0.4745
Epoch: 13 -  Loss : 0.985,  Accuracy : 0.630,  F1-Score : 0.4709
Epoch: 14 -  Loss : 0.951,  Accuracy : 0.646,  F1-Score : 0.4974
Epoch: 15 -  Loss : 0.909,  Accurac

In [19]:
import os

class TestDataset(Dataset):
    def __init__(self, img_paths, transform):
        self.img_paths = img_paths
        self.transform = transform

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])

        if self.transform:
            image = self.transform(image)
        return image

    def __len__(self):
        return len(self.img_paths)

test_dir = '/opt/ml/input/data/eval'
submission = pd.read_csv(os.path.join(test_dir, 'info.csv'))
image_dir = os.path.join(test_dir, 'images')

# Test Dataset 클래스 객체를 생성하고 DataLoader를 만듭니다.
image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]
transform = transforms.Compose([
    transforms.Resize((512, 384), Image.BILINEAR),
    transforms.CenterCrop(224),
    transforms.RandomHorizontalFlip(0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.2, 0.2, 0.2)),
])
dataset = TestDataset(image_paths, transform)

loader = DataLoader(
    dataset,
    shuffle=False
)

# 모델을 정의합니다. (학습한 모델이 있다면 torch.load로 모델을 불러주세요!)
model.load_state_dict(torch.load('./checkpoint/model_saved.pt')['model_state_dict'])
model.eval()

# 모델이 테스트 데이터셋을 예측하고 결과를 저장합니다.
all_predictions = []
for images in loader:
    with torch.no_grad():
        images = images.to(device)
        pred = model(images)
        pred = pred.argmax(dim=-1)
        all_predictions.extend(pred.cpu().numpy())
submission['ans'] = all_predictions

# 제출할 파일을 저장합니다.
submission.to_csv(os.path.join(test_dir, 'submission.csv'), index=False)
print('test inference is done!')


test inference is done!
