In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# 연산을 수행할 device 설정

In [2]:
if torch.cuda.is_available():
    device=torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

cpu


# Dataset 설정

In [3]:
data_path = 'data'
if not os.path.exists(data_path):
    os.makedirs(data_path)
    
transform = transforms.Compose([transforms.ToTensor(), # 이미지를 텐서로 변경하고
                                transforms.Normalize((0.1307,), # 이미지를 0.1307, 0.3081값으로 normalize
                                                     (0.3081,))
                               ])

trn_dset = datasets.MNIST(root=data_path, train=True, transform=transform, download=True)
tst_dset = datasets.MNIST(root=data_path, train=False, transform=transform, download=False)

# 모델 Class정의

In [4]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3,3), stride=(1,1), padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3,3), stride=(1,1), padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        self.fc1   = nn.Linear(7*7*128, 100, bias=True)
        self.fc2   = nn.Linear(100, 10, bias=True)
        self.apply(self._init_weights)
        
    def _init_weights(self, submodule):
        if isinstance(submodule, nn.Conv2d):
            nn.init.xavier_normal_(submodule.weight)
            if submodule.bias is not None:
                submodule.bias.data.fill_(0.01)
        if isinstance(submodule, nn.Linear): # submodule이 nn.Linear에서 생성된 객체(혹은 인스턴스이면)
            nn.init.kaiming_normal_(submodule.weight) #해당 submodule의 weight는 He Initialization으로 초기화
            if submodule.bias is not None:
                submodule.bias.data.fill_(0.01) # 해당 submodule의 bias는 0.01로 초기화
                
    def forward(self, x):
        # (n_data, n_channel, width, height)으로 연산 결과의 크기 표기
        x = self.conv1(x) # (batch,1,28,28) -> (batch,64,28,28)
        x = F.relu(x)     
        x = self.pool1(x) # (batch,64,28,28) -> (batch,64,14,14)
        x = self.conv2(x) # (batch,64,14,14) -> (batch,128,14,14)
        x = F.relu(x)
        x = self.pool2(x) # (batch,128,14,14) -> (batch,128,7,7)
        x = x.reshape(-1, 7*7*128)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        out = F.softmax(x, dim=1)
        return out

# 실제 모델 생성

In [5]:
model = CNN().to(device)

# Optimizer 생성

In [6]:
opt = optim.Adam(model.parameters(), lr = 2e-4)

# DataLoader정의 

In [7]:
batch_size = 256
trn_epochs = 50
trn_loader = DataLoader(dataset = trn_dset, batch_size = batch_size, shuffle=True, drop_last=True)
tst_loader = DataLoader(dataset = tst_dset, batch_size = batch_size, shuffle=False, drop_last=False)

In [8]:
trn_dset.data.shape

torch.Size([60000, 28, 28])

# 학습 및 Infernce

In [9]:
for epoch in range(trn_epochs):
    model.train()
    for batch_idx, (x_batch, y_batch) in enumerate(trn_loader):
        x_batch = x_batch.to(device) # 차원을 늘려줌. Tensor with (256,28,28) --> (1,256,28,28)
#         x_batch = x_batch.reshape(1,-1,28,28).to(device)
        y_batch = y_batch.to(device)
        opt.zero_grad()
        y_batch_prob = model(x_batch)
        loss = F.cross_entropy(y_batch_prob, y_batch)
        loss.backward()
        opt.step()
        if (batch_idx + 1) % 10 == 0 or batch_idx+1 == len(trn_loader):
            print(f'\r>> In epoch {epoch+1} [{batch_idx+1}/{len(trn_loader)}], Training Loss: {loss.item():.4f}', end='')
    
    if (epoch+1) % 5 == 0:
        model.eval()
        y_pred_list = []
        y_real_list = []
        tst_loss = 0
        with torch.no_grad():
            for batch_idx, (x_batch, y_batch) in enumerate(tst_loader):
                x_batch = x_batch.to(device)
                y_batch = y_batch.to(device)
                y_batch_prob = model(x_batch)
                y_batch_pred = torch.argmax(y_batch_prob, axis=1)
                loss = F.cross_entropy(y_batch_prob, y_batch, reduction='sum')
                tst_loss += loss
                
                y_pred_list.append(y_batch_pred.detach().numpy())
                y_real_list.append(y_batch.detach().numpy())
                
            y_real = np.concatenate([x for x in y_real_list], axis=0)
            y_pred = np.concatenate([x for x in y_pred_list], axis=0)
            tst_loss /= y_real.shape[0]
            correct = np.sum(y_real == y_pred)
            accuracy = 100*correct / len(tst_loader.dataset)
            
            print(f'\t >> In epoch {epoch+1}: avg test loss={tst_loss:.4f}, accuracy={accuracy:.2f}% [{correct}/{len(tst_loader.dataset)}]')
                
        
    

>> In epoch 1 [40/234], Training Loss: 1.7279

KeyboardInterrupt: 

# 연습문제

