In [7]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn, optim
from torch.nn import functional as F

%matplotlib inline
%config InlineBackend.figure_format='retina'

# Report the installed PyTorch build, then pick the first CUDA device when one
# is available and fall back to the CPU otherwise.
print(f"PyTorch version:{torch.__version__}")
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f"device:{device}.")

PyTorch version:1.10.0+cu111
device:cuda:0.


In [8]:
# Dataset
from torchvision import datasets, transforms
# Load (downloading into ./data/ if necessary) the MNIST train and test splits.
# ToTensor() is the only transform applied to each image.
mnist_train = datasets.MNIST(root='./data/', train=True, transform=transforms.ToTensor(), download=True)
mnist_test = datasets.MNIST(root='./data/', train=False, transform=transforms.ToTensor(), download=True)

# Print a summary of each split; the second label previously said "mnist_train"
# even though it describes the test split.
print("mnist_train : \n", mnist_train, '\n')
print("mnist_test : \n", mnist_test, '\n')

mnist_train : 
 Dataset MNIST
    Number of datapoints: 60000
    Root location: ./data/
    Split: Train
    StandardTransform
Transform: ToTensor() 

mnist_train : 
 Dataset MNIST
    Number of datapoints: 10000
    Root location: ./data/
    Split: Test
    StandardTransform
Transform: ToTensor() 



In [9]:
# Data Iterator

# Mini-batch size shared by the training and evaluation loaders.
BATCH_SIZE = 256
# Training batches are reshuffled every epoch.
train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=BATCH_SIZE, shuffle=True, num_workers=1)
# Evaluation only counts correct predictions, so sample order is irrelevant;
# keep the test loader unshuffled so evaluation passes are deterministic.
test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=BATCH_SIZE, shuffle=False, num_workers=1)

