### import

In [34]:
import torch
from torch import nn
from torch.nn import functional
from torch import optim
from torch.utils.data import random_split

from torchvision.datasets import FashionMNIST 
from torchvision import transforms as tr

import os
import sys
import numpy as np
from tqdm import tqdm # train할때 progress bar를 보여줌
import matplotlib.pyplot as plt

In [35]:
project_root = 'C:/Users/rlduf/.vscode/study/DeepLearning'
sys.path.append(project_root)

In [36]:
# os.getcwd() -> 현재 경로
data_root = os.path.join(os.getcwd(), 'data') # 현재 경로에서 data_root 경로를 지정

### Preprocessing & Dataset

In [37]:
transform = tr.Compose( 
    [
        tr.ToTensor(), # PIL Image나 NumPy ndarray 를 FloatTensor 로 변환하고, 이미지의 픽셀의 크기(intensity) 값을 [0., 1.] 범위로 비례하여 조정(scale)
        tr.Normalize([0.5], [0.5]) # Normalize a tensor image with mean and standard deviation
    ]
)

fashion_mnist_dataset = FashionMNIST(data_root, download=True, train=True, transform=transform) 

In [38]:
# torch.utils.data.random_split을 사용할 수 있음
# 하지만 이 방법은 label의 비율을 고려하지 않았기 때문에 정확도가 떨어질 수 있음
# 따라서 data_utils.py 파일의 dataset_split 함수 import
from data_utils import dataset_split 

In [39]:
dataset = dataset_split(fashion_mnist_dataset, split=[0.9, 0.1])
print(dataset) # dict 형태

{'train': <torch.utils.data.dataset.Subset object at 0x000001E6151ADFC8>, 'val': <torch.utils.data.dataset.Subset object at 0x000001E6151ADC08>}


In [40]:
train_dataset = dataset['train']
val_dataset = dataset['val']

train_batch_size = 128
val_batch_size = 32

### DataLoader

In [41]:
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=train_batch_size, shuffle=True, num_workers=1 # num_workers는 병렬처리할 때 사용
)

val_dataloader = torch.utils.data.DataLoader(
    val_dataset, batch_size=val_batch_size, shuffle=False, num_workers=1
)

### Model

In [42]:
class MLP(nn.Module):
    def __init__(self, in_dim, h1_dim, h2_dim, out_dim):
        super(MLP, self).__init__()
        self.linear1 = nn.Linear(in_dim, h1_dim)
        self.linear2 = nn.Linear(h1_dim, h2_dim)
        self.linear3 = nn.Linear(h2_dim, out_dim)

    def forward(self, input):
        x = torch.flatten(input, start_dim = 1)
        # forward method에서 사용할거면 nn.functional.relu를 사용 / nn.Sequential 안에 사용할거면 nn.ReLU()를 사용
        x = functional.relu(self.linear1(x))
        x = functional.relu(self.linear2(x))
        out = self.linear3(x)
        return out

### Loss Fucntion, Optimization, Tensorboard

In [43]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [44]:
# model
model = MLP(28*28, 128, 64, 10).to(device)

# define loss
loss_function = nn.CrossEntropyLoss()

# define optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
max_epoch = 10