<span style = 'font-size:1.4em;line-height:1.5em'>1. 모델 class를 다음과 같이 변형하여 작성하고, 실제 모델을 만들어서 학습해봅시다.(학습 과정 자체는 오래 걸릴테니 정상적으로 돌아가는 것만 확인하고 멈추셔도 좋습니다.</span>
- <span style = 'font-size:1.2em;line-height:1.5em'>(1) 3번째 Convolution Layer를 추가해봅시다. 3*3size의 filter의 갯수는 256개가 되도록 하며, padding=(1,1), stride=(2,2)로 정의하세요. 이 때, Convolution Layer가 끝나면, fully connected의 결과도 바뀌기 때문에 얘도 바꿔주세요. </span>
- <span style = 'font-size:1.2em;line-height:1.5em'>(2) Convolution연산이 끝날때마다, batchnorm을 추가해주세요. (1)에서 생성한 세번째 conv layer에 대한 BN도 생성해주세요.</span>


#### (1) conv layer 추가 

In [10]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3,3), stride=(1,1), padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3,3), stride=(1,1), padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3,3), stride=(2,2), padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        
        self.fc1   = nn.Linear(2*2*256, 100, bias=True)
        self.fc2   = nn.Linear(100, 10, bias=True)
        self.apply(self._init_weights)
        
    def _init_weights(self, submodule):
        if isinstance(submodule, nn.Conv2d):
            nn.init.xavier_normal_(submodule.weight)
            if submodule.bias is not None:
                submodule.bias.data.fill_(0.01)
        if isinstance(submodule, nn.Linear): # submodule이 nn.Linear에서 생성된 객체(혹은 인스턴스이면)
            nn.init.kaiming_normal_(submodule.weight) #해당 submodule의 weight는 He Initialization으로 초기화
            if submodule.bias is not None:
                submodule.bias.data.fill_(0.01) # 해당 submodule의 bias는 0.01로 초기화
                
    def forward(self, x):
        # (n_data, n_channel, width, height)으로 연산 결과의 크기 표기
        x = self.conv1(x) # (batch,1,28,28) -> (batch,64,28,28)
        x = F.relu(x)     
        x = self.pool1(x) # (batch,64,28,28) -> (batch,64,14,14)
        
        x = self.conv2(x) # (batch,64,14,14) -> (batch,128,14,14)
        x = F.relu(x)
        x = self.pool2(x) # (batch,128,14,14) -> (batch,128,7,7)
        
        x = self.conv3(x) # (batch,128,7,7) -> (batch,256,4,4)
        x = F.relu(x)
        x = self.pool3(x) # (batch,256,4,4) -> (batch,256,2,2)
        
        x = x.reshape(-1, 2*2*256)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        out = F.softmax(x, dim=1)
        return out

In [11]:
model = CNN()

In [12]:
opt = optim.Adam(model.parameters(), lr = 2e-4)

In [13]:
for batch_idx, (x_batch, y_batch) in enumerate(trn_loader):
    x_batch = x_batch.to(device) 
    print(f'input shape: {x_batch.shape}')
    pred = model(x_batch)
    print(f'output shape: {pred.shape}')
    break

input shape: torch.Size([256, 1, 28, 28])
output shape: torch.Size([256, 10])


In [14]:
for epoch in range(trn_epochs):
    model.train()
    for batch_idx, (x_batch, y_batch) in enumerate(trn_loader):
        x_batch = x_batch.to(device) # 차원을 늘려줌. Tensor with (256,28,28) --> (1,256,28,28)
#         x_batch = x_batch.reshape(1,-1,28,28).to(device)
        y_batch = y_batch.to(device)
        opt.zero_grad()
        y_batch_prob = model(x_batch)
        loss = F.cross_entropy(y_batch_prob, y_batch)
        loss.backward()
        opt.step()
        if (batch_idx + 1) % 10 == 0 or batch_idx+1 == len(trn_loader):
            print(f'\r>> In epoch {epoch+1} [{batch_idx+1}/{len(trn_loader)}], Training Loss: {loss.item():.4f}', end='')
    
    if (epoch+1) % 5 == 0:
        model.eval()
        y_pred_list = []
        y_real_list = []
        tst_loss = 0
        with torch.no_grad():
            for batch_idx, (x_batch, y_batch) in enumerate(tst_loader):
                x_batch = x_batch.to(device)
                y_batch = y_batch.to(device)
                y_batch_prob = model(x_batch)
                y_batch_pred = torch.argmax(y_batch_prob, axis=1)
                loss = F.cross_entropy(y_batch_prob, y_batch, reduction='sum')
                tst_loss += loss
                
                y_pred_list.append(y_batch_pred.detach().numpy())
                y_real_list.append(y_batch.detach().numpy())
                
            y_real = np.concatenate([x for x in y_real_list], axis=0)
            y_pred = np.concatenate([x for x in y_pred_list], axis=0)
            tst_loss /= y_real.shape[0]
            correct = np.sum(y_real == y_pred)
            accuracy = 100*correct / len(tst_loader.dataset)
            
            print(f'\t >> In epoch {epoch+1}: avg test loss={tst_loss:.4f}, accuracy={accuracy:.2f}% [{correct}/{len(tst_loader.dataset)}]')

