In [2]:
from torchvision.datasets import MNIST, mnist, CIFAR10
from torchvision import transforms

In [3]:
import torch.nn.functional as F

In [4]:
import torch
from torch import nn
from torch.autograd import Variable
from torch.distributions import Categorical
from torch.utils.data import DataLoader

In [5]:
from tqdm.notebook import tqdm_notebook

In [6]:
import matplotlib.pyplot as plt
import seaborn as sns

In [7]:
from itertools import chain
import pandas as pd

In [8]:
from torch.utils.tensorboard import SummaryWriter
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [9]:
class CustomTargetTransform:
    def __init__(self, num_classes=10):
        self.num_classes = num_classes

    def __call__(self, target):
        new_target = torch.zeros(self.num_classes, dtype=torch.float, device=device)
        new_target[target] = 1
        return new_target

transform = transforms.Compose([
    transforms.PILToTensor(),
    transforms.Lambda(lambda x: x.float().to(device))
])

# data_loader = DataLoader(dataset, batch_size=800, shuffle=True)

In [10]:
# dataset = mnist.FashionMNIST("data", download=True, train=True, transform=transform, target_transform=CustomTargetTransform())
# dataset_target = mnist.FashionMNIST("data", download=True, train=False, transform=transforms.PILToTensor())
dataset = CIFAR10("data", download=True, train=True, transform=transform, target_transform=CustomTargetTransform())
dataset_target = CIFAR10("data", download=True, train=False, transform=transforms.PILToTensor())

Files already downloaded and verified
Files already downloaded and verified


In [11]:
target_data = torch.tensor(dataset_target.data).swapaxes(3, 1).float().to(device)
# target_data = torch.tensor(dataset_target.data).unsqueeze(1).float().to(device)
target_labels = torch.tensor(dataset_target.targets).float().to(device)
target_data.shape, target_labels.shape

(torch.Size([10000, 3, 32, 32]), torch.Size([10000]))

In [12]:
class ResBlock(nn.Module):
    def __init__(self, in_channels, batch_norm1, batch_norm2, inter_channels, kernel_size1, kernel_size2, stride1, stride2, padding1=1, padding2=1, in_size=28):
        super().__init__()
        block = [
            nn.Conv2d(in_channels, inter_channels, kernel_size1, stride1, padding1),
        ]
        if batch_norm1:
            block.append(nn.BatchNorm2d(inter_channels))
        block.append(nn.ReLU())
        block.append(nn.Conv2d(inter_channels, in_channels, kernel_size2, stride2, padding2))
        if batch_norm2:
            block.append(nn.BatchNorm2d(in_channels))
        self.block = nn.Sequential(
            *block
        )
    
    def get_out_size(self, in_size):
        for layer in (self.block[0], self.block[-1] if type(self.block[-1]) == nn.Conv2d else self.block[-2]):
            in_size = (in_size - layer.kernel_size[0] + 2 * layer.padding[0]) // layer.stride[0] + 1
        return in_size

    def forward(self, x):
        return F.relu(x + self.block(x))


In [13]:
def get_name(model: nn.Module) -> list[nn.Module]:
    skip = 1
    names = []
    for m in model.modules():
        if skip > 0:
            skip -= 1
            continue
        names.append(m)
        if len(tuple(m.modules())) > 1:
            skip = len(tuple(m.modules())) - 1
    return names

In [14]:
def create_model(
        img_size,
        pool_kernel_size,
        pool_stride,
        *layers,
        linears: list[int],
        lr=0.01,
        batch_size=800,
        epoch_count=10,
        softmax=True,
        in_channels=1,
        maxpool=True,
        avgpool=False,
    ) -> tuple[nn.Sequential, nn.CrossEntropyLoss, torch.optim.SGD, SummaryWriter]:
    torch.manual_seed(0)
    blocks = []
    outs = img_size
    for layer in layers:
        blocks.append(
            layer
        )
        tpe = type(layer)
        if tpe == ResBlock:
            outs = layer.get_out_size(outs)
        elif tpe == nn.Conv2d:
            outs = (outs - layer.kernel_size[0] + 2 * layer.padding[0]) // layer.stride[0] + 1
            in_channels = layer.out_channels
        elif tpe == nn.MaxPool2d:
            outs = (outs - layer.kernel_size) // layer.stride + 1
    if maxpool:
        outs = (outs - pool_kernel_size) // pool_stride + 1
        if avgpool:
            blocks.append(nn.AvgPool2d(kernel_size=pool_kernel_size, stride=pool_stride))
        else:
            blocks.append(nn.MaxPool2d(kernel_size=pool_kernel_size, stride=pool_stride))
    outs = outs * outs * in_channels
    blocks.append(nn.Flatten(1))
    for layer in linears:
        blocks.append(nn.Linear(outs, layer))
        blocks.append(nn.ReLU())
        outs = layer
    blocks.append(nn.Linear(outs, 10))
    if softmax:
        blocks.append(nn.Softmax())
    model = nn.Sequential(
        *blocks,
    ).to(device)
    er_f = nn.CrossEntropyLoss()
    optim = torch.optim.AdamW(model.parameters(), lr=lr)
    # name = "_".join(m._get_name() for m in model.modules())
    # name = f"_{len(blocks_out_channels)}_{lr}_{blocks_kernel_size}_{blocks_stride}_{pool_kernel_size}_{pool_stride}"
    name = "_".join(m._get_name() for m in get_name(model))
    print(name)
    writer = SummaryWriter(comment=name)
    return model, er_f, optim, writer, name, batch_size, epoch_count


