<a href="https://colab.research.google.com/github/GrigoryBartosh/hse08_ip/blob/master/hw3_neural_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
cd 'drive/My Drive/ip_hw_3'

/content/drive/My Drive/ip_hw_3


In [0]:
! pip install wandb >> /dev/null

In [0]:
import os
import copy
import zipfile
import pandas as pd

import numpy as np
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
from torchvision import models
from torchvision import transforms

from PIL import Image

import wandb

from tqdm.auto import tqdm, trange
import matplotlib.pyplot as plt
import matplotlib.cm as cm

PATH_DATA = 'tl-signs-hse-itmo-2020-winter.zip'

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE = 256

In [5]:
xs, ys = [], []
with zipfile.ZipFile(PATH_DATA, 'r') as zip_file:
    with zip_file.open('train.csv') as file:
        train_data = pd.read_csv(file)

    for _, x in tqdm(train_data.iterrows()):
        name, lable = x['filename'], x['class_number']
        with zip_file.open(os.path.join('train', 'train', name)) as img_file:
            image = Image.open(img_file).convert('RGB')
        xs += [image]
        ys += [lable]

num_classes = max(ys) + 1

x_train, x_val, y_train, y_val = train_test_split(xs, ys, test_size=0.45)

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))




In [0]:
class SiamDataset(data.Dataset):
    def __init__(self, xs, ys, do_aug=False):
        self.xs = xs
        self.ys = ys
        self.do_aug = do_aug

    def aug(self, x):
        PF, PT = 0.2, 0.6

        x = x.copy()
        w, h, _ = x.shape
        dx = np.random.randint(int(w * (PT - PF))) + int(w * PF)
        dy = np.random.randint(int(h * (PT - PF))) + int(h * PF)
        x1 = np.random.randint(w - dx)
        y1 = np.random.randint(h - dy)
        x2, y2 = x1 + dx, y1 + dy
        x[x1:x2, y1:y2] = 0

        return x

    def __getitem__(self, index):
        x = self.xs[index]
        y = self.ys[index]

        x = np.array(x)
        x = self.aug(x) if self.do_aug else x
        x = torch.FloatTensor(x)

        y = torch.LongTensor([y])

        return x, y

    def __len__(self):
        return len(self.xs)

In [0]:
def collate_xs(xs):
    xs = torch.stack(xs, axis=0)
    xs = xs * 2 / 255 - 1
    xs = xs.permute(0, 3, 1, 2)

    return xs

def collate_fn(data):
    xs, ys = zip(*data)
    
    xs = collate_xs(xs)
    ys = torch.cat(ys)

    return xs, ys

train_dataset = SiamDataset(x_train, y_train, do_aug=True)
val_dataset = SiamDataset(x_val, y_val, do_aug=False)

train_data_loader = data.DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=8,
    collate_fn=collate_fn
)
val_data_loader = data.DataLoader(
    dataset=val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=8,
    collate_fn=collate_fn
)

In [0]:
def conv3x3(in_planes, out_planes, stride=1, padding=1):
    return nn.Conv2d(in_planes, out_planes, 3, 
                     stride, padding, bias=False)
    

def conv1x1(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, 1, 
                     stride, padding=0, bias=False)


class SimpleBlock(nn.Module):
    def __init__(self, in_planes, out_planes=None, stride=1):
        super(SimpleBlock, self).__init__()

        if out_planes is None:
            out_planes = in_planes

        self.activ = nn.ReLU()

        self.conv1 = conv3x3(in_planes, out_planes, stride)
        self.ln1 = nn.BatchNorm2d(out_planes)

    def forward(self, x):
        out = self.conv1(x)
        out = self.ln1(out)
        out = self.activ(out)

        return out


class ResBasicBlock(nn.Module):
    def __init__(self, in_planes, out_planes=None, stride=1):
        super(ResBasicBlock, self).__init__()

        planes = in_planes

        if out_planes is None:
            out_planes = planes

        self.residual_conv = None
        if stride != 1 or in_planes != out_planes:
            self.residual_conv = conv1x1(in_planes, out_planes, stride)
            self.residual_ln = nn.BatchNorm2d(out_planes)

        self.activ = nn.ReLU()

        self.conv1 = conv3x3(in_planes, planes, stride)
        self.ln1 = nn.BatchNorm2d(planes)
        self.conv2 = conv3x3(planes, out_planes)
        self.ln2 = nn.BatchNorm2d(out_planes)

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.ln1(out)
        out = self.activ(out)

        out = self.conv2(out)
        out = self.ln2(out)

        if self.residual_conv is not None:
            identity = self.residual_conv(identity)
            identity = self.residual_ln(identity)

        out += identity
        out = self.activ(out)

        return out


