# import torch

In [None]:
import mnist_train as mt

In [None]:
import torch
from torch import optim
# Last expression of the cell: displays the installed PyTorch version string.
torch.__version__

# GPU check

In [None]:
# Report CUDA availability. The device name is only queried when a GPU is
# present, because torch.cuda.get_device_name() raises on a machine without
# CUDA and would abort a fresh "Run All" on CPU-only hosts.
cuda_available = torch.cuda.is_available()
print("torch.cuda.is_available(): ", cuda_available)
if cuda_available:
    print("torch.cuda.get_device_name(): ", torch.cuda.get_device_name())
else:
    print("torch.cuda.get_device_name(): ", "N/A (no CUDA device)")

In [None]:
# Select the compute device: CUDA when it is available, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

# Download MNIST dataset

## What is the MNIST dataset?
> MNIST 데이터베이스 (Modified National Institute of Standards and Technology database)는  
손으로 쓴 숫자들로 이루어진 대형 데이터베이스이며,   
다양한 화상 처리 시스템을 트레이닝하기 위해 일반적으로 사용된다.   
이 데이터베이스는 또한 기계 학습 분야의 트레이닝 및 테스트에 널리 사용된다.  
>  
> https://ko.wikipedia.org/wiki/MNIST_데이터베이스


> `Dataset` 은 샘플과 정답(label)을 저장하기 위한 목적으로 사용됩니다.  
https://tutorials.pytorch.kr/beginner/basics/data_tutorial.html

In [None]:
from torchvision import datasets
from torchvision.transforms import ToTensor
import numpy as np
import matplotlib.pyplot as plt

# Download the data into ./data when it is not already present locally.
# The test split is read from the same root; no download flag is passed for it.
train_data = datasets.MNIST(root='data', train=True, transform=ToTensor(), download=True)
test_data = datasets.MNIST(root='data', train=False, transform=ToTensor())


In [None]:
# Show the dataset description, a separator, then the first (image, label) sample.
print(train_data, "----", train_data[0], sep="\n")

In [None]:
import random

# Pick a random training sample. The index range must come from train_data
# (the dataset actually being indexed), and randrange() already excludes its
# upper bound, so subtracting 1 would make the last sample unreachable.
idx = random.randrange(len(train_data))

img, label = train_data[idx]
mt.plot_train_data(img, label)

# Plot multiple train_data

In [None]:
# Delegate to the mnist_train helper, passing the full training dataset.
# NOTE(review): presumably renders a grid of samples — confirm in mnist_train.
mt.plot_multiple_train_data(train_data=train_data)

# Dataloader

> DataLoader 는 Dataset 을 샘플에 쉽게 접근할 수 있도록 순회 가능한 객체(iterable)로 감쌉니다.  
Dataset 은 데이터셋의 특징(feature)을 가져오고  
하나의 샘플에 정답(label)을 지정하는 일을 한 번에 합니다.  
모델을 학습할 때, 일반적으로 샘플들을 “미니배치(minibatch)”로 전달하고,  
매 에폭(epoch)마다 데이터를 다시 섞어서 과적합(overfit)을 막고,  
Python의 multiprocessing 을 사용하여 데이터 검색 속도를 높이려고 합니다.
>
> DataLoader 는 간단한 API로 이러한 복잡한 과정들을 추상화한 순회 가능한 객체(iterable)입니다  
https://tutorials.pytorch.kr/beginner/basics/data_tutorial.html

In [None]:
from torch.utils.data import DataLoader
# Mini-batch iterators for both splits, keyed by phase name.
loaders = {
    # Training batches are reshuffled every epoch.
    'train': DataLoader(train_data, batch_size=100, shuffle=True),

    # Evaluation gains nothing from shuffling; a fixed order keeps the
    # test pass reproducible from run to run.
    'test': DataLoader(test_data, batch_size=100, shuffle=False),
}
loaders

# Define Model

