# Import

In [None]:
# torch package
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


# dataset packages
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import datasets, transforms
from sklearn.model_selection import train_test_split

# metrics
from sklearn.metrics import accuracy_score, classification_report

# ETC
from tqdm import tqdm
import numpy as np
from IPython.display import clear_output
import matplotlib.pyplot as plt


# Prefer the GPU when CUDA is usable; otherwise everything runs on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

# VGG

In [None]:
class VGG(nn.Module):
    """VGG-16 / VGG-19 style image classifier with batch normalisation.

    The network is five convolutional stages followed by a three-layer
    fully connected head.  Every convolution is 3x3, stride 1, padding 1
    (so it keeps the spatial size) and is followed by BatchNorm and ReLU;
    every stage ends with a 2x2 max-pool that halves the spatial size.

    Convolutions per stage:
        * ``vgg_type == 19``: 2, 2, 4, 4, 4  (16 conv + 3 FC layers)
        * any other value:    2, 2, 3, 3, 3  (16-layer configuration)

    Each convolution is its own module.  An earlier revision re-applied one
    shared ``add_conv_layer`` after stages 4 and 5, which tied the weights
    and BatchNorm statistics of two layers together and left stage 3 one
    convolution short.  State dicts saved from that revision contain
    ``add_conv_layer.*`` keys that no longer exist here.

    Args:
        vgg_type: 19 selects the 19-layer configuration; anything else
            builds the 16-layer configuration.
        num_classes: Number of output logits.  Defaults to 10 (CIFAR-10)
            instead of the 1000 classes of the original ImageNet model.
    """

    def __init__(self, vgg_type: int, num_classes: int = 10):
        super().__init__()
        # Output size of a conv / pool layer: floor((H_in + 2P - K) / S) + 1

        # type 16 / 19
        self.vgg_type = vgg_type
        self.num_classes = num_classes

        # The 19-layer variant has one more convolution in stages 3, 4 and 5.
        extra = 1 if vgg_type == 19 else 0

        # Shape comments assume a (3, 224, 224) input.
        # (3, 224, 224) -> (64, 112, 112)
        self.conv_layer1 = self._make_stage(3, 64, num_convs=2, pool=True)
        # (64, 112, 112) -> (128, 56, 56)
        self.conv_layer2 = self._make_stage(64, 128, num_convs=2, pool=True)
        # (128, 56, 56) -> (256, 28, 28)
        self.conv_layer3 = self._make_stage(128, 256, num_convs=3 + extra, pool=True)
        # (256, 28, 28) -> (512, 28, 28); pooled to (512, 14, 14) in forward()
        self.conv_layer4 = self._make_stage(256, 512, num_convs=3 + extra, pool=False)
        # (512, 14, 14) -> (512, 14, 14); pooled to (512, 7, 7) in forward()
        self.conv_layer5 = self._make_stage(512, 512, num_convs=3 + extra, pool=False)

        # Parameter-free pool shared by stages 4 and 5.
        self.MP_layer = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # Identity for 224x224 inputs (the map is already 7x7); lets other
        # input sizes reach the fixed-size classifier head as well.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.fc_layer = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Linear(4096, num_classes),
        )

    @staticmethod
    def _make_stage(in_channels, out_channels, num_convs, pool):
        """Build ``num_convs`` Conv-BN-ReLU groups, optionally ending in a 2x2 max-pool."""
        layers = []
        for _ in range(num_convs):
            layers.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                                    kernel_size=3, stride=1, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU())
            in_channels = out_channels
        if pool:
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2, padding=0))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for an image batch.

        Args:
            x: Float tensor of shape ``(batch, 3, H, W)``.  The five 2x2
                pools require H and W to be at least 32.
        """
        out = self.conv_layer1(x)
        out = self.conv_layer2(out)
        out = self.conv_layer3(out)
        out = self.MP_layer(self.conv_layer4(out))
        out = self.MP_layer(self.conv_layer5(out))

        out = self.avgpool(out)
        out = out.flatten(1)  # (batch, 512 * 7 * 7)
        return self.fc_layer(out)

# Train

In [None]:
def _run_epoch(model, loader, loss_function, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean loss per sample.

    When ``optimizer`` is given the model weights are updated after every
    batch; otherwise the pass only measures the loss.  An empty loader
    yields 0.0.
    """
    loss_sum = 0.0
    sample_count = 0
    for feature, label in loader:
        feature, label = feature.to(device), label.to(device)
        if optimizer is not None:
            optimizer.zero_grad()
        outputs = model(feature)
        loss = loss_function(outputs, label)
        if optimizer is not None:
            loss.backward()
            optimizer.step()

        # The criterion returns the batch mean, so weight it by the batch
        # size to keep a smaller final batch from being over-represented.
        batch_len = feature.size(0)
        loss_sum += loss.item() * batch_len
        sample_count += batch_len

    return loss_sum / sample_count if sample_count else 0.0


