In [18]:
import cv2
import numpy as np
from typing import Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
import time
from torchvision.transforms import Compose, ToTensor, Normalize, RandomHorizontalFlip, ColorJitter

In [10]:
def imshow(img: torch.Tensor) -> None:
    npimg = img.detach().cpu().numpy()
    plt.axis('off')
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


## Image classification

Dataset: [CIFAR10](https://www.cs.toronto.edu/~kriz/cifar.html) - 10 classes, color images 32x32 px

In [14]:
# CIFAR10 Classes
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [35]:
transform=transforms.Compose([
                              RandomHorizontalFlip(p=0.5),
                              ColorJitter(brightness=.5),
                              ToTensor(),
                              Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                              ])

def get_loaders(batch_size: int=128,
                num_workers: int=2,
                transform: transforms.Compose=transform) -> Tuple[DataLoader, DataLoader]:
    train = datasets.CIFAR10('../data', train=True, download=True, transform=transform)
    test = datasets.CIFAR10('../data', train=False, download=True, transform=transform)
    torch.manual_seed(123)  # To ensure the same sampling during each experiment
    
    train_loader = torch.utils.data.DataLoader(train, batch_size=batch_size, num_workers=num_workers, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test, batch_size=batch_size, num_workers=num_workers, shuffle=False)
    return train_loader, test_loader

In [None]:
train_loader, test_loader = get_loaders(8)
images, labels = iter(train_loader).next()
print("Tensor shape: {} BatchxCxHxW".format(images.shape))
labels = labels.numpy()
for i in range(images.shape[0]):
    print(classes[labels[i]])
    img = images[i, ...].numpy()
    img = np.moveaxis(img, 0, 2)
    plt.imshow(img)
    plt.show()

## Model

### Conv_Net

In [29]:
class Conv_Net(nn.Module): 
    def __init__(self,
                 input_channels: int = 3,
                 lin_input: int = 1440,
                 lin_output: int = 10) -> None:
        super(Conv_Net, self).__init__()
        self.conv = nn.Sequential(
              self.simple_conv_block(input_channels, input_channels*30),  # [batch, 3, 32, 32] -> [batch, 90, 16, 16]
              self.simple_conv_block(input_channels*30, input_channels*60, final_layer = False),  # [batch, 90, 16, 16] -> [batch, 180, 8, 8]
              self.simple_conv_block(input_channels*60, input_channels*120, final_layer = False),  # [batch, 180, 8, 8] -> [batch, 360, 4, 4]
              self.simple_conv_block(input_channels*120, input_channels*240, final_layer = False),  # [batch, 360, 4, 4] -> [batch, 720, 2, 2]
              self.simple_conv_block(input_channels*240, input_channels*480, final_layer = True)  # [batch, 720, 2, 2] -> [batch, 1440, 1, 1]
              )
        self.lin = nn.Sequential(
            self.simple_lin_block(lin_input, lin_output),  # [batch, 1440] -> [batch, 10]
            )


    def simple_conv_block(self,
                          input_channels: int = 3,
                          output_channels: int = 3,
                          kernel_size: int = 2,
                          padding: int = 0,
                          final_layer: bool = False) -> nn.Sequential:
        if not final_layer:
            return nn.Sequential(
                nn.Conv2d(in_channels=input_channels,
                          out_channels=output_channels,
                          kernel_size=kernel_size,
                          padding=padding),
                nn.BatchNorm2d(output_channels),
                nn.MaxPool2d(kernel_size=2, stride=2, padding=1),
                nn.Dropout(p=0.2),
                nn.LeakyReLU(0.2)
            )
        else:  # Final Layer
            return nn.Sequential(
                nn.Conv2d(in_channels=input_channels,
                          out_channels=output_channels,
                          kernel_size=kernel_size,
                          padding=padding)
            )
  
    def simple_lin_block(self, input: int, output: int) -> nn.Sequential:
        return nn.Sequential(
            nn.Linear(input, output),
            nn.LeakyReLU(0.2)
        )


    def forward(self, image: torch.tensor) -> torch.tensor:
        x = self.conv(image)
        x = torch.flatten(x, start_dim = 1)  # [batch, 1440, 1, 1] -> [batch, 1440]
        x = self.lin(x)
        return F.log_softmax(x, dim = 1)

### Accuracy, train, test

In [None]:
def accuracy(pred: torch.Tensor, ground: torch.Tensor) -> float:
    pred_class = pred.argmax(dim=1)
    return sum(pred_class==ground).item()

def train(model: nn.Sequential,
          device: torch.device,
          train_loader,#: dataloader.DataLoader,
          scheduler: torch.sheduler) -> None:
    model.train()
    for data, target in train_loader:
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = F.nll_loss(out, target)
        loss.backward()
        scheduler.step()


def test(model: nn.Sequential,
         device: torch.device,
         test_loader: DataLoader) -> Tuple[float, float]:
    model.eval()
    test_loss = 0
    acc = 0
    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)
            out = model(data)
            acc += accuracy(out, target)
            loss = F.nll_loss(out, target, reduction='sum')
            test_loss += loss.item()
    return (test_loss / len(test_loader.dataset),
            acc / len(test_loader.dataset))

### Training

In [None]:
epochs = 100
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

batch_size = 128
lr = 1e-3
eps = 5e-3


train_loader, test_loader = get_loaders(batch_size=batch_size, transform=transform, num_workers=2)
model = Conv_Net().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

t0 = time.time()

acc_list = []
test_loss_list = []
for epoch in range(epochs):
    train(model, device, train_loader, optimizer)
    test_loss, acc = test(model, device, test_loader)
    acc_list.append(acc)
    test_loss_list.append(test_loss)
    t1 = (time.time() - t0) / 60
    print('Epoch: {}, test loss: {:.3f}, accuracy: {:.3f}, time: {:.2f} min'.format(epoch+1, test_loss, acc, t1))
    if epoch > 2:
        if max(acc_list[-5:]) - min(acc_list[-5:]) > eps:
            continue
        else:
            break
print('Finish!')


In [None]:
plt.plot(range(1, len(acc_list)+1), acc_list, label = 'acccuracy')
plt.legend()
plt.show()
plt.plot(range(1, len(test_loss_list)+1), test_loss_list, label = 'loss')
plt.legend()
plt.show()