In [15]:
def write_summary(writer: SummaryWriter, model: nn.Module):
    param_dict = {f"layer_{i}": str(m) for m, i in enumerate(get_name(model))}
    param_table = pd.DataFrame(param_dict, index=[0])
    predicted = model(target_data)
    predicted_labels = predicted.max(1).indices
    confusion_matrix = torch.zeros(10, 10, device=device)
    for i in range(len(predicted_labels)):
        confusion_matrix[predicted_labels[i].long()][target_labels[i].long()] += 1
    confusion_matrix = confusion_matrix.cpu()
    fig = plt.gcf()
    fig.clear()
    ax = fig.add_subplot(111)
    hist = predicted_labels[predicted_labels == target_labels]
    error_hist = target_labels[predicted_labels != target_labels]
    percent: torch.tensor = ((predicted_labels == target_labels).sum() / len(target_labels))
    sns.heatmap(confusion_matrix, annot=True, fmt="g", ax=ax)
    writer.add_figure("confusion_matrix", fig)
    writer.add_scalar("accuracy", percent)
    fig.clear()
    ax = fig.add_subplot(111)
    sns.histplot(hist.cpu(), stat="count", discrete=True, bins=range(10), ax=ax)
    writer.add_figure("right hist", fig)
    fig.clear()
    ax = fig.add_subplot(111)
    sns.histplot(error_hist.cpu(), stat="count", discrete=True, bins=range(10), ax=ax)
    writer.add_figure("error hist", fig)
    writer.add_text("param_table", param_table.to_markdown())
    writer.add_text("accuracy", str(percent.item()))

In [16]:
models = (
    # create_model(
    #     32,
    #     2,
    #     2,
    #     nn.Conv2d(3, 6, 5),
    #     nn.MaxPool2d(2, 2),
    #     nn.Conv2d(6, 16, 5),
    #     linears=[],
    #     lr=0.0001,
    #     batch_size=30,
    #     epoch_count=1,
    # ), # 81 - 1000
    # create_model(
    #     32,
    #     2,
    #     2,
    #     nn.Conv2d(3, 6, 5),
    #     nn.ReLU(),
    #     ResBlock(6, True, True, 10, 3, 3, 1, 1, in_size=32),
    #     nn.MaxPool2d(2, 2),
    #     nn.Conv2d(6, 16, 5),
    #     nn.ReLU(),
    #     ResBlock(16, True, True, 20, 3, 3, 1, 1, in_size=32),
    #     nn.MaxPool2d(2, 2),
    #     linears=[64, 32],
    #     lr=0.001,
    #     batch_size=4,
    #     epoch_count=1,
    #     maxpool=False,
    #     softmax=False,
    # ), # 81 - 1000
    # create_model(
    #     32,
    #     2,
    #     2,
    #     ResBlock(3, True, True, 10, 3, 3, 1, 1, in_size=32),
    #     nn.MaxPool2d(2, 2),
    #     nn.ReLU(),
    #     nn.Conv2d(3, 10, 3, 3),
    #     ResBlock(10, True, True, 20, 3, 3, 1, 1, in_size=32),
    #     nn.ReLU(),
    #     linears=[64, 32],
    #     lr=0.001,
    #     batch_size=20,
    #     epoch_count=1,
    #     maxpool=True,
    #     softmax=True,
    # ), # 81 - 1000
    create_model(
        32,
        2,
        2,
        nn.Conv2d(3, 64, 2, 2),
        nn.MaxPool2d(2, 2),
        nn.ReLU(),
        ResBlock(64, True, True, 64, 3, 3, 1, 1, in_size=32),
        nn.ReLU(),
        ResBlock(64, True, True, 64, 3, 3, 1, 1, in_size=32),
        nn.ReLU(),
        ResBlock(64, True, True, 64, 3, 3, 1, 1, in_size=32),
        nn.ReLU(),
        nn.Conv2d(64, 128, 2, 2),
        nn.ReLU(),
        ResBlock(128, True, True, 128, 3, 3, 1, 1, in_size=32),
        nn.ReLU(),
        ResBlock(128, True, True, 128, 3, 3, 1, 1, in_size=32),
        nn.ReLU(),
        ResBlock(128, True, True, 128, 3, 3, 1, 1, in_size=32),
        nn.ReLU(),
        linears=[],
        lr=0.001,
        batch_size=20,
        epoch_count=1,
        maxpool=True,
        avgpool=True,
        softmax=False,
    ), # 81 - 1000
)