In [12]:
# Define Model
class ConvolutionalNeuralNetworkClass(nn.Module):
    """Configurable CNN classifier: conv blocks followed by dense layers.

    Each entry of ``cdims`` adds one block of
    ``Conv2d -> [BatchNorm2d] -> ReLU -> MaxPool2d(2x2) -> Dropout2d(p=0.5)``.
    The feature map is then flattened and passed through one
    ``Linear -> ReLU`` pair per entry of ``hdims`` and a final ``Linear``
    producing ``ydim`` raw scores (no activation).

    Args:
        name: Free-form identifier stored on the instance.
        xdim: Input size as ``[channels, height, width]``.
        ksize: Integer side length of the square convolution kernel
            (stride 1, padding ``ksize // 2``).
        cdims: Output channel count of each convolution block.
        hdims: Width of each hidden dense layer.
        ydim: Number of output units.
        USE_BATCHNORM: Insert ``BatchNorm2d`` after every convolution.
    """

    def __init__(self, name='cnn', xdim=[1,28,28],
                 ksize=3, cdims=[32,64], hdims=[1024,128], ydim=10,
                 USE_BATCHNORM=False):
        super(ConvolutionalNeuralNetworkClass, self).__init__()
        self.name = name
        # Store copies so instances never alias the shared default lists
        # (or a list the caller keeps mutating afterwards).
        self.xdim = list(xdim)
        self.ksize = ksize
        self.cdims = list(cdims)
        self.hdims = list(hdims)
        self.ydim = ydim
        self.USE_BATCHNORM = USE_BATCHNORM

        # Convolutional blocks. The spatial size is tracked layer by layer so
        # the first dense layer gets the right input width even when the
        # convolution does not preserve H/W (even kernel sizes).
        self.layers = []
        prev_cdim = self.xdim[0]
        height, width = self.xdim[1], self.xdim[2]
        pad = self.ksize // 2
        for cdim in self.cdims:
            self.layers.append(
                nn.Conv2d(in_channels=prev_cdim,
                          out_channels=cdim,
                          kernel_size=self.ksize,
                          stride=(1,1),
                          padding=pad)
            )
            # Stride-1 convolution: out = in + 2*pad - ksize + 1
            # (equal to `in` for odd ksize, `in + 1` for even ksize).
            height = height + 2 * pad - self.ksize + 1
            width = width + 2 * pad - self.ksize + 1

            if self.USE_BATCHNORM:
                self.layers.append(nn.BatchNorm2d(cdim))
            self.layers.append(nn.ReLU(True))
            self.layers.append(nn.MaxPool2d(kernel_size=(2,2)))
            # 2x2 max pooling (stride 2, no padding) floors the halved size.
            height //= 2
            width //= 2
            self.layers.append(nn.Dropout2d(p=0.5))
            prev_cdim = cdim

        # Dense layers operating on the flattened feature map.
        self.layers.append(nn.Flatten())
        prev_hdim = prev_cdim * height * width
        for hdim in self.hdims:
            self.layers.append(nn.Linear(prev_hdim, hdim, bias=True))
            self.layers.append(nn.ReLU(True))
            prev_hdim = hdim

        # Final layer (without activation).
        self.layers.append(nn.Linear(prev_hdim, self.ydim, bias=True))

        # Register every layer in one Sequential under the name
        # "<lowercased class name>_<position>", e.g. "conv2d_0", "linear_9".
        self.net = nn.Sequential()
        for l_idx, layer in enumerate(self.layers):
            layer_name = f"{type(layer).__name__.lower()}_{l_idx}"
            self.net.add_module(layer_name, layer)

        # Initialize parameters.
        self.init_param()

    def init_param(self):
        """Re-initialize all weights in place.

        Conv2d and Linear weights use Kaiming-normal initialization with zero
        biases; BatchNorm2d weights are set to 1 and biases to 0.
        """
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.kaiming_normal_(m.weight)
                # Bias is None for layers built with bias=False.
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Run ``x`` through the full layer stack and return the raw scores."""
        return self.net(x)
    
# Build the classifier and move it to the selected device before the optimizer
# is created, so the optimizer is handed the parameters after the move.
C = ConvolutionalNeuralNetworkClass(
    name='cnn',
    xdim=[1,28,28],
    ksize=3,
    cdims=[32,64],
    hdims=[256,128],
    ydim=10,
)
C = C.to(device)

# Cross-entropy objective, optimized with Adam at a learning rate of 1e-3.
loss = nn.CrossEntropyLoss()
optm = optim.Adam(C.parameters(), lr=1e-3)
print("Done")

Done


In [17]:
# Inspect the trainable parameters: name, shape, first five values, and total count.
np.set_printoptions(precision=3)
n_param = 0
for p_idx, (param_name, param) in enumerate(C.named_parameters()):
    if not param.requires_grad:
        continue
    param_numpy = param.detach().cpu().numpy()
    flat_values = param_numpy.reshape(-1)
    n_param += flat_values.size
    print(f"[{p_idx}] name : {param_name} shape : {param_numpy.shape}")
    print(f"   val : {flat_values[:5]}")

print(f"Total number of parameters : {format(n_param, ',d')}")
# Why do the layer indices in the names jump from 0 to 4?
# Because three parameter-free layers (ReLU, MaxPool, Dropout) sit between the
# two convolutions, and the index counts every layer.

[0] name : net.conv2d_0.weight shape : (32, 1, 3, 3)
   val : [0.237 0.511 0.656 0.693 0.125]
[1] name : net.conv2d_0.bias shape : (32,)
   val : [0. 0. 0. 0. 0.]
[2] name : net.conv2d_4.weight shape : (64, 32, 3, 3)
   val : [-0.127 -0.052  0.056  0.049  0.155]
[3] name : net.conv2d_4.bias shape : (64,)
   val : [0. 0. 0. 0. 0.]
[4] name : net.linear_9.weight shape : (256, 3136)
   val : [-0.02   0.035 -0.028 -0.026  0.027]
[5] name : net.linear_9.bias shape : (256,)
   val : [0. 0. 0. 0. 0.]
[6] name : net.linear_11.weight shape : (128, 256)
   val : [-0.222 -0.004  0.079  0.14  -0.054]
[7] name : net.linear_11.bias shape : (128,)
   val : [0. 0. 0. 0. 0.]
[8] name : net.linear_13.weight shape : (10, 128)
   val : [-0.158 -0.052  0.01  -0.045  0.029]
[9] name : net.linear_13.bias shape : (10,)
   val : [0. 0. 0. 0. 0.]
Total number of parameters : 856,074


In [20]:
# Forward pass on a random batch of two 1x28x28 inputs.
torch.set_printoptions(precision=3)
x_numpy = np.random.rand(2,1,28,28)
x_torch = torch.from_numpy(x_numpy).float().to(device)
# Call the module itself rather than .forward() so nn.Module.__call__ runs
# (it dispatches any registered hooks before/after forward).
y_torch = C(x_torch)
y_numpy = y_torch.detach().cpu().numpy()

print("x_torch :\n", x_torch)
print("y_torch :\n", y_torch)
print()
print(f"x_numpy{x_numpy.shape} : \n", x_numpy)
print(f"y_numpy{y_numpy.shape} : \n", y_numpy)

x_torch :
 tensor([[[[0.044, 0.139, 0.581,  ..., 0.347, 0.481, 0.480],
          [0.618, 0.096, 0.899,  ..., 0.322, 0.578, 0.275],
          [0.755, 0.969, 0.614,  ..., 0.584, 0.377, 0.761],
          ...,
          [0.480, 0.106, 0.764,  ..., 0.347, 0.217, 0.242],
          [0.517, 0.387, 0.551,  ..., 0.927, 0.154, 0.757],
          [0.694, 0.488, 0.682,  ..., 0.189, 0.773, 0.413]]],


        [[[0.699, 0.021, 0.219,  ..., 0.890, 0.040, 0.271],
          [0.471, 0.255, 0.925,  ..., 0.387, 0.093, 0.201],
          [0.236, 0.188, 0.429,  ..., 0.503, 0.478, 0.738],
          ...,
          [0.930, 0.344, 0.852,  ..., 0.603, 0.394, 0.571],
          [0.943, 0.976, 0.595,  ..., 0.767, 0.018, 0.038],
          [0.200, 0.051, 0.579,  ..., 0.186, 0.638, 0.132]]]], device='cuda:0')
y_torch :
 tensor([[-0.335,  1.611,  0.588, -2.325,  0.534,  2.595, -1.717,  1.788,  0.683,
         -0.585],
        [-1.208,  0.773,  0.760,  0.314, -1.665,  1.775, -1.966,  2.812, -4.026,
         -2.408]], devic

In [23]:
# eval function

def func_eval(model, data_iter, device):
    """Return the classification accuracy of ``model`` over ``data_iter``.

    The model is put in eval mode for the pass and gradients are disabled.
    On exit (including on error) the model is returned to whichever mode it
    was in when the function was called, instead of always being forced into
    training mode.

    Args:
        model: Module mapping a ``(N, 1, 28, 28)`` batch to per-class scores.
        data_iter: Iterable yielding ``(inputs, targets)`` batches.
        device: Device the inputs and targets are moved to.

    Returns:
        Fraction of samples whose highest-scoring class equals the target.

    Raises:
        ZeroDivisionError: If ``data_iter`` yields no samples.
    """
    was_training = model.training
    n_total, n_correct = 0, 0
    model.eval()
    try:
        with torch.no_grad():
            for batch_in, batch_out in data_iter:
                y_target = batch_out.to(device)
                model_pred = model(batch_in.view(-1,1,28,28).to(device))
                # Index of the largest score along the class dimension.
                _, y_pred = torch.max(model_pred, 1)
                n_correct += (y_pred == y_target).sum().item()
                n_total += batch_in.size(0)
    finally:
        # Restore the caller's train/eval mode.
        model.train(was_training)
    val_accr = (n_correct/n_total)
    return val_accr

In [24]:
# Baseline: accuracy of the freshly re-initialized (untrained) network on the
# training split, then on the test split.
C.init_param()
train_accr, test_accr = (
    func_eval(C, split_iter, device) for split_iter in (train_iter, test_iter)
)
print(f"train_accr : {train_accr} \n test_accr : {test_accr}")

train_accr : 0.09823333333333334 
 test_accr : 0.0976


In [27]:
# Train

print("Start training.")
# Restart from freshly initialized weights.
# NOTE(review): `optm` is created in an earlier cell and is not reset here, so
# re-running only this cell reuses the optimizer's accumulated Adam state.
C.init_param()
C.train()
EPOCHS, print_every = 10,1
for epoch in range(EPOCHS):
    loss_val_sum = 0.0
    for batch_in, batch_out in train_iter :
        # Forward pass: call the module (not .forward) so __call__ hooks run.
        y_pred = C(batch_in.view(-1,1,28,28).to(device))
        loss_out = loss(y_pred, batch_out.to(device))
        # Update
        optm.zero_grad()
        loss_out.backward()
        optm.step()
        # Accumulate a plain Python float; summing the loss tensors themselves
        # keeps autograd-tracked tensors alive across the whole epoch.
        loss_val_sum += loss_out.item()

    # Mean of the per-batch losses over the epoch.
    loss_val_avg = loss_val_sum / len(train_iter)

    if ((epoch%print_every) == 0) or (epoch==(EPOCHS-1)):
        train_accr = func_eval(C, train_iter, device)
        test_accr = func_eval(C, test_iter, device)
        print(f"epoch : {epoch:d} | loss : {loss_val_avg:.3f} | train_accr : {train_accr:.3f} | test_accr : {test_accr:.3f}")

Start training.
epoch : 0 | loss : 0.444 | train_accr : 0.968 | test_accr : 0.967
epoch : 1 | loss : 0.130 | train_accr : 0.982 | test_accr : 0.981
epoch : 2 | loss : 0.096 | train_accr : 0.985 | test_accr : 0.984
epoch : 3 | loss : 0.079 | train_accr : 0.989 | test_accr : 0.986
epoch : 4 | loss : 0.070 | train_accr : 0.989 | test_accr : 0.986
epoch : 5 | loss : 0.061 | train_accr : 0.992 | test_accr : 0.989
epoch : 6 | loss : 0.056 | train_accr : 0.993 | test_accr : 0.988
epoch : 7 | loss : 0.052 | train_accr : 0.994 | test_accr : 0.989
epoch : 8 | loss : 0.047 | train_accr : 0.994 | test_accr : 0.990
epoch : 9 | loss : 0.043 | train_accr : 0.995 | test_accr : 0.989
