In [None]:
import numpy as np
import pandas as pd
import plotly.express as px
import torch
import tqdm.notebook as tqdm
import random
import math
from functools import partial

from collections import OrderedDict


import matplotlib.pyplot as plt
%matplotlib inline

def set_global_seed(seed: int) -> None:
    """
    Set global seed for reproducibility.
    """

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

set_global_seed(42)

## Загрузка и обработка данных

In [None]:
import torchvision


In [None]:
transform = torchvision.transforms.Compose([
    torchvision.transforms.RandomHorizontalFlip(),
    torchvision.transforms.RandomCrop(32, 4),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

In [None]:
ds_train = torchvision.datasets.CIFAR10(
    root='./', train=True, transform=transform, download=True
)
ds_test = torchvision.datasets.CIFAR10(
    root='./', train=False,
    transform=torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )]),
    download=True
)
ds_train, ds_test

Transfom to PIL image

In [None]:
tmean, tstd = transform.transforms[-1].mean, transform.transforms[-1].std
tmean, tstd = np.array(tmean), np.array(tstd)

inverse_transform = torchvision.transforms.Compose([
    torchvision.transforms.Normalize(
        mean=-tmean / tstd,
        std=1.0 / tstd
    ),
    torchvision.transforms.ToPILImage()
])

In [None]:
fig, axes = plt.subplots(2, 5, figsize=(13, 6))

for idx, ds in enumerate((ds_train, ds_test)):
    for jdx, kdx in enumerate(np.random.randint(0, len(ds), size=5)):
        image, label = ds[kdx]
        axes[idx, jdx].imshow(inverse_transform(image))
        axes[idx, jdx].set_title(f'Метка: {label} -> {ds.classes[label]}')

axes[0, 0].set_ylabel('Обучающая выборка')
axes[1, 0].set_ylabel('Тестовая выборка')

fig.tight_layout()
plt.show()

In [None]:
BATCH_SIZE = 128
DEVICE = torch.device('cpu')
if torch.cuda.is_available():
    DEVICE = torch.device('cuda', 0)

print(type(DEVICE), DEVICE)

In [None]:
dl_train = torch.utils.data.DataLoader(
    dataset=ds_train, batch_size=BATCH_SIZE,
    num_workers=2, shuffle=True
)
dl_test = torch.utils.data.DataLoader(
    dataset=ds_test, batch_size=BATCH_SIZE,
    num_workers=2, shuffle=False
)

