# Сверточные нейронные сети

## 1. Реализовать СНН на MNIST

In [107]:
import os
import tqdm
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from torchvision import datasets, transforms
import torch.nn.functional as F

BATCH_SIZE = 32
NUM_EPOCHS = 10
FOLDER = 'MNIST_data'

if not os.path.exists(FOLDER):
    os.mkdir(FOLDER)
    
trans = transforms.Compose([transforms.ToTensor()])

# данные
train_set = datasets.MNIST(root=FOLDER, train=True, transform=trans, download=True)
test_set = datasets.MNIST(root=FOLDER, train=False, transform=trans, download=True)

# итераторы
train_loader = torch.utils.data.DataLoader(dataset=train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_set, batch_size=BATCH_SIZE, shuffle=False)
sample_x, sample_y = next(iter(train_loader))

In [99]:
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, (5,5), padding=2)
        self.prelu1 = nn.PReLU()
        self.bn1 = nn.BatchNorm2d(6)
#        self.bn1 = CustomBatchNorm2d(6)
        self.conv2 = nn.Conv2d(6, 16, (5,5))
        self.prelu2 = nn.PReLU()
        self.bn2 = nn.BatchNorm2d(16)
        self.fc1   = nn.Linear(16*5*5, 120)
        self.fc2   = nn.Linear(120, 84)
        self.fc3   = nn.Linear(84, 10)
        
    def forward(self, x):
        x = F.max_pool2d(self.bn1(self.prelu1(self.conv1(x))), (2,2))
        x = F.max_pool2d(self.bn2(self.prelu2(self.conv2(x))), (2,2))
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.dropout(x, 0.1)
        x = self.fc3(x)
        return x
    
    def num_flat_features(self, x):
        size = x.size()[1:]
        num_features = 1
        for s in size:
            num_features *= s
        return num_features

net = LeNet()
print (net)

LeNet(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (prelu1): PReLU(num_parameters=1)
  (bn1): BatchNorm2d(6, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (prelu2): PReLU(num_parameters=1)
  (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=400, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
)


In [100]:
optimizer = optim.Adam(net.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

In [101]:
for epoch in range(NUM_EPOCHS):
    # trainning
    av_loss = 0.
    correct = 0.
#    for x, y in tqdm.tqdm(train_loader):
    for x, y in train_loader:
        # рассчитываем функцию потерь
        out = net(x)
        loss = criterion(out, y)
        # оптимизация параметров
        ## первым шагом обнулим градиенты предыдущего шага (важный момент,
        ## без этого результаты теряют корректность)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # подсчет статистики за эпоху        
        pred = out.max(1, keepdim=True)[1]
        correct += pred.eq(y.view_as(pred)).sum().item()
        av_loss += loss.item()
    print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(train_loader.dataset) * 100))

Epoch: 0; Accuracy train: 95.600%
Epoch: 1; Accuracy train: 97.688%
Epoch: 2; Accuracy train: 97.870%
Epoch: 3; Accuracy train: 98.005%
Epoch: 4; Accuracy train: 98.100%
Epoch: 5; Accuracy train: 98.247%
Epoch: 6; Accuracy train: 98.348%
Epoch: 7; Accuracy train: 98.435%
Epoch: 8; Accuracy train: 98.488%
Epoch: 9; Accuracy train: 98.327%


In [102]:
av_loss = 0.
correct = 0.
for x, y in test_loader:
    # рассчитываем функцию потерь
    out = net(x)
    loss = criterion(out, y)
    # подсчет статистики за эпоху        
    pred = out.max(1, keepdim=True)[1]
    correct += pred.eq(y.view_as(pred)).sum().item()
    av_loss += loss.item()
print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(test_loader.dataset) * 100))

Epoch: 9; Accuracy train: 98.530%


# Custrom BatchNorm2d

