# Python и машинное обучение: нейронные сети и компьютерное зрение

## Модуль 3b. Сверточные нейронные сети (CNN)

Convolutional Neural Networks, "Сверточные Нейросети". Применение операции "свертка" в нескольких слоях нейросети - революционное решение, которое позволило радикально повысить качество решения задач компьютерного зрения. Впервые было продемонстировано Яном Лекуном в Париже в 1993 году в задаче "распознавание рукописных цифр" (датасет MNIST).

В данном модуле рассмотрим процесс создания и обучение сверточных нейросетей в PyTorch:
- операция "свертка", определение, реализация в PyTorch;
- создание многослойной нейронной сети со сверточными слоями и распознавание рукописных цифр:
    - добавление сверточных слоев,
    - слои-объединения (max-pooling),
    - пакетная нормализация (batch normalization);
- задача бинарной классификации на примере набора полноцвентых изображений "кошки против собак", обучение нейросети с "ноля";
- дообучение готовых моделей, загруженных из интернета (на примере модели архитектуры ResNet-50).





In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision
import torchvision.transforms as transforms

from torch.nn.functional import normalize

from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn.functional as F

import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

### Операция "Свертка" (Convolution)

Выполнение операции "двумерная свертка" (convolution 2D) иллюстрирует анимированное изображение ниже. Суть операции в следующем:
- ядро свертки - квадратная матрица размером $n * n$
- по матрице размера $M * N$ , над которой выполняется операция "свертка"  запускается "окно" размером с ядро свертки ($n * n$), оно двигается слева направо, сверху вниз,с заданным шагом (stride);
- на каждом шаге происходит поэлементное умножение ядра свертки на содержимое текущего "окна", происходит суммирование произведения (операция эквивалентна скалярному произведению "плоских" представлений соотв. матриц);
- результат записывается в соответствующую ячейку результирующей матрицы.

![Convolution Gif](./conv.gif)

На данном изображении к исходной матрице применяется свертка со следующим ядром:

```
 0 1 2 
 2 2 0 
 0 1 2
```

Запустим "свертку" на изображении.

Для этого создадим сверточный слой вне сети и запустим в нем некоторую свертку поверх изображения Лены.

In [None]:
from PIL import Image, ImageFilter  

lena = Image.open('lena.jpg')

display(lena)

t_lena = transforms.ToTensor()(lena)

print(t_lena.shape)

In [None]:
# создаем сверточный слой вне сети
conv = nn.Conv2d(in_channels=1, # принимает одноканальное изображение размером (1, 256, 256)
                 out_channels=1, # отдает тоже одноканальное изображение размером (1, 254, 254) 
                 kernel_size=3, # ядро размера 3х3
                 padding=0, 
                 bias=False)
# обратите внимание на размер тензора весов сети
# assert conv.weight.shape == (OUT_CHANNELS, IN_CHANNELS, KERNEL_SIZE, KERNEL_SIZE)
conv.weight.shape

In [None]:
# сделаем Лену одноканальной, чтобы передать ее в сверточный слой
t_lena_bw = (t_lena.sum(axis=0) / 3).unsqueeze(0)

display( transforms.ToPILImage()( t_lena_bw ) )

In [None]:
# возьмем для примера "вертикальное" ядро детектора краев Собеля-Фельдмана 
kernel = torch.tensor([[-1, 0, 1], 
                       [-2, 0, 2], 
                       [-1, 0, 1]], dtype =torch.float32)




In [None]:
# нормализуем вывод
out_img -= out_img.min()
out_img /= out_img.max()

display( transforms.ToPILImage()( out_img ) )


In [None]:
kernel = torch.tensor([[2, 2, 2], 
                       [2, 2, 2], 
                       [2, 2, 2]], dtype =torch.float32)

# разрешаем копировать веса и копируем их
with torch.no_grad():
    conv.weight.copy_( kernel )
    
out_img = conv(t_lena_bw)

out_img -= out_img.min()
out_img /= out_img.max()

display( transforms.ToPILImage()( out_img ) )

### Практика

1. В коде вверху поэкпериментируйте с другими значениями весов ядра свертки. Поищите в интернете веса для других фильтров, попробуйте их применить на исходном изображении.

2. Реализуйте детектор краев Собеля-Фельдмана. То, что мы уже получили - это матрица градиентов по вертикальной оси. Используя вертикально "отзеркаленное" и транспонированное ядро, нужно получить матрицу градиентов по горизонтальной оси, а затем вычислить квадратный корень из их суммы: $G = \sqrt{G_x^2 + G_y^2}$. Номализуйте полученное изображение и выведите его на экран.

Суть метода Собеля-Фельдмана:
![image.png](attachment:image.png)

Ядра сверток:
![image-2.png](attachment:image-2.png)

Итоговое изображение:
![image-3.png](attachment:image-3.png)

<small>Источник: 
1. https://www.adeveloperdiary.com/data-science/computer-vision/how-to-implement-sobel-edge-detection-using-python-from-scratch/
2. https://www.reg.ru/blog/svyortka-v-deep-learning-prostymi-slovami/    
    
</small>

In [None]:
kernel_v = torch.tensor([[-1, 0, 1], 
                       [-2, 0, 2], 
                       [-1, 0, 1]], dtype =torch.float32)

kernel_h = kernel_v.flip(1).T

# ваш код здесь
with torch.no_grad():
    conv.weight.copy_( kernel_v )
grad_v = conv(t_lena_bw)