In [None]:
class ConvNet(torch.nn.Module):
    cfg = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M']

    def __init__(self, n_classes=10, use_batchnorm=False, dropout_p=0.0):
        '''
        :param int n_classes: Число выходных признаков
        :param bool use_batchnorm: Использовать ли батчнорм между свёрточными слоями
        :param float dropout_p: Вероятность обнуления активации слоем Dropout
        '''
        super().__init__()

        self.n_classes = n_classes

        ### your code here
        self.in_channels = 3
        self.features = torch.nn.Sequential()
        for cfg_item in self.cfg:
            if isinstance(cfg_item, int):
                self.features.append(torch.nn.Conv2d(self.in_channels,
                                                     out_channels=int(cfg_item),
                                                     kernel_size=3,
                                                     padding=1))
                if use_batchnorm:
                    self.features.append(torch.nn.BatchNorm2d(int(cfg_item)))
                self.features.append(torch.nn.ReLU(inplace=True))
                self.in_channels = int(cfg_item)
            elif cfg_item == "M":
                self.features.append(torch.nn.MaxPool2d(kernel_size=(2, 2), stride=2))

        # self.avgpool = torch.nn.AdaptiveAvgPool2d(output_size=(2, 2))
        self.classifier = torch.nn.Sequential(
            torch.nn.Linear(in_features=512, out_features=512),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=dropout_p),
            torch.nn.Linear(in_features=512, out_features=512),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=dropout_p),
            torch.nn.Linear(in_features=512, out_features=10)
        )
        for m in self.modules():
            if isinstance(m, torch.nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                m.bias.data.zero_()



    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.features(x)
        # x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x


In [None]:
def test_model(model, train_dataloader) -> float:
    model.to(DEVICE)
    model.eval()
    with torch.no_grad():
        train_accuracies = []
        for images, labels in tqdm.tqdm(train_dataloader, total=len(train_dataloader), leave=False):
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs  = model(images)
            train_accuracies.append(torch.sum(outputs.argmax(dim=1) == labels)/labels.shape[0])
        return float((sum(train_accuracies) / len(train_accuracies)).cpu() * 100)

In [None]:
test_conv_net = ConvNet()
test_conv_net.load_state_dict(torch.load(GLOBAL_PATH + 'Models/VGG_09.11.2022-16:50.pth', map_location=DEVICE))
test_model(test_conv_net, dl_test)

In [None]:
import tqdm.notebook as tqdm
from functools import partial

In [None]:
def training_loop(n_epochs, network, loss_fn, optimizer,scheduler, dl_train, dl_test, device):
    '''
    :param int n_epochs: Число итераций оптимизации
    :param torch.nn.Module network: Нейронная сеть
    :param Callable loss_fn: Функция потерь
    :param torch.nn.Optimizer optimizer: Оптимизатор
    :param torch.utils.data.DataLoader dl_train: Даталоадер для обучающей выборки
    :param torch.utils.data.DataLoader dl_test: Даталоадер для тестовой выборки
    :param torch.device device: Устройство на котором будут происходить вычисления
    :returns: Списки значений функции потерь и точности на обучающей и тестовой выборках после каждой итерации
    '''
    loss_fn.to(device)
    train_losses, test_losses, train_accuracies, test_accuracies = [], [], [], []
    pbar = tqdm.tqdm(range(n_epochs), total=n_epochs, leave=False)
    for epoch in (pbar):

        # Итерация обучения сети
        for images, labels in tqdm.tqdm(dl_train, total=len(dl_train), leave=False):
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            loss = loss_fn(network(images), labels)
            loss.backward()
            optimizer.step()
            scheduler.step()


        # Оцениваем качество модели каждые 3 итерации
        if epoch % 3 == 0 or epoch == n_epochs - 1:
            # Переводим сеть в инференс режим
            network.eval()

            # При тестировании сети нет необходимости считать градиенты, поэтому можно отключить автоматическое дифференцирование
            #   для ускорения операций
            with torch.no_grad():
                # Вычисление качества и функции потерь на обучающей выборке
                tmp_train_losses, tmp_train_accuracies = [], []
                for images, labels in tqdm.tqdm(dl_train, total=len(dl_train), leave=False):
                    images = images.to(device)
                    labels = labels.to(device)

                    outputs  = network(images)

                    tmp_train_losses.append(loss_fn(outputs, labels))
                    tmp_train_accuracies.append(torch.sum(outputs.argmax(dim=1) == labels)/labels.shape[0])

                train_losses.append((sum(tmp_train_losses) / len(tmp_train_losses)).cpu())
                train_accuracies.append((sum(tmp_train_accuracies) / len(tmp_train_accuracies)).cpu() * 100)
                # Вычисление качества и функции потерь на тестовой выборке
                tmp_test_losses, tmp_test_accuracies = [], []
                for images, labels in tqdm.tqdm(dl_test, total=len(dl_test), leave=False):
                    images = images.to(device)
                    labels = labels.to(device)

                    outputs  = network(images)

                    tmp_test_losses.append(loss_fn(outputs, labels))
                    tmp_test_accuracies.append(torch.sum(outputs.argmax(dim=1) == labels)/labels.shape[0])

                test_losses.append((sum(tmp_test_losses) / len(tmp_test_losses)).cpu())
                test_accuracies.append((sum(tmp_test_accuracies) / len(tmp_test_accuracies)).cpu() * 100)

            pbar.set_description(
                'Loss (Train/Test): {0:.3f}/{1:.3f}. Accuracy, % (Train/Test): {2:.2f}/{3:.2f}\n'.format(
                    train_losses[-1], test_losses[-1], train_accuracies[-1], test_accuracies[-1]
                )
            )

    return train_losses, test_losses, train_accuracies, test_accuracies

In [None]:
train_func = partial(
    training_loop, n_epochs=70, loss_fn=torch.nn.CrossEntropyLoss(),
    dl_train=dl_train, dl_test=dl_test, device=DEVICE
)

In [None]:
conv_net = ConvNet()
conv_net.to(DEVICE)

In [None]:
optimizer = torch.optim.SGD(conv_net.parameters(), lr=0.05, momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.9)

In [None]:
train_losses, test_losses, train_accs, test_accs = train_func(
    network=conv_net,
    optimizer=optimizer,
    scheduler=scheduler
)

In [None]:
import time
torch.save(conv_net.state_dict(), GLOBAL_PATH + f'Models/VGG_{time.strftime("%d.%m.%Y-%H:%M")}.pth')

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(np.arange(len(train_accs)) * 3, train_accs, label="Точность на обучении", color='red', marker='.', linestyle='-.')
ax.plot(np.arange(len(test_accs)) * 3, test_accs, label="Точность на тесте", color='red', marker='*')

ax.set_xlabel("Номер эпохи")
ax.set_ylabel("$\%$")

ax.grid(True)
ax.legend()

fig.tight_layout()
plt.show()

In [None]:
np.save(GLOBAL_PATH + f'/VGG_train_accuracy_{time.strftime("%d.%m.%Y-%H:%M")}.npy', train_accs)
np.save(GLOBAL_PATH + f'/VGG_test_accuracy_{time.strftime("%d.%m.%Y-%H:%M")}.npy', test_accs)

In [None]:
with open(GLOBAL_PATH + f'/VGG_train_accuracy_21.04.2023-11:02.npy', 'rb') as f:
    a = np.load(f)
with open(GLOBAL_PATH + f'/VGG_test_accuracy_21.04.2023-11:02.npy', 'rb') as f:
    b = np.load(f)

fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(np.arange(len(a)) * 3, a, label="Точность на обучении", color='red', marker='.', linestyle='-.')
ax.plot(np.arange(len(b)) * 3, b, label="Точность на тесте", color='red', marker='*')

ax.set_xlabel("Номер эпохи")
ax.set_ylabel("$\%$")

ax.grid(True)
ax.legend()

fig.tight_layout()
plt.show()

In [None]:
df_plot = pd.concat([pd.DataFrame({'accuracy, %': train_accs}),
                     pd.DataFrame({'accuracy, %': test_accs})
                    ],
                    keys=['Точность на обучении', 'Точность на тесте'])
x = [list(np.arange(len(train_accs)) * 3) * 2]
df_plot.reset_index(inplace=True)
df_plot.set_index(x, inplace=True)
df_plot.reset_index(inplace=True)

In [None]:
fig = px.line(df_plot, x='level_1', y="accuracy, %", color='level_0')
fig.show()