In [45]:
# do train with validation.
train_step = 0
for epoch in range(1, max_epoch+1):
    # valid step
    with torch.no_grad():
        val_loss = 0.0
        val_corrects = 0
        model.eval()

        for val_batch_idx, (val_images, val_labels) in enumerate(
            tqdm(val_dataloader, position=0, leave=True, desc="validation")
        ):
            val_images, val_labels = val_images.to(device), val_labels.to(device)
            # forward
            val_outputs = model(val_images)
            _, val_preds = torch.max(val_outputs, 1)
            
            # loss & acc
            val_loss += loss_function(val_outputs, val_labels) / val_outputs.shape[0]
            val_corrects += torch.sum(val_preds == val_labels.data) / val_outputs.shape[0]
    
    # valid step logging
    val_epoch_loss = val_loss / len(val_dataloader)
    val_epoch_acc = val_corrects / len(val_dataloader)
    
    print(
        f"{epoch} epoch, {train_step} step: val_loss: {val_epoch_loss}, val_acc: {val_epoch_acc}\n"
    )

    # check model early stopping point & save model if the model reached the best performance.
    early_stopper(val_epoch_loss, model)
    if early_stopper.early_stop:
        break
    
    # train step
    current_loss = 0
    current_corrects = 0
    model.train()

    for batch_idx, (images, labels) in enumerate(
         tqdm(train_dataloader, position=0, leave=True, desc="training")
    ):
        current_loss = 0.0
        current_corrects = 0

        # Forward
        # get predictions
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, 1)
        
        # get loss (Loss 계산)
        loss = loss_function(outputs, labels)

        # Backpropagation
        # optimizer 초기화 (zero화)
        optimizer.zero_grad()

        # Perform backward pass
        loss.backward()

        # Perform Optimization
        optimizer.step()

        current_loss += loss.item()
        current_corrects += torch.sum(preds == labels.data)

        if train_step % 100 == 0:
            train_loss = current_loss / 100
            train_acc = current_corrects / 100

            print(
                f"{train_step}: train_loss: {train_loss}, train_acc: {train_acc}"
            )
            current_loss = 0
            current_corrects = 0

        train_step += 1

    print('\n')

validation: 100%|██████████| 188/188 [00:04<00:00, 42.59it/s] 


epoch:1 -> val_loss:2.325981  val_acc:0.09066666662693024


training:   0%|          | 0/422 [00:01<?, ?it/s]


NameError: name 'log_interval' is not defined

In [None]:
# save model
os.makedirs("C:/Users/rlduf/.vscode/study/DeepLearning/logs/models", exist_ok=True)
torch.save(model, "C:/Users/rlduf/.vscode/study/DeepLearning/logs/models/MLP.ckpt")

In [None]:
# load model
loaded_model = torch.load("C:/Users/rlduf/.vscode/study/DeepLearning/logs/models/MLP.ckpt")
loaded_model.eval()
print(loaded_model)

In [None]:
def softmax(x, axis=0):
    "numpy softmax"
    max = np.max(x, axis=axis, keepdims=True)
    e_x = np.exp(x - max)
    sum = np.sum(e_x, axis=axis, keepdims=True)
    f_x = e_x / sum
    return f_x

In [None]:
test_batch_size = 100
test_dataset = FashionMNIST(data_root, download=True, train=False, transform=transforms.ToTensor())
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=1)

test_labels_list = []
test_preds_list = []
test_outputs_list = []

for i, (test_images, test_labels) in enumerate(tqdm(test_dataloader, position=0, leave=True, desc="testing")):
    # forward
    test_images, test_labels = test_images.to(device), test_labels.to(device)
    test_outputs = loaded_model(test_images)
    _, test_preds = torch.max(test_outputs, 1)

    final_outs = softmax(test_outputs.detach().cpu().numpy(), axis=1)
    test_outputs_list.extend(final_outs)
    test_preds_list.extend(test_preds.detach().cpu().numpy())
    test_labels_list.extend(test_labels.detach().cpu().numpy())

# sklearn library의 성능 평가를 위해 ndarray로 변환
test_preds_list = np.array(test_preds_list)
test_labels_list = np.array(test_labels_list)

print(f"\nacc: {np.mean(test_preds_list == test_labels_list)*100}%")

In [None]:
# ROC Curve
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score

fpr = {}
tpr = {}
thresh = {}
n_class = 10

for i in range(n_class):
    fpr[i], tpr[i], thresh[i] = roc_curve(test_labels_list, np.array(test_outputs_list)[:, i], pos_label=i)

# plot.
for i in range(n_class):
    plt.plot(fpr[i], tpr[i], linestyle="--", label=f"Class {i} vs Rest")
plt.title("Multi-class ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend(loc="best")
plt.show()

print("auc_score", roc_auc_score(test_labels_list, test_outputs_list, multi_class="ovo", average="macro"))