In [None]:
import torch.nn as nn
import torch.nn.functional as F
class NN(nn.Module):
    """Fully connected classifier: five Linear layers with ReLU in between.

    Args:
        in_features: length of each flattened input vector.
        out_features: number of output classes.
    """

    def __init__(self, in_features, out_features):
        super().__init__()

        self.nn = nn.Sequential(
            nn.Linear(in_features, 28*30),      # layer1 - input
            nn.ReLU(),

            nn.Linear(28*30, 30*40),            # layer2 - hidden
            nn.ReLU(),

            nn.Linear(30*40, 30*40),            # layer3 - hidden
            nn.ReLU(),

            nn.Linear(40*30, 30*28),            # layer4 - hidden
            nn.ReLU(),

            nn.Linear(30*28, out_features),     # layer5 - out
        )

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of flattened inputs.

        No softmax is applied here: the notebook trains this model with
        nn.CrossEntropyLoss, which applies log-softmax internally, so a
        softmax in the model would be applied twice and flatten the gradients.
        argmax over the logits gives the same predicted class as argmax over
        the softmax output; apply F.softmax(out, dim=1) at the call site when
        probabilities are needed.
        """
        return self.nn(x)

In [None]:
# Build the network for 28*28 inputs and 10 classes, place it on the
# selected device, and print its layer structure.
model = NN(28*28, 10).to(device)
print(model)

In [None]:
#!pip install torchsummary
from torchsummary import summary
# Summarize the model for a single flattened input of 28*28 features.
summary(model, input_size=(28*28,))

# model test (before training)

In [None]:
# Call the mnist_train helper with the model, device and test dataset.
# In top-to-bottom cell order this runs before the model has been trained.
# NOTE(review): presumably shows predictions for sample test images — confirm in mnist_train.
mt.show_sample_predict_nn(model, device, test_data)

# Train model

In [None]:
from torch.autograd import Variable
import time

def train(model, loaders, num_epochs, loss_func, optimizer, train_loss_list:list, test_loss_list:list):
    """Train ``model`` and record the mean train/test loss of every epoch.

    Each epoch runs a 'train' phase (with optimizer updates) followed by a
    'test' phase (no gradients). After the last epoch the weights from the
    epoch with the lowest test loss are loaded back into the model.

    Args:
        model: the network to train; batches are flattened to
            (batch, features) before being passed to it.
        loaders: mapping with 'train' and 'test' entries, each a sized
            iterable yielding (images, labels) batches.
        num_epochs: number of passes over the data.
        loss_func: callable ``loss_func(output, labels)`` returning a scalar
            tensor for a whole batch.
        optimizer: optimizer bound to ``model``'s parameters.
        train_loss_list: list extended in place with one value per epoch.
        test_loss_list: list extended in place with one value per epoch.

    Returns:
        The tuple ``(model, train_loss_list, test_loss_list)``.
    """
    import copy

    # Use the device the model already lives on rather than a notebook global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    total_step = len(loaders['train'])      # number of training batches

    min_loss = None
    best_model = None

    for epoch in range(num_epochs):
        loss_dict = {
            'train': 0.,
            'test': 0.
        }
        start_time = time.time()

        for phase in ['train', 'test']:
            is_train = phase == 'train'

            if is_train:
                model.train()
            else:
                model.eval()

            # Process the data one mini-batch at a time.
            for i, (images, labels) in enumerate(loaders[phase]):

                images = images.to(run_device, dtype=torch.float32)
                labels = labels.to(run_device)

                # One forward pass for the whole batch. The previous per-sample
                # loop used "loss =+ ...", which assigns instead of accumulating,
                # so only the last sample of each batch influenced the update.
                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(is_train):
                    output = model(images.reshape(images.size(0), -1))
                    loss = loss_func(output, labels)

                loss_dict[phase] += loss.item()

                if is_train:
                    optimizer.zero_grad()     # clear gradients for this training step
                    loss.backward()           # backpropagation, compute gradients
                    optimizer.step()          # apply gradients

                    # Log every 100 batches.
                    if (i+1) % 100 == 0:
                        print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.6f}'
                               .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))

        train_loss = loss_dict['train'] / len(loaders['train'])
        test_loss = loss_dict['test'] / len(loaders['test'])

        # Remember the weights of the epoch with the lowest test loss.
        # state_dict() returns references to the live parameters, so a deep
        # copy is required; otherwise the snapshot follows later updates.
        if min_loss is None or test_loss < min_loss:
            min_loss = test_loss
            best_model = copy.deepcopy(model.state_dict())

        train_loss_list.append(train_loss)
        test_loss_list.append(test_loss)
        duration = time.time() - start_time
        print(f"Epoch [{epoch+1}/{num_epochs}] summary, train_loss:{train_loss:8.8f}, "\
              f"test_loss:{test_loss:8.8f} duration: {duration:.1f}s")

    # Restore the weights with the lowest test loss (nothing to restore when
    # no epoch was run).
    if best_model is not None:
        model.load_state_dict(best_model)

    return model, train_loss_list, test_loss_list
        

In [None]:
# Fresh network on the selected device, together with its loss and optimizer.
model = NN(28*28, 10).to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Per-epoch loss histories; they start empty.
train_loss_list, test_loss_list = [], []

In [None]:
# Train for 10 epochs and rebind the model and both loss histories to the
# values returned by train().
model, train_loss_list, test_loss_list = train(
    model=model,
    loaders=loaders,
    num_epochs=10,
    loss_func=loss_func,
    optimizer=optimizer,
    train_loss_list=train_loss_list,
    test_loss_list=test_loss_list,
)

In [None]:
# Pass both per-epoch loss histories to the mnist_train helper.
# NOTE(review): presumably plots the two curves — confirm in mnist_train.
mt.draw_loss(train_loss_list, test_loss_list)

# Evaluate

In [None]:
from tqdm import tqdm
def evaluate():
    """Print the classification accuracy of ``model`` on ``loaders['test']``.

    Reads the notebook-level ``model``, ``loaders`` and ``device``. The result
    is printed; nothing is returned.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in tqdm(loaders['test']):
            images = images.to(device, dtype=torch.float32)
            labels = labels.to(device)

            # One forward pass per batch instead of one per image.
            outputs = model(images.reshape(images.size(0), -1))
            preds = outputs.argmax(dim=1)

            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Avoid a ZeroDivisionError when the loader yields no samples.
    if total == 0:
        print('Test Accuracy of the model: n/a (the test loader yielded no samples)')
        return

    # Report the number of images actually evaluated rather than a fixed 10000.
    print(f'Test Accuracy of the model on the {total} test images: {correct/total*100:.2f}%')


In [None]:
# Run the accuracy check defined in the previous cell; the result is printed.
evaluate()

# Evaluation data sampling

In [None]:
# Draw a 5x5 grid of random test images, each titled with the predicted
# label, the ground-truth label, and whether the two agree.
figure = plt.figure(figsize=(10, 8))
cols, rows = 5, 5

model.eval()                 # set once; it does not change between samples
with torch.no_grad():        # plotting predictions needs no gradient tracking
    for i in range(1, cols * rows + 1):
        sample_idx = np.random.randint(len(test_data), size=(1,)).item()
        img, gt = test_data[sample_idx]
        img = img.to(device)

        predicted = model(img.view(1,-1))
        # Plain Python int, so the comparison and the title use ordinary values.
        label = torch.argmax(predicted).item()

        figure.add_subplot(rows, cols, i)
        plt.title(f"{label} (GT:{gt} / {gt==label})")
        plt.axis("off")
        plt.imshow(img.view(28,28).cpu(), cmap="gray")

plt.tight_layout()
plt.show()

# Test

In [None]:
# Same helper call as before training; in cell order the model is now trained.
# NOTE(review): presumably shows predictions for sample test images — confirm in mnist_train.
mt.show_sample_predict_nn(model, device, test_data)