In [None]:
from google.colab import drive
# Mount Google Drive under /content1/ so the '/content1/MyDrive/...' paths
# used later in this notebook (dataset root, saved weights) resolve.
drive.mount('/content1/')

In [None]:
import torch
import torchvision
from torchvision import utils
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import DataLoader, Dataset

In [None]:
class MyDataset(Dataset):
    """Dataset over two parallel, indexable collections of inputs and labels.

    Args:
        x_data: Indexable collection of inputs.
        y_data: Indexable collection of labels; its length defines the
            length of the dataset.
        transform: Optional callable applied to each ``(input, label)`` pair
            before it is returned.
    """

    def __init__(self, x_data, y_data, transform=None):
        self.x_data, self.y_data = x_data, y_data
        self.transform = transform
        # The size is taken from the labels once, at construction time.
        self.len = len(y_data)

    def __len__(self):
        """Return the number of samples recorded at construction."""
        return self.len

    def __getitem__(self, index):
        """Return the ``(input, label)`` pair at ``index``, transformed if a transform is set."""
        pair = (self.x_data[index], self.y_data[index])
        return self.transform(pair) if self.transform else pair

class ToTensor:
    """Convert an ``(inputs, labels)`` sample into a pair of tensors.

    ``inputs`` becomes a float32 tensor whose third axis is moved to the front
    (``permute(2, 0, 1)``, e.g. H x W x C -> C x H x W). ``labels`` becomes an
    int64 tensor.

    ``torch.as_tensor`` is used rather than the legacy ``torch.FloatTensor`` /
    ``torch.LongTensor`` constructors: ``torch.LongTensor(5)`` treats a scalar
    int as a *size* and returns an uninitialised tensor of five elements
    instead of the label 5. Note that ``as_tensor`` may share memory with an
    input array that already has the target dtype.
    """

    def __call__(self, sample):
        """Return ``(inputs_tensor, labels_tensor)`` for ``sample = (inputs, labels)``."""
        inputs, labels = sample
        inputs = torch.as_tensor(inputs, dtype=torch.float32)
        inputs = inputs.permute(2, 0, 1)
        return inputs, torch.as_tensor(labels, dtype=torch.long)