In [103]:
class CustomBatchNorm2d(nn.BatchNorm2d):
    def forward(self, x):
        y = x
        y = y.view(x.size(1), -1)
        mu = y.mean(dim=1)
        sigma2 = y.var(dim=1)
        if self.training is not True:
            y = y - self.running_mean.view(-1, 1)
            y = y / (self.running_var.view(-1, 1)**.5 + self.eps)
        else:
            if self.track_running_stats is True:
                with torch.no_grad():
                    self.running_mean = (1-self.momentum)*self.running_mean + self.momentum*mu
                    self.running_var = (1-self.momentum)*self.running_var + self.momentum*sigma2
            y = y - mu.view(-1,1)
            y = y / (sigma2.view(-1,1)**.5 + self.eps)

        y = self.weight.view(-1, 1) * y + self.bias.view(-1, 1)
        return y.view(x.shape)

In [104]:
class LeNet2(nn.Module):
    def __init__(self):
        super(LeNet2, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, (5,5), padding=2)
        self.prelu1 = nn.PReLU()
#        self.bn1 = nn.BatchNorm2d(6)
        self.bn1 = CustomBatchNorm2d(6)
        self.conv2 = nn.Conv2d(6, 16, (5,5))
        self.prelu2 = nn.PReLU()
#        self.bn2 = nn.BatchNorm2d(16)
        self.bn2 = CustomBatchNorm2d(16)
        self.fc1   = nn.Linear(16*5*5, 120)
        self.fc2   = nn.Linear(120, 84)
        self.fc3   = nn.Linear(84, 10)
        
    def forward(self, x):
        x = F.max_pool2d(self.bn1(self.prelu1(self.conv1(x))), (2,2))
        x = F.max_pool2d(self.bn2(self.prelu2(self.conv2(x))), (2,2))
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.dropout(x, 0.1)
        x = self.fc3(x)
        return x
    
    def num_flat_features(self, x):
        size = x.size()[1:]
        num_features = 1
        for s in size:
            num_features *= s
        return num_features

net2 = LeNet2()
print (net2)

LeNet2(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (prelu1): PReLU(num_parameters=1)
  (bn1): CustomBatchNorm2d(6, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (prelu2): PReLU(num_parameters=1)
  (bn2): CustomBatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=400, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
)


In [105]:
optimizer = optim.Adam(net.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

In [106]:
for epoch in range(NUM_EPOCHS):
    # trainning
    av_loss = 0.
    correct = 0.
    for x, y in train_loader:
        # рассчитываем функцию потерь
        out = net2(x)
        loss = criterion(out, y)
        # оптимизация параметров
        ## первым шагом обнулим градиенты предыдущего шага (важный момент,
        ## без этого результаты теряют корректность)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # подсчет статистики за эпоху        
        pred = out.max(1, keepdim=True)[1]
        correct += pred.eq(y.view_as(pred)).sum().item()
        av_loss += loss.item()
    print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(train_loader.dataset) * 100))

Epoch: 0; Accuracy train: 9.512%
Epoch: 1; Accuracy train: 9.525%
Epoch: 2; Accuracy train: 9.497%
Epoch: 3; Accuracy train: 9.427%


KeyboardInterrupt: 

In [None]:
av_loss = 0.
correct = 0.
for x, y in test_loader:
    # рассчитываем функцию потерь
    out = net2(x)
    loss = criterion(out, y)
    # подсчет статистики за эпоху        
    pred = out.max(1, keepdim=True)[1]
    correct += pred.eq(y.view_as(pred)).sum().item()
    av_loss += loss.item()
print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(test_loader.dataset) * 100))

#  RMSprop

In [None]:
from torch.optim.optimizer import Optimizer, required
import copy

class MyRMSProp(Optimizer):
    
    def __init__(self, params, lr=1e-2, alpha=0.9, eps=1e-8):
        defaults = dict(lr=lr, alpha=alpha, eps=eps)
        super(MyRMSProp, self).__init__(params, defaults)

    def __setstate__(self, state):
        super(MyRMSProp, self).__setstate__(state)

    def step(self, closure=None):
        loss = None
        if closure is not None:
            loss = closure()

        for group in self.param_groups:
            alpha = group['alpha']
            lr = group['lr']
            eps = group['eps']
            for p in group['params']:
                if p.grad is None:
                    continue
                d_p = p.grad.data
                state = self.state[p]
                if len(state) == 0:
                    state['grad_squared'] = torch.zeros_like(p.data)

                grad_squared = state['grad_squared']

                grad_squared.mul_(alpha)
                grad_squared.addcmul_(1-alpha, d_p, d_p)

                grad_avg = grad_squared.sqrt().add_(eps)

                p.data.addcdiv_(-lr, d_p, grad_avg)

        return loss