Conv2d_MaxPool2d_ReLU_ResBlock_ReLU_ResBlock_ReLU_ResBlock_ReLU_Conv2d_ReLU_ResBlock_ReLU_ResBlock_ReLU_ResBlock_ReLU_AvgPool2d_Flatten_Linear


In [17]:
try:
    for model, er_f, optim, writer, name, batch_count, epoch_count in tqdm_notebook(models):
        #  = create_model(28, *param)
        torch.manual_seed(0)
        data_loader = DataLoader(dataset, batch_size=batch_count, shuffle=True)
        total_epochs = 0
        for epoch in tqdm_notebook(range(epoch_count)):
            for image, target in tqdm_notebook(data_loader, leave=False):
                optim.zero_grad()
                outs = model(image)
                loss = er_f(outs, target)
                loss.backward()
                optim.step()
                percent: torch.tensor = ((outs.max(1).indices == target.max(1).indices).sum() / len(target))
                writer.add_scalar("loss", loss, total_epochs)
                writer.add_scalar("train_accuracy", percent, total_epochs)
                predicted = model(target_data)
                predicted_labels = predicted.max(1).indices
                percent: torch.tensor = ((predicted_labels == target_labels).sum() / len(target_labels))
                writer.add_scalar("test_accuracy", percent, total_epochs)
                total_epochs += 1
finally:
    write_summary(writer, model)
    torch.save(model, f"model_{repr(dataset.__class__).split('.')[-1][:-2]}_{percent:.2f}.pth")

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/2500 [00:00<?, ?it/s]

OutOfMemoryError: HIP out of memory. Tried to allocate 118.00 MiB. GPU 0 has a total capacity of 7.98 GiB of which 0 bytes is free. Of the allocated memory 7.48 GiB is allocated by PyTorch, and 89.69 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_HIP_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [76]:
optim = torch.optim.AdamW(model.parameters(), lr=0.0001)
data_loader = DataLoader(dataset, batch_size=100, shuffle=True)
epoches = 1

In [64]:
optim = torch.optim.AdamW(model.parameters(), lr=0.0001)
data_loader = DataLoader(dataset, batch_size=1000, shuffle=True)
epoches = 10

In [77]:
try:
    for epoch in tqdm_notebook(range(epoches)):
        for image, target in tqdm_notebook(data_loader, leave=False):
            optim.zero_grad()
            outs = model(image)
            loss = er_f(outs, target)
            loss.backward()
            optim.step()
            percent: torch.tensor = ((outs.max(1).indices == target.max(1).indices).sum() / len(target))
            writer.add_scalar("loss", loss, total_epochs)
            writer.add_scalar("train_accuracy", percent, total_epochs)
            predicted = model(target_data)
            predicted_labels = predicted.max(1).indices
            percent: torch.tensor = ((predicted_labels == target_labels).sum() / len(target_labels))
            writer.add_scalar("test_accuracy", percent, total_epochs)
            total_epochs += 1
finally:
    write_summary(writer, model)
    torch.save(model, f"model_{repr(dataset.__class__).split('.')[-1][:-2]}_{percent:.2f}.pth")

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/500 [00:00<?, ?it/s]

  return self._call_impl(*args, **kwargs)
  return self._call_impl(*args, **kwargs)


In [None]:
torch.load(model, f"model_{repr(dataset.__class__).split('.')[-1][:-2]}_{name:.2f}.pth")