class Resize:
    """Resize the image of an ``(inputs, labels)`` sample; labels pass through untouched.

    The first two axes of ``inputs`` are read as height and width, so the
    expected layout is H x W or H x W x C.

    Args:
        output_size (int or tuple): If a tuple, the exact ``(height, width)``
            of the result. If an int, the shorter image side is scaled to this
            value and the longer side is scaled by the same factor.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        # The attribute name must match what __call__ reads.
        self.output_size = output_size

    def __call__(self, sample):
        """Return ``(resized_inputs, labels)``.

        The resize is bilinear and the result is float32. A tensor input
        yields a tensor; anything else (e.g. a NumPy array) yields a NumPy
        array.

        Raises:
            ValueError: If ``inputs`` is neither 2-D nor 3-D.
        """
        inputs, labels = sample

        h, w = inputs.shape[:2]
        if isinstance(self.output_size, int):
            # Keep the aspect ratio: the shorter side becomes output_size.
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size

        new_h, new_w = int(new_h), int(new_w)

        return_tensor = torch.is_tensor(inputs)
        image = torch.as_tensor(inputs, dtype=torch.float32)
        if image.dim() == 2:
            # Give grayscale H x W images a temporary channel axis.
            image = image.unsqueeze(-1)
            drop_channel = True
        elif image.dim() == 3:
            drop_channel = False
        else:
            raise ValueError(
                "Resize expects an H x W or H x W x C image, got {} dimensions".format(image.dim())
            )

        # interpolate works on N x C x H x W, so go channels-first and add a batch axis.
        batch = image.permute(2, 0, 1).unsqueeze(0)
        resized = torch.nn.functional.interpolate(
            batch, size=(new_h, new_w), mode='bilinear', align_corners=False
        )
        # Back to the caller's H x W [x C] layout.
        resized = resized.squeeze(0).permute(1, 2, 0)
        if drop_channel:
            resized = resized.squeeze(-1)
        resized = resized.contiguous()

        new_inputs = resized if return_tensor else resized.numpy()

        return new_inputs, labels

In [None]:
import torchvision.transforms as tf
from torchvision import datasets


# Google Drive folder where the MNIST files are stored / downloaded.
path2data = '/content1/MyDrive/MNistData'

# torchvision's own Resize/ToTensor (not the classes defined earlier in this
# notebook): scale each digit to 32x32, then convert it to a tensor.
my_transform = tf.Compose([
    tf.Resize((32,32)),
    tf.ToTensor(),
])

# Arguments shared by the training and validation splits.
_mnist_kwargs = dict(root=path2data, download=True, transform=my_transform)
train_data = datasets.MNIST(train=True, **_mnist_kwargs)
val_data = datasets.MNIST(train=False, **_mnist_kwargs)

# Only the training loader is shuffled.
train_dl = DataLoader(train_data, shuffle=True, batch_size=32)
val_dl = DataLoader(val_data, batch_size=32)

In [None]:
from ipywidgets import interact
# The slider's upper bound is inclusive, so the last valid index is size - 1.
@interact(idx=(0, train_data.data.shape[0] - 1))
def showImage(idx):
    """Display training image ``idx`` in grayscale with its label as the title."""
    plt.imshow(train_data.data[idx].numpy(), cmap="gray")
    # .item() unwraps the 0-d label tensor so the title reads "5", not "tensor(5)".
    plt.title("Label : {}".format(train_data.targets[idx].item()))
    plt.grid(False)
    plt.show()

In [None]:
%matplotlib inline

# Pull the raw training images and labels out of the dataset.
x_train, y_train = train_data.data, train_data.targets

# Do the same for the validation split.
x_val, y_val = val_data.data, val_data.targets

# Insert a channel axis so the image batches are laid out as B*C*H*W.
if x_train.dim() == 3:
    x_train = x_train.unsqueeze(dim=1)

if x_val.dim() == 3:
    x_val = x_val.unsqueeze(dim=1)

# Helper that renders an image tensor with matplotlib.
def show(img):
    """Draw a C*H*W image tensor on the current matplotlib axes."""
    # imshow wants channels last, so reorder C*H*W into H*W*C.
    channels_last = img.numpy().transpose(1, 2, 0)
    plt.imshow(channels_last, interpolation='nearest')

# Tile the first 40 training images into one grid image,
# 8 images per row with 2 pixels of padding, and display it.
x_grid = utils.make_grid(tensor=x_train[:40], nrow=8, padding=2)
show(x_grid)

In [None]:
from torch import nn
import torch.nn.functional as F

class LeNet_5(nn.Module):
    """LeNet-5 style CNN with batch normalisation and dropout.

    Three 5x5 convolutions (6, 16, 120 channels), each followed by batch norm,
    tanh and dropout, with 2x2 average pooling after the first two; then two
    fully connected layers (120 -> 84 -> 10). For a 1x32x32 input the last
    convolution produces a 120x1x1 map, which is what ``fc1`` expects.

    ``forward`` returns raw class scores (logits). ``nn.CrossEntropyLoss``
    applies log-softmax itself, so feeding it softmax output would normalise
    twice and flatten the gradients. Use ``softmax(logits, dim=1)`` when
    probabilities are needed; ``argmax`` predictions are unaffected.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(6)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0)
        self.bn2 = nn.BatchNorm2d(16)
        self.conv3 = nn.Conv2d(16, 120, kernel_size=5, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(120)
        self.fc1 = nn.Linear(120, 84)
        self.fc2 = nn.Linear(84, 10)
        self.dropout = nn.Dropout(p=0.3)

        # Xavier-normal initialisation for every conv / linear weight
        # (biases keep PyTorch's defaults).
        for layer in (self.conv1, self.conv2, self.conv3, self.fc1, self.fc2):
            nn.init.xavier_normal_(layer.weight)

    # Dropout is active under model.train() and disabled under model.eval().
    def forward(self, x):
        """Return logits of shape ``(batch, 10)`` for input ``x`` of shape ``(batch, 1, 32, 32)``."""
        x = torch.tanh(self.bn1(self.conv1(x)))
        x = self.dropout(x)
        x = F.avg_pool2d(x, 2, 2)
        x = torch.tanh(self.bn2(self.conv2(x)))
        x = self.dropout(x)
        x = F.avg_pool2d(x, 2, 2)
        x = torch.tanh(self.bn3(self.conv3(x)))
        x = self.dropout(x)
        # Flatten everything except the batch axis; a wrong input size then
        # fails in fc1 instead of silently mixing samples across the batch.
        x = torch.flatten(x, 1)
        x = torch.tanh(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

# Instantiate the network and print its layer structure.
model = LeNet_5()
print(model)

In [None]:
# Move the model to the GPU when one is available; otherwise stay on the CPU
# so the notebook still runs on runtimes without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(next(model.parameters()).device)

In [None]:
from torchsummary import summary
# torchsummary creates its dummy input on CUDA unless told otherwise, so pass
# the device type the model was actually moved to.
summary(model, input_size=(1,32,32), device=device.type)

In [None]:
# `reduction` selects how the per-sample losses are combined
# ('mean', 'sum', etc.). 'sum' is used here; loss_epoch normalises the total.
loss_func = nn.CrossEntropyLoss(reduction='sum')

In [None]:
from torch import optim

# Adam over all model parameters with a learning rate of 1e-3.
optimizer = optim.Adam(model.parameters(), lr=0.001)
# optimizer.param_groups is a list of dictionaries holding each group's settings.
def get_lr(optimizer):
    """Return the learning rate of the first parameter group, or None if there are no groups."""
    first_group_lr = (group['lr'] for group in optimizer.param_groups)
    return next(first_group_lr, None)

# The learning rate follows a cosine curve, so it keeps cycling between its maximum and minimum.
# T_max   : maximum number of iterations
# eta_min : minimum value of the learning rate
#from torch.optim.lr_scheduler import CosineAnnealingLR
#lr_scheduler = CosineAnnealingLR(optimizer, T_max=2, eta_min=1e-05)

In [None]:
import copy
from tqdm import tqdm
import time

def metrics_batch(output, target):
    """Count how many rows of ``output`` have their arg-max at the target class.

    Args:
        output: Tensor of per-class scores, one row per sample.
        target: Tensor of class indices, one per sample.

    Returns:
        int: Number of correct predictions in the batch.
    """
    predicted = torch.argmax(output, dim=1, keepdim=True)
    # Reshape the targets to the prediction's (batch, 1) shape before comparing.
    hits = torch.eq(predicted, target.view_as(predicted))
    return hits.sum().item()

def loss_batch(loss_func, output, target, opt=None):
    """Compute the loss and correct-prediction count for one batch.

    When an optimizer is supplied, one optimisation step is also performed
    (zero gradients, backpropagate, step).

    Args:
        loss_func: Callable mapping ``(output, target)`` to a scalar loss tensor.
        output: Model output for the batch.
        target: Ground-truth labels for the batch.
        opt: Optional optimizer; ``None`` means evaluate only.

    Returns:
        tuple: ``(loss value as a Python number, number of correct predictions)``.
    """
    batch_loss = loss_func(output, target)
    n_correct = metrics_batch(output, target)

    # Evaluation only: nothing to update.
    if opt is None:
        return batch_loss.item(), n_correct

    opt.zero_grad()
    batch_loss.backward()
    opt.step()
    return batch_loss.item(), n_correct

def loss_epoch(model, loss_func, dataset_dl, sanity_check=False, opt=None):
    """Run one pass over ``dataset_dl`` and return the mean loss and accuracy.

    Args:
        model: Network to run; batches are moved to the device of its first
            parameter (left in place if the model has no parameters).
        loss_func: Loss callable forwarded to ``loss_batch``. The per-sample
            average below assumes it returns a *summed* loss per batch.
        dataset_dl: Iterable yielding ``(inputs, targets)`` batches.
        sanity_check: If True, stop after the first batch.
        opt: Optional optimizer; when given, ``loss_batch`` updates the model.

    Returns:
        tuple: ``(loss per sample, fraction of correct predictions)``, both
        averaged over the samples actually processed. This keeps the numbers
        meaningful when ``sanity_check`` cuts the pass short or the loader
        does not visit every sample.

    Raises:
        ValueError: If the loader yields no samples.
    """
    running_loss = 0.0
    running_metric = 0.0
    samples_seen = 0

    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)

    for xb, yb in dataset_dl:
        xb = xb.type(torch.float)
        if first_param is not None:
            xb = xb.to(first_param.device)
            yb = yb.to(first_param.device)
        output = model(xb)
        loss_b, metric_b = loss_batch(loss_func, output, yb, opt)
        running_loss += loss_b

        if metric_b is not None:
            running_metric += metric_b

        samples_seen += len(yb)

        if sanity_check is True:
            break

    if samples_seen == 0:
        raise ValueError("loss_epoch received a data loader that yielded no samples")

    loss = running_loss / float(samples_seen)
    metric = running_metric / float(samples_seen)
    return loss, metric

def train_val(model, params):
    """Train ``model``, validate after every epoch, and keep the best weights.

    Args:
        model: Network to train (updated in place).
        params: Dict with the keys ``'num_epochs'``, ``'loss_func'``,
            ``'optimizer'``, ``'train_dl'``, ``'val_dl'``, ``'sanity_check'``
            and ``'path2weights'``.

    Returns:
        tuple: ``(model, loss_history, metric_history)``. ``model`` is loaded
        with the weights that gave the lowest validation loss; each history is
        a dict with ``'train'`` and ``'val'`` lists holding one value per epoch.
    """
    setting_names = ('num_epochs', 'loss_func', 'optimizer', 'train_dl',
                     'val_dl', 'sanity_check', 'path2weights')
    (num_epochs, loss_func, opt, train_dl,
     val_dl, sanity_check, path2weights) = (params[name] for name in setting_names)

    loss_history = {phase: [] for phase in ('train', 'val')}
    metric_history = {phase: [] for phase in ('train', 'val')}

    # Start from the current weights so there is always something to restore.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')

    for epoch in tqdm(range(num_epochs)):
        print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs-1, get_lr(opt)))

        # Training pass: the optimizer is passed, so weights are updated.
        model.train()
        train_loss, train_metric = loss_epoch(model, loss_func, train_dl, sanity_check, opt)
        loss_history['train'].append(train_loss)
        metric_history['train'].append(train_metric)

        # Validation pass: no optimizer and no gradient tracking.
        model.eval()
        with torch.no_grad():
            val_loss, val_metric = loss_epoch(model, loss_func, val_dl, sanity_check)
        loss_history['val'].append(val_loss)
        metric_history['val'].append(val_metric)

        # Checkpoint in memory and on disk whenever the validation loss improves.
        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), path2weights)
            print('Copied best model weights')

        #lr_scheduler.step()

        print('train loss: %.6f, dev loss: %.6f, accuracy: %.2f' %(train_loss, val_loss, 100*val_metric))
        print('-'*10)

    # Hand back the best model rather than the last one.
    model.load_state_dict(best_model_wts)
    return model, loss_history, metric_history

In [None]:
# Everything train_val needs, gathered in one dictionary.
params_train = {
    'num_epochs': 30,
    'loss_func': loss_func,
    'optimizer': optimizer,
    'train_dl': train_dl,
    'val_dl': val_dl,
    'sanity_check': False,
    'path2weights': '/content1/MyDrive/MNistData/best_model_dropout_xavierInitialization.pt',
}

# Train; `model` comes back loaded with the best validation weights.
model, loss_hist, metric_hist = train_val(model, params_train)

In [None]:
num_epochs = params_train["num_epochs"]

# Training vs. validation loss per epoch (epochs numbered from 1).
plt.title("Train-Val Loss")
plt.plot(range(1, num_epochs+1), loss_hist["train"], label="train")
plt.plot(range(1, num_epochs+1), loss_hist["val"], label="val")
# The y axis shows the loss value, not a label.
plt.ylabel("Loss")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy per epoch (epochs numbered from 1).
plt.title("Train-Val Accuracy")
for split in ("train", "val"):
    plt.plot(range(1, num_epochs + 1), metric_hist[split], label=split)
plt.xlabel("Training Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()