with torch.no_grad():
    conv.weight.copy_( kernel_h )
grad_h = conv(t_lena_bw)

out_img = torch.sqrt( grad_v ** 2 + grad_h ** 2 )

out_img -= out_img.min()
out_img /= out_img.max()

display( transforms.ToPILImage()( out_img ) )

### Обучение нейросети на наборе данных MINST

Это те же "рукописные цифры", только с разрешением 28x28 пикселей и в количестве 10000 экзепляров. Инструменты для работы с ним есть во всех фреймворках для работы с нейросетями. PyTorch - не исключение.

In [None]:
from torchvision import datasets

mean, std = 0.1307, 0.3081

mnist_transforms = transforms.Compose([transforms.ToTensor(),
                                       transforms.Normalize(mean=mean, std=std)])

trainset = datasets.MNIST('./data', download=True, train=True, transform=mnist_transforms)
valset = datasets.MNIST('./data', download=True, train=False, transform=mnist_transforms)

print(len(trainset))
print(len(valset))

In [None]:
batch_size=128

train_generator_MNIST = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
val_generator_MNIST = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=True)

## Сверточная сеть архитектуры LeNet-5

![image.png](attachment:image.png)

In [None]:
class LeNet5V1(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature = nn.Sequential(
            #1
            nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, stride=1, padding=2),   # 28*28->32*32-->28*28
            nn.Tanh(),
            nn.AvgPool2d(kernel_size=2, stride=2),  # 14*14
            
            #2
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1),  # 10*10
            nn.Tanh(),
            nn.AvgPool2d(kernel_size=2, stride=2),  # 5*5
            
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=16*5*5, out_features=120),
            nn.Tanh(),
            nn.Linear(in_features=120, out_features=84),
            nn.Tanh(),
            nn.Linear(in_features=84, out_features=10),
        )
        
    def forward(self, x):
        return self.classifier(self.feature(x))
    
    

In [None]:
model = LeNet5V1()
print(model)


In [None]:
# функция для расчета точности
def accuracy_fn(logps, labels):
    pred_classes = torch.argmax(torch.exp(logps), axis=1)
    val_classes = labels
    return float(torch.eq(pred_classes, val_classes).sum() / labels.shape[0])


def train_batches(model,
                  train_generator,
                  valid_generator,
                  batch_size=20, epochs=40, report_positions=20, **kwargs):

    results = {'epoch_count': [], 'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    # прогоняем данные по нейросети
    for epoch in range(epochs):
        model.train()

        train_loss = valid_loss = 0.0;
        train_correct = valid_correct = 0.0

        for X_batch, y_batch in train_generator:

            X_batch = X_batch.to(device); y_batch = y_batch.to(device)

            y_logps = model(X_batch) #логарифмы вероятности отнесения к классам
            loss = criterion(y_logps, y_batch) #кросс-энтропия

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.data.item()
            train_correct += accuracy_fn(y_logps, y_batch) * y_batch.shape[0]

        train_loss /= len(train_generator.dataset)
        train_acc = 100 * train_correct / len(train_generator.dataset)

        # Валидацию тоже делаем по батчам
        model.eval()

        for valid_batches, (X_val_batch, y_val_batch) in enumerate(valid_generator):
            X_val_batch = X_val_batch.to(device); y_val_batch = y_val_batch.to(device)
            y_batch_logps = model(X_val_batch)
            loss = criterion(y_batch_logps, y_val_batch)

            valid_loss += loss.data.item()
            valid_correct += accuracy_fn(y_batch_logps, y_val_batch) * y_val_batch.shape[0]

        valid_loss /= len(valid_generator.dataset)
        valid_acc = 100 * valid_correct / len(valid_generator.dataset)

        results['epoch_count'] += [epoch]
        results['train_loss'] += [ train_loss ]
        results['train_acc'] += [ train_acc ]
        results['val_loss'] += [ valid_loss ]
        results['val_acc'] += [ valid_acc ]

        if epoch % (epochs // report_positions) == 0 or epochs<50:
            print(f"Epoch: {epoch+1:4.0f} | Train Loss: {train_loss:.5f}, "+\
                  f"Accuracy: {train_acc:.2f}% | \
                Validation Loss: {valid_loss:.5f}, Accuracy: {valid_acc:.2f}%")

    return results

# рисовалка графиков
def plot_results(results):

    fig, axs = plt.subplots(1,2)

    fig.set_size_inches(10,3)

    for i, loss_acc in enumerate(['loss', 'acc']):
        for train_val in ['train', 'val']:
            axs[i].plot(results['epoch_count'], results[f'{train_val}_{loss_acc}'], label=f'{loss_acc} {train_val}')

        axs[i].legend()

    plt.show()

In [None]:
device = "cuda" if torch.cuda.is_available() else \
    "mps" if torch.backends.mps.is_built() else "cpu"
device

In [None]:

dict_vary = {'hidden': 32,
            'activation': 'tanh',
            'batch_size': batch_size,
            'lr': 0.01,
            'momentum': 0.9,
            'optimizer': 'RMSprop',
            'epochs': 60}

model = LeNet5V1().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=dict_vary['lr'])
criterion = nn.CrossEntropyLoss()

results = train_batches(model,
                        train_generator_MNIST,
                        val_generator_MNIST, report_positions=6, **dict_vary)

plot_results(results)
summary(model,
        input_size=images.shape,
        col_names=["input_size", "output_size", "num_params"],
        device=device
       )