def train(model, config, train, valid):
    """Train ``model`` with Adam and cross-entropy, plotting the loss curves.

    After every epoch the notebook cell output is cleared, the most recent
    (up to five) epoch log lines are printed and the training / validation
    loss curves are redrawn.

    Args:
        model: Network to optimise in place.
        config: Object providing ``learning_rate``, ``epochs`` and ``device``.
        train: Iterable of ``(feature, label)`` training batches.  (The
            parameter shadows this function's own name inside the body.)
        valid: Iterable of ``(feature, label)`` validation batches.

    Returns:
        ``(train_loss_history, valid_loss_history)``: two lists holding the
        mean per-sample loss of each epoch.
    """
    train_loss_history = []
    valid_loss_history = []
    # Lives outside the epoch loop so earlier log lines survive clear_output().
    history_print = []

    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    loss_function = nn.CrossEntropyLoss()

    for i in range(config.epochs):
        model.train()
        with torch.enable_grad():
            train_loss_ = _run_epoch(model, train, loss_function, config.device, optimizer)
        train_loss_history.append(train_loss_)

        model.eval()
        with torch.no_grad():
            valid_loss_ = _run_epoch(model, valid, loss_function, config.device)
        valid_loss_history.append(valid_loss_)

        clear_output(wait=True)  # reset the Jupyter cell output

        log = f"epoch : {i+1} Loss(train) : {train_loss_history[-1]:.3f}  Loss(valid) : {valid_loss_history[-1]:.3f}"
        history_print.append(log)
        for line in history_print[-5:]:
            print(line)

        plt.plot(train_loss_history, label='Training loss')
        plt.plot(valid_loss_history, label='Validation loss')
        plt.ylim(0, 5)
        plt.legend()
        plt.show()

    print("="*40)
    if train_loss_history:  # nothing to report when config.epochs == 0
        print("Training loss: ", train_loss_history[-1])
        print("Validation loss: ", valid_loss_history[-1])
    print("="*40)

    return train_loss_history, valid_loss_history

# Predict

In [None]:
def predict_evaluate(model, config, test):
    """Predict class labels for every batch in ``test`` and print the accuracy.

    Args:
        model: Network mapping a feature batch to ``(batch, classes)`` logits.
        config: Object providing ``device``.
        test: Iterable of ``(feature, label)`` batches.

    Returns:
        ``(y_test, y_pred)``: 1-D CPU tensors with the true and the predicted
        labels of all samples, in loader order.  Both are empty when
        ``test`` yields no batches.

    The printed accuracy is the fraction of correct predictions over all
    samples (``nan`` when there are none), not a mean of per-batch
    accuracies, so a smaller final batch does not skew it.
    """
    model.eval()
    pred_batches = []
    label_batches = []

    with torch.no_grad():
        for feature, label in test:
            outputs = model(feature.to(config.device))
            # Keep only the CPU class indices; holding on to the raw logits
            # would pin one device tensor per batch for no benefit.
            pred_batches.append(torch.max(outputs, 1)[1].cpu())
            label_batches.append(label.cpu())

    # Seeding each concatenation with an empty default-dtype tensor
    # reproduces the dtype promotion of the former batch-by-batch torch.cat
    # while concatenating only once.
    y_pred = torch.cat([torch.Tensor(), *pred_batches], dim=0)
    y_test = torch.cat([torch.Tensor(), *label_batches], dim=0)

    total = y_test.numel()
    accuracy = (y_pred == y_test).sum().item() / total if total else float("nan")
    print(f"Accuracy: {accuracy:.3f}")

    return y_test, y_pred

# Hyperparameter config

In [None]:
class config:
    """Plain container for the settings of one training run.

    Attributes:
        device: Device that batches are moved to.
        learning_rate: Step size handed to the optimiser.
        epochs: Number of passes over the training data.
        batch_size: Number of samples per mini-batch.
    """

    def __init__(self, device, learning_rate=0.001, epochs=100, batch_size = 128):
        # Where to run first, then how to optimise.
        self.device = device
        self.learning_rate, self.epochs, self.batch_size = learning_rate, epochs, batch_size

In [None]:
# Mini-batch size passed to every DataLoader and to the training config.
batch_size = 256

# Dataset

In [None]:
# Upscale the 32x32 CIFAR-10 images to 224x224 and map each RGB channel
# from [0, 1] to [-1, 1].
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # One mean / std per RGB channel instead of relying on broadcasting a
    # single-channel statistic across three channels.
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])


train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Hold out 20% of the 50000 training images for validation.
train_size = int(0.8 * len(train_dataset))
valid_size = len(train_dataset) - train_size

train_dataset, valid_dataset = random_split(train_dataset, [train_size, valid_size])

# Only the training data needs reshuffling every epoch; evaluation results
# do not depend on the sample order.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)   # 40000
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)  # 10000
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)    # 10000

# Model

In [None]:
# Settings for this run; batch_size is the same value the data loaders use.
# NOTE(review): 0.05 is far above Adam's default step size of 1e-3 — confirm
# this learning rate is intentional.
train_config = config(device, learning_rate=0.05, epochs=30, batch_size=batch_size)

In [None]:
# Build the 19-layer variant, then move its parameters to the selected device.
vgg_model = VGG(vgg_type=19)
vgg_model = vgg_model.to(device)

In [None]:
# Fit the network; losses are printed and plotted after every epoch.
train(model=vgg_model, config=train_config, train=train_loader, valid=valid_loader)

# Evaluation

In [None]:
# Run the trained model over the test split and collect labels / predictions.
y_test, y_pred = predict_evaluate(model=vgg_model, config=train_config, test=test_loader)

In [None]:
# Per-class precision / recall / F1 on the test split.
print(classification_report(y_test, y_pred))