# Deep Session 4차시 과제

# LeNet 구현해보기

<img src='http://drive.google.com/uc?export=view&id=1MevERvWOuYttJyTaFbGDJggB3luPD0TP' /><br>

LeNet-5는 32x32 크기의 흑백 이미지에서 학습된 7 layer Convolutional Neural Network  

[Conv(C1) - Subsampling(S2) - Conv(C3) - Subsampling(S4) - Conv(C5) - FC - FC]  

**Input**  
입력 이미지는 32x32  
**Layer C1**  
5x5 크기의 kernel 6개와 stride=1, convolutional layer  
입력 크기는 32x32x1 이고, 출력 크기는 28x28x6  
**Layer S2**
2x2 크기의 kernel 6개와 stride=2, subsampling layer  
입력 크기는 28x28x6 이고, 출력 크기는 14x14x6  
**Layer C3**  
5x5 크기의 kernel 16개와 stride=1, convolution layer  
입력 크기는 14x14x6 이고, 출력 크기는 10x10x16  
**Layer S4**  
2x2 크기의 kernel 16개와 stride=2, subsampling layer  
입력 크기는 10x10x16 이고, 출력 크기는 5x5x16  
**Layer C5**  
5x5 크기의 kernel 120개와 stride=1, convolutional layer  
입력 크기는 5x5x16 이고, 출력 크기는 1x1x120  
**Layer F6**  
tanh 함수를 활성화 함수로 이용하는 fully-connected layer  
입력 유닛은 120개 이고, 출력 유닛은 84개  
**Layer F7**  
RBF(Euclidean Radia Basis Function unit)를 활성화 함수로 이용하는 output layer  
입력 크기는 84 이고, 출력 크기는 10  
**Loss function**  
Loss function은 MSE(평균 제곱 오차)


In [None]:
# 필요한 라이브러리 import

import numpy as np
from datetime import datetime

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

from torchvision import datasets, transforms

import matplotlib.pyplot as plt

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# parameters 설정

RANDOM_SEED = 42
LEARNING_RATE = 0.1
BATCH_SIZE = 32
N_EPOCHS = 10

In [None]:
#  정확도를 구하는 function과 손실을 시각화 하는 function 정의

def get_accuracy(model, data_loader, device):
    '''
    전체 data_loader에 대한 예측의 정확도를 계산하는 함수
    '''
    
    correct_pred = 0 
    n = 0
    
    with torch.no_grad():
        model.eval()
        for X, y_true in data_loader:

            X = X.to(device)
            y_true = y_true.to(device)

            _, y_prob = model(X)
            _, predicted_labels = torch.max(y_prob, 1)

            n += y_true.size(0)
            correct_pred += (predicted_labels == y_true).sum()

    return correct_pred.float() / n

def plot_losses(train_losses, valid_losses):
    '''
    training과 validation loss를 시각화하는 함수
    '''
    
    # plot style을 seaborn으로 설정
    plt.style.use('seaborn')

    train_losses = np.array(train_losses) 
    valid_losses = np.array(valid_losses)

    fig, ax = plt.subplots(figsize = (8, 4.5))

    ax.plot(train_losses, color='blue', label='Training loss') 
    ax.plot(valid_losses, color='red', label='Validation loss')
    ax.set(title="Loss over epochs", 
            xlabel='Epoch',
            ylabel='Loss') 
    ax.legend()
    fig.show()
    
    # plot style을 기본값으로 설정
    plt.style.use('default')

In [None]:
# training data에 사용되는 helper 함수 정의하기

def train(train_loader, model, criterion, optimizer, device):
    '''
    training loop의 training 단계에 대한 함수
    '''

    model.train()
    running_loss = 0
    
    for X, y_true in train_loader:

        optimizer.zero_grad()
        
        X = X.to(device)
        y_true = y_true.to(device)
    
        # 순전파
        y_hat, _ = model(X) 
        loss = criterion(y_hat, y_true) 
        running_loss += loss.item() * X.size(0)

        # 역전파
        loss.backward()
        optimizer.step()
        
    epoch_loss = running_loss / len(train_loader.dataset)
    return model, optimizer, epoch_loss

In [None]:
# validation data에 사용되는 함수 정의