class MyNet(nn.Module):
    def __init__(self, num_classes, in_dim=32):
        super(MyNet, self).__init__()

        self.embed_dim = in_dim * 2 ** 4

        self.model = [SimpleBlock(3, in_dim, 2)]
        for _ in range(3):
            self.model += [ResBasicBlock(in_dim, in_dim * 2, 2),
                           ResBasicBlock(in_dim * 2, in_dim * 2)]
            in_dim = in_dim * 2
        self.model += [conv3x3(in_dim, self.embed_dim, padding=0)]
        self.model = nn.Sequential(*self.model)

        self.linear = nn.Linear(self.embed_dim, num_classes)
                 
    def forward(self, x):
        x = self.model(x)
        x = x.squeeze(dim=2)
        x = x.squeeze(dim=2)
        x = self.linear(x)
        return x


class MyResNet(nn.Module):
    def __init__(self, num_classes, model='resnet18', pretrained=False):
        super(MyResNet, self).__init__()
                 
        if model == 'resnet18':
            model_type = models.resnet18
        elif model == 'resnet34':
            model_type = models.resnet34
        elif model == 'resnet50':
            model_type = models.resnet50
        elif model == 'resnet101':
            model_type = models.resnet101
        elif model == 'resnet152':
            model_type = models.resnet152

        self.model = model_type(pretrained=pretrained)
        
        if pretrained:
            for parma in self.model.parameters():
                parma.requires_grad = False

        num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Linear(num_ftrs, num_classes)
                 
    def forward(self, x):
        return self.model(x)


class MyVGG(nn.Module):
    def __init__(self, num_classes, model='vgg11', pretrained=False):
        super(MyVGG, self).__init__()
                 
        if model == 'vgg11':
            model_type = models.vgg11_bn
        elif model == 'vgg16':
            model_type = models.vgg16_bn
        elif model == 'vgg19':
            model_type = models.vgg19_bn

        self.model = model_type(pretrained=pretrained)
        
        if pretrained:
            for parma in self.model.parameters():
                parma.requires_grad = False

        num_ftrs = self.model.classifier[6].in_features
        self.model.classifier[6] = nn.Linear(num_ftrs, num_classes)
                 
    def forward(self, x):
        return self.model(x)


class MyDense(nn.Module):
    def __init__(self, num_classes, model='densenet121', pretrained=False):
        super(MyDense, self).__init__()
                 
        if model == 'densenet121':
            model_type = models.densenet121
        elif model == 'densenet161':
            model_type = models.densenet161
        elif model == 'densenet169':
            model_type = models.densenet169
        elif model == 'densenet201':
            model_type = models.densenet201

        self.model = model_type(pretrained=pretrained)
        
        if pretrained:
            for parma in self.model.parameters():
                parma.requires_grad = False

        num_ftrs = self.model.classifier.in_features
        self.model.classifier = nn.Linear(num_ftrs, num_classes)
                 
    def forward(self, x):
        return self.model(x)


class AnsambleModel(nn.Module):
    def __init__(self, num_classes):
        super(AnsambleModel, self).__init__()

        self.models = nn.ModuleList([
            MyNet(num_classes, in_dim=32),
            MyResNet(num_classes, model='resnet34', pretrained=True),
            MyResNet(num_classes, model='resnet50', pretrained=True),
            #MyVGG(num_classes, model='vgg16', pretrained=True),
            MyDense(num_classes, model='densenet121', pretrained=True),
            MyDense(num_classes, model='densenet161', pretrained=True)
        ])

    def forward(self, x):
        x = [m(x) for m in self.models]
        x = torch.stack(x, axis=0)
        return x

In [0]:
class AnsableCEL(nn.Module):
    def __init__(self, margin=1.0, p=2, part=1.0):
        super(AnsableCEL, self).__init__()

        self.criterion = nn.CrossEntropyLoss()

    def forward(self, xs, ys):
        losses = []
        for x in xs:
            losses += [self.criterion(x, ys)]
        losses = torch.stack(losses)
        loss = losses.sum()
        return loss

In [0]:
def train(model, criterion, optimizer, scheduler, epochs):
    wandb.init(project="hse08_ip_hw_3")

    best_model = [copy.deepcopy(m) for m in model.models]
    best_accuracy = [0 for _ in model.models]
    for _ in trange(epochs):
        for xs, ys in train_data_loader:
            xs = xs.to(device)
            ys = ys.to(device)
            
            optimizer.zero_grad()

            outputs = model(xs)
            loss = criterion(outputs, ys)

            wandb.log({'Train loss': loss.item()})

            loss.backward()
            optimizer.step()

        losses = []
        accuracy = [(0, 0) for _ in model.models]
        model.eval()
        with torch.no_grad():
            for xs, ys in val_data_loader:
                xs = xs.to(device)
                ys = ys.to(device)

                outputs = model(xs)
                loss = criterion(outputs, ys)
                for i, out in enumerate(outputs):
                    true_detections = (ys == out.argmax(axis=1)).sum().item()
                    accuracy[i] = (accuracy[i][0] + true_detections,
                                   accuracy[i][1] + ys.shape[0])
                
                losses += [loss.item()]
            
        model.train()

        loss = np.array(losses).mean()
        wandb.log({'Val loss': loss})
        for i, (acs, ss) in enumerate(accuracy):
            ac = acs / ss
            wandb.log({f'Accuracy {i + 1}': ac})
            
            if best_accuracy[i] < ac:
                best_accuracy[i] = ac
                best_model[i] = copy.deepcopy(model.models[i])
        
        if scheduler:
            scheduler.step()

    for i, m in enumerate(best_model):
        model.models[i] = m

    return model

