In [1]:
import matplotlib.pyplot as plt
import numpy
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

In [None]:
# архитектура модели
# гиперпараметры
# загрузчик данных
# формирование батча
# инициализировать модель
# оптимизатор
# функция потерь
# опционально шедулеры
# трейн луп

In [2]:
# hyperparams
inp_dim = 28 * 28
hidden = 256
out_dim = 10
device_id = -1
device = 'cpu' if device_id == -1 else f'cuda:{device_id}' # 'cuda:0' id GPU
n_epochs = 10
batch_size = 128

In [3]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5), (0.5)),
     ])

# зашружаем тренировочный сет
dataset_train = datasets.MNIST('.', 
                               train=True,            
                               download=True, 
                               transform=transform)

dataset_test = datasets.MNIST('.', 
                              train=False,
                              download=True, 
                              transform=transform)


In [4]:
dataset_train

Dataset MNIST
    Number of datapoints: 60000
    Root location: .
    Split: Train
    StandardTransform
Transform: Compose(
               ToTensor()
               Normalize(mean=0.5, std=0.5)
           )

In [None]:

plt.imshow(dataset_train.data[0].detach().numpy())
plt.show()

In [9]:
class LinearModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_p=0.1):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        # custom init nn.init.xavier_uniform(self.linear1.weight) 
        self.do = nn.Dropout(dropout_p)
        self.linear2 = nn.Linear(hidden_dim, output_dim)
        self.activ = nn.ReLU()

    def forward(self, x):
        x = self.linear1(x)
        x = self.activ(x)
        x = self.do(x)
        x = self.linear2(x)

        return x


In [5]:
# Как формируется батч
# dataloader берет из Samler набор индексов и по этим индексам получает элементы из Dataset
dataset = datasets.MNIST('.', train=True, download=True)

def collate_fn(data: list):
    # data = [(pic, target)...]
    pics = []
    targets = []
    for item in data:
        pics.append(numpy.array(item[0]))
        targets.append(item[1])
    pics = torch.from_numpy(numpy.array(pics)).float() / 255
    pics = pics.view(pics.size(0), -1) # mtx 28x28 to vec 7xx
    targets = torch.from_numpy(numpy.array(targets))

    return {
        'data': pics,
        'target': targets,
    }

In [8]:
dataloader = DataLoader(dataset, 
                        batch_size, 
                        shuffle=True, 
                        collate_fn=collate_fn,
                        drop_last = True,
                        )

In [10]:
model = LinearModel(inp_dim, hidden, out_dim).to(device)
model.train() 
optim = torch.optim.Adam(model.parameters())
loss_func = nn.CrossEntropyLoss()


In [20]:
60000 // 128 # число шагов

468

In [15]:
%%time
for epoch in range(n_epochs):
    
    for i, batch in enumerate(dataloader):
        optim.zero_grad()

        predict = model(batch['data'].to(device))
        loss = loss_func(predict, batch['target'].to(device).long())
        loss.backward()
        optim.step()
        if i % 400 == 0:
            print(f'epoch: {epoch}, step: {i}, loss: {loss.item()}')
    
    # TODO Добавить тест модели на тестовом датасете
    #save every epoch
    torch.save(model.state_dict(), f'./chkpt_cv1_{epoch}.pth')

epoch: 0, step: 0, loss: 2.3173484802246094
epoch: 0, step: 400, loss: 0.162078395485878
epoch: 1, step: 0, loss: 0.16323435306549072
epoch: 1, step: 400, loss: 0.16989266872406006
epoch: 2, step: 0, loss: 0.0922124981880188
epoch: 2, step: 400, loss: 0.05404552444815636
epoch: 3, step: 0, loss: 0.10862594097852707
epoch: 3, step: 400, loss: 0.0844673290848732
epoch: 4, step: 0, loss: 0.03672228753566742
epoch: 4, step: 400, loss: 0.11693213135004044
epoch: 5, step: 0, loss: 0.07009770721197128
epoch: 5, step: 400, loss: 0.0764431357383728
epoch: 6, step: 0, loss: 0.05374812334775925
epoch: 6, step: 400, loss: 0.03128642961382866
epoch: 7, step: 0, loss: 0.020395509898662567
epoch: 7, step: 400, loss: 0.08720237761735916
epoch: 8, step: 0, loss: 0.019754230976104736
epoch: 8, step: 400, loss: 0.033190954476594925
epoch: 9, step: 0, loss: 0.014715553261339664
epoch: 9, step: 400, loss: 0.03923499584197998
CPU times: total: 2min 55s
Wall time: 1min 33s


# Добавить тест модели на тестовом датасете

nn.CrossEntropyLoss() - это функция потерь (loss function) в библиотеке PyTorch, которая широко используется в задачах классификации.
nn.CrossEntropyLoss() принимает на вход выходные данные (logits) модели и соответствующие им истинные метки классов, и вычисляет функцию потерь, которая показывает, насколько хорошо модель предсказывает метки классов.

Более конкретно, nn.CrossEntropyLoss() применяет экспоненту к выходам модели (чтобы получить вероятности), затем вычисляет логарифмы вероятностей и применяет отрицательный логарифм правдоподобия для вычисления ошибки модели. После этого суммируется ошибка для каждого примера в батче и возвращается среднее значение ошибки.

In [13]:
test_dataloader = DataLoader(dataset_test, 
                        batch_size, 
                        shuffle=True, 
                        collate_fn=collate_fn,
                        drop_last = True,
                        )
# переводим модель в режим тестирования
model.eval()

# test_loss - это среднее значение функции потерь на тестовом датасете.
test_loss = 0