def validate(valid_loader, model, criterion, device):
    '''
    training loop의 validation 단계에 대한 함수
    '''
   
    model.eval()
    running_loss = 0
    
    for X, y_true in valid_loader:
    
        X = X.to(device)
        y_true = y_true.to(device)

        # 순전파와 손실 기록하기
        y_hat, _ = model(X) 
        loss = criterion(y_hat, y_true) 
        running_loss += loss.item() * X.size(0)

    epoch_loss = running_loss / len(valid_loader.dataset)
        
    return model, epoch_loss

In [None]:
# training loop 정의

def training_loop(model, criterion, optimizer, train_loader, valid_loader, epochs, device, print_every=1):
    '''
    전체 training loop를 정의하는 함수
    '''
    
    # metrics를 저장하기 위한 객체 설정
    best_loss = 1e10
    train_losses = []
    valid_losses = []
 
    # model 학습하기
    for epoch in range(0, epochs):

        # training
        model, optimizer, train_loss = train(train_loader, model, criterion, optimizer, device)
        train_losses.append(train_loss)

        # validation
        with torch.no_grad():
            model, valid_loss = validate(valid_loader, model, criterion, device)
            valid_losses.append(valid_loss)

        if epoch % print_every == (print_every - 1):
            
            train_acc = get_accuracy(model, train_loader, device=device)
            valid_acc = get_accuracy(model, valid_loader, device=device)
                
            print(f'{datetime.now().time().replace(microsecond=0)} --- '
                  f'Epoch: {epoch}\t'
                  f'Train loss: {train_loss:.4f}\t'
                  f'Valid loss: {valid_loss:.4f}\t'
                  f'Train accuracy: {100 * train_acc:.2f}\t'
                  f'Valid accuracy: {100 * valid_acc:.2f}')

    plot_losses(train_losses, valid_losses)
    
    return model, optimizer, (train_losses, valid_losses)

In [None]:
# data 준비하기

# transforms 정의하기
transforms = transforms.Compose([transforms.Resize((32, 32)),
                                 transforms.ToTensor()])

# data set 다운받고 생성하기
train_dataset = datasets.MNIST(root='mnist_data', 
                               train=True, 
                               transform=transforms,
                               download=True)

valid_dataset = datasets.MNIST(root='mnist_data', 
                               train=False, 
                               transform=transforms)

# data loader 정의하기
train_loader = DataLoader(dataset=train_dataset, 
                          batch_size=BATCH_SIZE, 
                          shuffle=True)

valid_loader = DataLoader(dataset=valid_dataset, 
                          batch_size=BATCH_SIZE, 
                          shuffle=False)

In [None]:
# 데이터 확인 1

for (X_train, y_train) in train_loader:
    print('X_train:', X_train.size(), 'type:', X_train.type())
    print('y_train:', y_train.size(), 'type:', y_train.type())
    break

# <span style="color:purple">이부분 채우기</span>

In [None]:
# LeNet-5 구조 정의 
# nn.Conv2d, nn.AvgPool2d, nn.Linear를 이용해서 입력 이미지가 각 층을 통과하는 모양으로 만들기

class LeNet5(nn.Module):

    def __init__(self):
        super(LeNet5, self).__init__()
        
        # 이 부분 채우기


    def forward(self, x):
        # 이 부분 채우기
        x = 
        
        probs = F.softmax(x, dim=1)
        return x, probs

In [None]:
# LeNet-5 구조 정의 
# nn.Sequential을 이용해서 ()괄호 안에 층들을 쌓아서 만들기

class LeNet5_Sequential(nn.Module):

    def __init__(self):
        super(LeNet5_Sequential, self).__init__()
        
        self.feature_extractor = nn.Sequential( 
            
            # 이 부분 채우기
        )

        self.classifier = nn.Sequential(
            
            # 이 부분 채우기
        )


    def forward(self, x):
        # 이 부분 채우기
        x = 
        
        probs = F.softmax(x, dim=1)
        return x, probs

In [None]:
# model, optimizer, loss function 설정하기

torch.manual_seed(RANDOM_SEED)

#model = LeNet5().to(DEVICE)
model = LeNet5_Sequential().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

In [None]:
# 신경망 학습

model, optimizer, _ = training_loop(model, criterion, optimizer, train_loader, 
                                    valid_loader, N_EPOCHS, DEVICE)

### 추가적으로 seed, learning_rate, batch_size, epoch 값 등 변경해보기