In [0]:
def get_scores_for_one(model, xs):
    scores = []
    with torch.no_grad():
        for i in range(0, len(xs), BATCH_SIZE):
            j = min(i + BATCH_SIZE, len(xs))
            batch_xs = xs[i:j]

            batch_xs = [torch.FloatTensor(np.array(x)) for x in batch_xs]
            batch_xs = collate_xs(batch_xs)
            batch_xs = batch_xs.to(device)

            batch_scores = model(batch_xs).reshape(j - i, -1)
            batch_scores = F.softmax(batch_scores, dim=-1)
            batch_scores = batch_scores.cpu().numpy()
            
            scores += [batch_scores]

    scores = np.concatenate(scores, axis=0)

    return scores

def get_scores(model, xs):
    scores = [get_scores_for_one(m, xs) for m in model.models]
    scores = np.stack(scores, axis=1)
    return scores

In [0]:
def get_predictions(scores, f):
    if f == 'mean':
        ys = scores.mean(axis=1).argmax(axis=1)
    elif f == 'vote':
        ys = scores.argmax(axis=2)
        ys = [np.bincount(y).argmax() for y in ys]
        ys = np.stack(ys, axis=0)

    return ys

In [0]:
def get_nn_accuracy(scores, ys, f):
    ys_predicted = get_predictions(scores, f)
    return (ys_predicted == ys).sum() / len(ys)

In [0]:
nn_model = AnsambleModel(num_classes=67)
nn_model.to(device)

criterion = AnsableCEL()

In [15]:
LR = 0.001
EPOCHS = 4

optimizer = optim.Adam(filter(lambda p: p.requires_grad, nn_model.parameters()), 
                       LR)

nn_model.train()
nn_model = train(nn_model, criterion, optimizer, None, EPOCHS)

HBox(children=(IntProgress(value=0, max=4), HTML(value='')))




In [0]:
LR = 0.0001
EPOCHS = 40

for parma in nn_model.parameters():
    parma.requires_grad = True

optimizer = optim.Adam(filter(lambda p: p.requires_grad, nn_model.parameters()), 
                       LR)

nn_model.train()
nn_model = train(nn_model, criterion, optimizer, None, EPOCHS)

HBox(children=(IntProgress(value=0, max=40), HTML(value='')))

In [0]:
nn_model.eval()

train_scores = get_scores(nn_model, x_train)
val_scores = get_scores(nn_model, x_val)

train_nn_accuracy = get_nn_accuracy(train_scores, y_train, f='vote')
val_nn_accuracy = get_nn_accuracy(val_scores, y_val, f='vote')

print(f'Train NN accuracy vote {train_nn_accuracy}')
print(f'Val NN accuracy vote {val_nn_accuracy}')

train_nn_accuracy = get_nn_accuracy(train_scores, y_train, f='mean')
val_nn_accuracy = get_nn_accuracy(val_scores, y_val, f='mean')

print(f'Train NN accuracy mean {train_nn_accuracy}')
print(f'Val NN accuracy mean {val_nn_accuracy}')

In [0]:
images = []
with zipfile.ZipFile(PATH_DATA, 'r') as zip_file:
    image_names = [f[10:] for f in zip_file.namelist() if 'test' in f]

    for name in image_names:
        with zip_file.open(os.path.join('test', 'test', name)) as img_file:
            image = Image.open(img_file).convert('RGB')
        images += [(name, image)]

predictions = {'filename': [], 'class_number': []}
with torch.no_grad():
    for i in range(0, len(images), BATCH_SIZE):
        j = min(i + BATCH_SIZE, len(images))
        batch = images[i:j]
        _, xs = zip(*batch)
        scores = get_scores(nn_model, xs)
        ys = get_predictions(scores, f='mean')

        for (name, _), y in zip(images[i:j], ys):
            predictions['filename'] += [name]
            predictions['class_number'] += [y]

predictions = pd.DataFrame(predictions)
predictions.to_csv('test_neural.csv', index=False)