# correct - количество правильно классифицированных примеров correct
correct = 0

# total - общее количество примеров total
total = 0

# test_accuracy - это точность модели на тестовом датасете

# отключаем вычисление градиентов
with torch.no_grad():
    for batch in test_dataloader:
        inputs = batch['data'].to(device)
        targets = batch['target'].to(device).long()
        outputs = model(inputs)
        
        loss = loss_func(outputs, targets)
        test_loss += loss.item()

        predicted = outputs.argmax(dim=1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)

test_accuracy = correct / total
test_loss /= len(test_dataloader)
print(f"Test accuracy: {test_accuracy}")
print(f"Test loss: {test_loss}")


Test accuracy: 0.09835737179487179
Test loss: 2.303620763314076


# Новый раздел

In [27]:
# hyperparams
input_ch = 1
hidden_ch = 128
out_dim = 10
device_id = -1 # device_id = 0 
device = 'cpu' if device_id == -1 else f'cuda:{device_id}'
n_epochs = 10
batch_size = 128

input_ch - количество входных каналов в изображении, обычно равно 3 для RGB-изображений и 1 для монохромных.
hidden_ch - количество выходных каналов для каждого сверточного слоя.
output_dim - количество классов для классификации.
dropout_p - вероятность выключения нейрона в Dropout-регуляризации.

# Добавить батч норм, еще уменьшить размер фичматрицы, увеличить число выходных каналов, поправить при изменении числа каналов и размера фич матриц

nn.BatchNorm2d - это слой нормализации пакета (Batch Normalization) в двумерной сверточной нейронной сети. Он принимает на вход тензор размера (batch_size, num_channels, height, width) и выполняет нормализацию пакета на каждом канале, чтобы сделать их более устойчивыми к изменениям в данных.
После нормализации происходит масштабирование и сдвиг на два параметра, которые обучаются во время обучения модели, чтобы восстановить оригинальный диапазон значений.

In [28]:
class ConvDO(nn.Module):
    def __init__(self, input_ch, output_ch, kernel_size, stride, padding, 
                 dropout_p=0.1):
        super().__init__()
        self.conv = nn.Conv2d(
            input_ch, 
            output_ch, 
            kernel_size=kernel_size,
            stride=stride,
            padding=padding, 
        )
        # TODO добавить батч норм
        self.bn = nn.BatchNorm2d(output_ch)
        self.do = nn.Dropout(dropout_p)

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)  # применение BatchNorm2d
        return self.do(x)

class ConvModel(nn.Module):
    def __init__(self, input_ch, hidden_ch, output_dim, dropout_p=0.1):
        super().__init__()
#         self.conv1 = ConvDO(input_ch, hidden_ch, 5, 2, 2) # уменьшим размер выходной фичматрицы в 2 раза
#         # TODO еще уменьшить размер фичматрицы
#         self.conv2 = ConvDO(hidden_ch, hidden_ch, 3, 1, 1)
#         # TODO увеличить число выходных каналов
#         self.conv3 = ConvDO(hidden_ch, 1, 3, 1, 1)
#         self.linear = nn.Linear(
#             14 * 14, # TODO поправить при изменении числа каналов и размера фич матриц
#             output_dim,
#         )  
        
        
        self.conv1 = ConvDO(input_ch, hidden_ch, 5, 2, 2)
        self.conv2 = ConvDO(hidden_ch, hidden_ch, 3, 1, 1)
        self.conv3 = ConvDO(hidden_ch, hidden_ch*2, 3, 1, 1)
        self.conv4 = ConvDO(hidden_ch*2, 1, 3, 1, 1)
        self.linear = nn.Linear(14 * 14, output_dim,)  
        
# В self.conv3 и self.conv4 изменен размер выходной фичматрицы с hidden_ch на hidden_ch*2 и 
# число выходных каналов с hidden_ch на 1.
# Добавлен слой self.conv4, преобразующий выходной тензор размером 16x16xhidden_ch*2 в тензор размером 14x14x1. 

        self.activ = nn.ReLU()    

    def forward(self, x):
        x = self.activ(self.conv1(x))
        x = self.activ(self.conv2(x))
        x = self.activ(self.conv3(x))
        x = self.activ(self.conv4(x))
        x = self.linear(x.view(x.size(0), -1))

        return x

In [24]:
trainloader = torch.utils.data.DataLoader(dataset_train, 
                                          batch_size=batch_size,
                                          shuffle=True, 
                                          num_workers=2, 
                                          drop_last = True,)

testloader = torch.utils.data.DataLoader(dataset_test, 
                                          batch_size=batch_size,
                                          shuffle=True, 
                                          num_workers=2, 
                                          drop_last = True,)

In [30]:
model_conv = ConvModel(input_ch, hidden_ch, out_dim).to(device)
optim = torch.optim.Adam(model_conv.parameters())
loss_func = nn.CrossEntropyLoss()

In [None]:
for epoch in range(n_epochs):
    for i, batch in enumerate(trainloader):
        inputs, labels = batch
        optim.zero_grad()

        predict = model_conv(inputs.to(device))
        loss = loss_func(predict, labels.to(device))
        loss.backward()
        optim.step()
        if i % 200 == 0:
            print(f'epoch: {epoch}, step: {i}, loss: {loss.item()}')
    
    
    model_conv.train()
    # print('test loss:', loss_test / i)
    #save every epoch

    torch.save(model_conv.state_dict(), f'./chkpt_cv1_conv_{epoch}.pth')


epoch: 0, step: 0, loss: 2.4098689556121826
epoch: 0, step: 200, loss: 0.2259868085384369