In [None]:
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, (5,5), padding=2)
        self.prelu1 = nn.PReLU()
        self.bn1 = nn.BatchNorm2d(6)
#        self.bn1 = CustomBatchNorm2d(6)
        self.conv2 = nn.Conv2d(6, 16, (5,5))
        self.prelu2 = nn.PReLU()
        self.bn2 = nn.BatchNorm2d(16)
        self.fc1   = nn.Linear(16*5*5, 120)
        self.fc2   = nn.Linear(120, 84)
        self.fc3   = nn.Linear(84, 10)
        
    def forward(self, x):
        x = F.max_pool2d(self.bn1(self.prelu1(self.conv1(x))), (2,2))
        x = F.max_pool2d(self.bn2(self.prelu2(self.conv2(x))), (2,2))
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.dropout(x, 0.1)
        x = self.fc3(x)
        return x
    
    def num_flat_features(self, x):
        size = x.size()[1:]
        num_features = 1
        for s in size:
            num_features *= s
        return num_features

In [None]:
net_rmsprop = LeNet()
print (net_rmsprop)

In [None]:
optimizer = MyRMSProp(net_rmsprop.parameters())
criterion = nn.CrossEntropyLoss()

In [None]:
NUM_EPOCHS = 10

for epoch in range(NUM_EPOCHS):
    # trainning
    av_loss = 0.
    correct = 0.
    for x, y in train_loader:
        # рассчитываем функцию потерь
        out = net_rmsprop(x)
        loss = criterion(out, y)
        # оптимизация параметров
        ## первым шагом обнулим градиенты предыдущего шага (важный момент,
        ## без этого результаты теряют корректность)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # подсчет статистики за эпоху        
        pred = out.max(1, keepdim=True)[1]
        correct += pred.eq(y.view_as(pred)).sum().item()
        av_loss += loss.item()
    print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(train_loader.dataset) * 100))

# SGD vs. RMSProp

In [None]:
net_sgd = LeNet()
print (net_sgd)

In [None]:
optimizer = optim.SGD(net_sgd.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

In [None]:
NUM_EPOCHS = 10

for epoch in range(NUM_EPOCHS):
    # trainning
    av_loss = 0.
    correct = 0.
    for x, y in train_loader:
        # рассчитываем функцию потерь
        out = net_sgd(x)
        loss = criterion(out, y)
        # оптимизация параметров
        ## первым шагом обнулим градиенты предыдущего шага (важный момент,
        ## без этого результаты теряют корректность)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # подсчет статистики за эпоху        
        pred = out.max(1, keepdim=True)[1]
        correct += pred.eq(y.view_as(pred)).sum().item()
        av_loss += loss.item()
    print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(train_loader.dataset) * 100))

# RMSProp Accuracy test

In [None]:
av_loss = 0.
correct = 0.
for x, y in test_loader:
    # рассчитываем функцию потерь
    out = net_rmsprop(x)
    loss = criterion(out, y)
    # подсчет статистики за эпоху        
    pred = out.max(1, keepdim=True)[1]
    correct += pred.eq(y.view_as(pred)).sum().item()
    av_loss += loss.item()
print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(test_loader.dataset) * 100))

# SGD Accuracy test

In [None]:
av_loss = 0.
correct = 0.
for x, y in test_loader:
    # рассчитываем функцию потерь
    out = net_sgd(x)
    loss = criterion(out, y)
    # подсчет статистики за эпоху        
    pred = out.max(1, keepdim=True)[1]
    correct += pred.eq(y.view_as(pred)).sum().item()
    av_loss += loss.item()
print('Epoch: {}; Accuracy train: {:.3f}%'.format(epoch, correct / len(test_loader.dataset) * 100))

### RMSProp  Accuracy test: 96.440% 
### SGD Accuracy test = 98.760%