>> In epoch 1 [20/234], Training Loss: 2.0882

KeyboardInterrupt: 

#### (2) batchnormalization layer 추가 

In [15]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3,3), stride=(1,1), padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3,3), stride=(1,1), padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3,3), stride=(2,2), padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        
        self.fc1   = nn.Linear(2*2*256, 100, bias=True)
        self.fc2   = nn.Linear(100, 10, bias=True)
        
        self.normal_1 = nn.BatchNorm2d(64)
        self.normal_2 = nn.BatchNorm2d(128)
        self.normal_3 = nn.BatchNorm2d(256)
        
        self.apply(self._init_weights)
        
    def _init_weights(self, submodule):
        if isinstance(submodule, nn.Conv2d):
            nn.init.xavier_normal_(submodule.weight)
            if submodule.bias is not None:
                submodule.bias.data.fill_(0.01)
        if isinstance(submodule, nn.Linear): # submodule이 nn.Linear에서 생성된 객체(혹은 인스턴스이면)
            nn.init.kaiming_normal_(submodule.weight) #해당 submodule의 weight는 He Initialization으로 초기화
            if submodule.bias is not None:
                submodule.bias.data.fill_(0.01) # 해당 submodule의 bias는 0.01로 초기화
                
    def forward(self, x):
        # (n_data, n_channel, width, height)으로 연산 결과의 크기 표기
        x = self.conv1(x) # (batch,1,28,28) -> (batch,64,28,28)
        x = self.normal_1(x) # channel = 64
        x = F.relu(x)     
        x = self.pool1(x) # (batch,64,28,28) -> (batch,64,14,14)
        x = self.conv2(x) # (batch,64,14,14) -> (batch,128,14,14)
        x = self.normal_2(x) # channel = 128
        x = F.relu(x)
        x = self.pool2(x) # (batch,128,14,14) -> (batch,128,7,7)
        x = self.conv3(x) # (batch,128,7,7) -> (batch,256,4,4)
        x = self.normal_3(x) # channel = 256
        x = F.relu(x)
        x = self.pool3(x) # (batch,256,4,4) -> (batch,256,2,2)
        x = x.reshape(-1, 2*2*256)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        out = F.softmax(x, dim=1)
        return out

In [16]:
model = CNN()

In [17]:
opt = optim.Adam(model.parameters(), lr = 2e-4)

In [18]:
for batch_idx, (x_batch, y_batch) in enumerate(trn_loader):
    x_batch = x_batch.to(device) 
    print(f'input shape: {x_batch.shape}')
    pred = model(x_batch)
    print(f'output shape: {pred.shape}')
    break

input shape: torch.Size([256, 1, 28, 28])
output shape: torch.Size([256, 10])


In [19]:
for epoch in range(trn_epochs):
    model.train()
    for batch_idx, (x_batch, y_batch) in enumerate(trn_loader):
        x_batch = x_batch.to(device) # 차원을 늘려줌. Tensor with (256,28,28) --> (1,256,28,28)
#         x_batch = x_batch.reshape(1,-1,28,28).to(device)
        y_batch = y_batch.to(device)
        opt.zero_grad()
        y_batch_prob = model(x_batch)
        loss = F.cross_entropy(y_batch_prob, y_batch)
        loss.backward()
        opt.step()
        if (batch_idx + 1) % 10 == 0 or batch_idx+1 == len(trn_loader):
            print(f'\r>> In epoch {epoch+1} [{batch_idx+1}/{len(trn_loader)}], Training Loss: {loss.item():.4f}', end='')
    
    if (epoch+1) % 5 == 0:
        model.eval()
        y_pred_list = []
        y_real_list = []
        tst_loss = 0
        with torch.no_grad():
            for batch_idx, (x_batch, y_batch) in enumerate(tst_loader):
                x_batch = x_batch.to(device)
                y_batch = y_batch.to(device)
                y_batch_prob = model(x_batch)
                y_batch_pred = torch.argmax(y_batch_prob, axis=1)
                loss = F.cross_entropy(y_batch_prob, y_batch, reduction='sum')
                tst_loss += loss
                
                y_pred_list.append(y_batch_pred.detach().numpy())
                y_real_list.append(y_batch.detach().numpy())
                
            y_real = np.concatenate([x for x in y_real_list], axis=0)
            y_pred = np.concatenate([x for x in y_pred_list], axis=0)
            tst_loss /= y_real.shape[0]
            correct = np.sum(y_real == y_pred)
            accuracy = 100*correct / len(tst_loader.dataset)
            
            print(f'\t >> In epoch {epoch+1}: avg test loss={tst_loss:.4f}, accuracy={accuracy:.2f}% [{correct}/{len(tst_loader.dataset)}]')

>> In epoch 1 [20/234], Training Loss: 1.8519

KeyboardInterrupt: 