# Script to train embeddings using a neural network

In [None]:
import os

import numpy as np
from scipy.constants import pi as pi
from scipy.special import binom as binom
import pandas as pd
import cv2
import torch
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import DataLoader

import pytorch_utils as utils

In [None]:
# Torch device obtained from the project helper. Net moves itself to this
# device and the train/evaluate loops move every batch to it.
# NOTE(review): presumably CUDA when available, otherwise CPU — confirm in pytorch_utils.
device = utils.get_torch_device()

In [None]:
# Root folders of the train and test splits; both are loaded with ImageFolder
# further down. The trailing slash is part of the value.
train_dir = "team_color_dataset_splitted/train/"
test_dir = "team_color_dataset_splitted/test/"

In [None]:
# Number of classes = number of sub-directories of the training folder.
# Only directories are counted so that stray files (e.g. ".DS_Store" or
# notebook checkpoints) do not inflate the class count; this matches how
# ImageFolder discovers classes.
with os.scandir(train_dir) as entries:
    num_classes = sum(1 for entry in entries if entry.is_dir())

In [None]:
class LSoftmaxLinear(nn.Module):
    """Bias-free linear head that applies the L-Softmax margin to the target logit.

    In training mode the logit of the ground-truth class,
    ``|W_y| * |x| * cos(theta)``, is replaced by
    ``|W_y| * |x| * ((-1)^k * cos(m * theta) - 2 * k)`` where ``m`` is
    ``margin`` and ``k = floor(theta / divisor)``. ``cos(m * theta)`` is
    evaluated with the multiple-angle expansion
    ``sum_n (-1)^n * C(m, 2n) * cos^(m - 2n)(theta) * sin^(2n)(theta)``.
    In eval mode the layer is a plain ``x @ weight``.

    Args:
        in_channels: size of each input feature vector.
        out_channels: number of classes (columns of ``weight``).
        margin: integer angular multiplier ``m``.
    """

    def __init__(self, in_channels, out_channels, margin = 4):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.margin = margin

        # Stored as (in_channels, out_channels) so that forward is x @ weight.
        self.weight = nn.Parameter(torch.FloatTensor(in_channels, out_channels))

        # Width of one angular bucket used by find_k.
        # NOTE(review): the 1e-5 presumably guards against margin == 0 — confirm.
        self.divisor = pi / (self.margin + 1e-5)
        # Terms of the cos(m * theta) expansion: C(m, 2n), cos exponents
        # m, m-2, ..., sin^2 exponents 0, 1, ..., and alternating signs.
        self.coeffs = binom(margin, range(0, margin + 1, 2))
        self.cos_exps = range(self.margin, -1, -2)
        self.sin_sq_exps = range(len(self.cos_exps))
        self.signs = [1]
        for i in range(1, len(self.sin_sq_exps)):
            self.signs.append(self.signs[-1] * -1)

        # torch.FloatTensor(...) only allocates memory; without this call the
        # layer would start from whatever garbage values were in that memory.
        self.reset_parameters()

    def reset_parameters(self):
        """(Re-)initialise ``weight`` with Kaiming-normal values."""
        # Initialise through the transposed view so that fan-in is in_channels.
        nn.init.kaiming_normal_(self.weight.data.t())

    def find_k(self, cos):
        """Return ``floor(acos(cos) / divisor)`` as a detached float tensor."""
        # Rounding can push the cosine marginally outside [-1, 1], where acos
        # returns NaN; keep it strictly inside the valid domain.
        eps = 1e-7
        acos = cos.clamp(-1 + eps, 1 - eps).acos()
        k = (acos / self.divisor).floor().detach()
        return k

    def forward(self, x, y=None):
        """Compute class logits.

        Args:
            x: input features, multiplied as ``x @ weight``.
            y: class indices of the batch; required in training mode and
                must be ``None`` in eval mode.

        Returns:
            Logits of shape ``(batch_size, out_channels)``; in training mode
            the entry of the target class carries the L-Softmax margin.
        """
        if not self.training:
            assert y is None
            return x.matmul(self.weight)
        assert y is not None
        logit = x.matmul(self.weight)
        batch_size = logit.size(0)
        logit_target = logit[range(batch_size), y]
        weight_target_norm = self.weight[:, y].norm(p=2, dim=0)
        input_norm = x.norm(p=2, dim=1)
        # norm_target_prod: (batch_size,)
        norm_target_prod = weight_target_norm * input_norm
        # cos_target: (batch_size,)
        cos_target = logit_target / (norm_target_prod + 1e-10)
        sin_sq_target = 1 - cos_target**2

        # coeffs, cos_exps, sin_sq_exps, signs: (self.margin // 2 + 1,)
        # Created directly with the dtype and device of x.
        coeffs = torch.as_tensor(self.coeffs, dtype=x.dtype, device=x.device)
        cos_exps = torch.tensor(list(self.cos_exps), dtype=x.dtype, device=x.device)
        sin_sq_exps = torch.tensor(list(self.sin_sq_exps), dtype=x.dtype, device=x.device)
        signs = torch.tensor(self.signs, dtype=x.dtype, device=x.device)

        cos_terms = cos_target.unsqueeze(1)**cos_exps.unsqueeze(0)
        sin_sq_terms = sin_sq_target.unsqueeze(1)**sin_sq_exps.unsqueeze(0)

        # cos(m * theta) for the target class of every sample.
        cosm_terms = signs.unsqueeze(0) * coeffs.unsqueeze(0) * cos_terms * sin_sq_terms
        cosm = cosm_terms.sum(1)
        k = self.find_k(cos_target)

        ls_target = norm_target_prod * (((-1)**k * cosm) - 2 * k)
        # Overwrite only the target-class logits; all others stay x @ weight.
        logit[range(batch_size), y] = ls_target
        return logit

In [None]:
class LargeMarginCosineLoss(nn.Module):
    """Cosine-margin classification head.

    Produces logits ``input_norm * (cos(theta) - margin)`` for the target
    class and ``input_norm * cos(theta)`` for every other class, where
    ``cos(theta)`` is the cosine between the L2-normalised input and the
    L2-normalised class weight. The result is meant to be fed to a
    cross-entropy loss.

    Args:
        in_channels: size of each input feature vector.
        out_channels: number of classes.
        input_norm: scale applied to the cosines.
        margin: value subtracted from the target-class cosine.
    """

    # TODO: Move parameters to config
    def __init__(self, in_channels, out_channels, input_norm=30.0, margin=0.5):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.input_norm = input_norm  # norm of input feature
        self.margin = margin  # should be in (0; 1) range
        self.weight = Parameter(torch.FloatTensor(out_channels, in_channels))
        nn.init.xavier_uniform_(self.weight)

    def forward(self, x, y=None):
        """Compute scaled cosine logits.

        Args:
            x: input features of shape ``(batch_size, in_channels)``.
            y: class indices; required in training mode, must be ``None``
                in eval mode.

        Returns:
            Tensor of shape ``(batch_size, out_channels)``. In eval mode no
            margin is applied, but the logits use the same normalised
            weights and the same scale as in training, so predictions and
            loss values are comparable between the two modes.
        """
        # cos(theta) between every sample and every class centre.
        cosine = F.linear(F.normalize(x), F.normalize(self.weight))
        if not self.training:
            assert y is None
            return cosine * self.input_norm
        assert y is not None
        # Margin matrix: `margin` at each sample's target class, 0 elsewhere.
        margins = torch.zeros_like(cosine)
        margins.scatter_(1, y.view(-1, 1), self.margin)
        output = (cosine - margins) * self.input_norm
        return output

In [None]:
class LargeMarginArcLoss(nn.Module):
    """Additive angular-margin classification head.

    For the target class the logit is ``input_norm * cos(theta + margin)``;
    for all other classes it is ``input_norm * cos(theta)``, where
    ``cos(theta)`` is the cosine between the L2-normalised input and the
    L2-normalised class weight.

    Args:
        in_channels: size of each input feature vector.
        out_channels: number of classes.
        input_norm: scale applied to the cosines.
        margin: angular margin in radians.
        easy_margin: if True the margin is applied only where
            ``cos(theta) > 0``; otherwise it is applied where
            ``cos(theta) > -cos(margin)`` and the fallback
            ``cos(theta) - margin * sin(margin)`` is used elsewhere.
    """

    # TODO: Move parameters to config
    def __init__(self, in_channels, out_channels, input_norm=30.0, margin=0.5, easy_margin=False):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.input_norm = input_norm  # norm of input feature
        self.margin = margin  # should be in (0; 1) range
        self.weight = Parameter(torch.FloatTensor(out_channels, in_channels))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        # Plain Python floats so they combine cleanly with tensors of any dtype.
        self.cos_m = float(np.cos(margin))
        self.sin_m = float(np.sin(margin))

    def forward(self, x, y=None):
        """Compute scaled angular-margin logits.

        Args:
            x: input features of shape ``(batch_size, in_channels)``.
            y: class indices; required in training mode, must be ``None``
                in eval mode.

        Returns:
            Tensor of shape ``(batch_size, out_channels)``. In eval mode no
            margin is applied, but the logits use the same normalised
            weights and the same scale as in training.
        """
        # cos(theta) between every sample and every class centre.
        cosine = F.linear(F.normalize(x), F.normalize(self.weight))
        if not self.training:
            assert y is None
            return cosine * self.input_norm
        assert y is not None
        # Rounding can make cos^2 exceed 1 by a hair; clamp so sqrt stays real.
        sine = torch.sqrt((1.0 - torch.pow(cosine, 2)).clamp(0, 1))
        # cos(theta + m) = cos(theta) * cos(m) - sin(theta) * sin(m)
        phi = cosine * self.cos_m - sine * self.sin_m
        if not self.easy_margin:
            # theta + m would exceed pi when cos(theta) <= -cos(m); use the
            # linear fallback there.
            phi = torch.where(cosine > -self.cos_m, phi, cosine - self.margin * self.sin_m)
        else:
            phi = torch.where(cosine > 0, phi, cosine)
        # One-hot mask selecting each sample's target class.
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, y.view(-1, 1), 1)
        output = one_hot * phi + (1.0 - one_hot) * cosine
        return output * self.input_norm

In [None]:
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single one."""

    def forward(self, x):
        # Keep the leading (batch) dimension, merge all remaining ones.
        batch_size = x.shape[0]
        return x.view(batch_size, -1)


class Net(nn.Module):
    """Small CNN that maps an image batch to 64-d embeddings and class logits.

    Four conv blocks are followed by a flatten + linear + batch-norm
    embedding layer. The base class classifies with a plain linear layer;
    the margin heads created in ``arch`` are used by the subclasses.

    The embedding layer expects a 32 x 8 x 4 feature map.
    NOTE(review): that matches 40 x 25 (H x W) inputs if utils.same_padding
    keeps the spatial size and utils.valid_padding shrinks it by 2 per conv
    — confirm in pytorch_utils.
    """

    def __init__(self):
        super().__init__()
        self.arch()
        self.to(device, non_blocking=True, dtype=torch.float32)

    def arch(self):
        """Create all layers; reads the module-level ``num_classes``."""
        kernel_size = (3, 3)
        same = utils.same_padding(kernel_size)
        valid = utils.valid_padding(kernel_size)

        self.conv_block1 = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=kernel_size, padding=same),
            nn.PReLU(16),
            nn.BatchNorm2d(16, affine=False))

        self.conv_block2 = nn.Sequential(
            nn.Conv2d(16, 16, kernel_size=kernel_size, padding=valid),
            nn.PReLU(16),
            nn.BatchNorm2d(16, affine=False),
            nn.MaxPool2d((2, 2), stride=2))

        self.conv_block3 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=kernel_size, padding=same),
            nn.PReLU(32),
            nn.BatchNorm2d(32, affine=False))
        self.conv_block4 = nn.Sequential(
            nn.Conv2d(32, 32, kernel_size=kernel_size, padding=valid),
            nn.PReLU(32),
            nn.BatchNorm2d(32, affine=False),
            nn.MaxPool2d((2, 2), stride=2))

        self.embedding = nn.Sequential(
            Flatten(),
            nn.Linear(32 * 8 * 4, 64),
            nn.BatchNorm1d(64, affine=False))

        self.logit = nn.Linear(64, num_classes)

        # Alternative classification heads; each subclass picks one of them.
        self.cosine_margin = LargeMarginCosineLoss(in_channels=64, out_channels=num_classes)
        self.arc_margin = LargeMarginArcLoss(in_channels=64, out_channels=num_classes)
        self.large_margin = LSoftmaxLinear(in_channels=64, out_channels=num_classes)

    def _embed(self, x):
        """Run the conv blocks and the embedding layer on ``x``."""
        out = x
        for block in (self.conv_block1, self.conv_block2,
                      self.conv_block3, self.conv_block4):
            out = block(out)
        return self.embedding(out)

    @torch.no_grad()
    def predict(self, x):
        """Run inference on a NumPy image batch.

        Args:
            x: array laid out as (batch, height, width, channels) with
                values on a 0..255 scale; it is permuted to channels-first
                and divided by 255 before the forward pass.

        Returns:
            Whatever ``forward`` returns, i.e. ``(logit, embeddings)``.
        """
        batch = torch.from_numpy(x).permute(0, 3, 1, 2)
        batch = batch.to(device, non_blocking=True, dtype=torch.float32).div(255)
        # Inference must not use or update batch statistics, and the margin
        # heads only accept y=None in eval mode; restore the mode afterwards.
        was_training = self.training
        self.eval()
        try:
            return self(batch)
        finally:
            self.train(was_training)

    def forward(self, x, y=None):
        """Return ``(logit, embeddings)``; ``y`` is accepted but unused here."""
        embeddings = self._embed(x)
        logit = self.logit(embeddings)
        return logit, embeddings

In [None]:
class CosineMarginNet(Net):
    """Net variant whose logits come from the cosine-margin head."""

    def forward(self, x, y=None):
        """Return ``(logit, embeddings)``; ``y`` is forwarded to the head."""
        features = x
        for block in (self.conv_block1, self.conv_block2,
                      self.conv_block3, self.conv_block4):
            features = block(features)
        embeddings = self.embedding(features)
        return self.cosine_margin(embeddings, y), embeddings

In [None]:
class ArcMarginNet(Net):
    """Net variant whose logits come from the angular-margin head."""

    def forward(self, x, y=None):
        """Return ``(logit, embeddings)``; ``y`` is forwarded to the head."""
        features = x
        for block in (self.conv_block1, self.conv_block2,
                      self.conv_block3, self.conv_block4):
            features = block(features)
        embeddings = self.embedding(features)
        return self.arc_margin(embeddings, y), embeddings

In [None]:
class LargeMarginNet(Net):
    """Net variant whose logits come from the L-Softmax head."""

    def forward(self, x, y=None):
        """Return ``(logit, embeddings)``; ``y`` is forwarded to the head."""
        features = x
        for block in (self.conv_block1, self.conv_block2,
                      self.conv_block3, self.conv_block4):
            features = block(features)
        embeddings = self.embedding(features)
        return self.large_margin(embeddings, y), embeddings

### Data augmentation 

In [None]:
def _tensor_conversion_steps():
    """Return fresh conversion steps shared by the train and test pipelines."""
    # NOTE(review): utils.to_bgr_transform presumably swaps the channels to
    # BGR to match the cv2-loaded images used for prediction — confirm.
    return [transforms.ToTensor(), transforms.Lambda(utils.to_bgr_transform)]


# Training pipeline: random flip, rotation of up to 7 degrees and a
# translation of up to 10% per axis, followed by the tensor conversion.
transform_train = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(7),
        transforms.RandomAffine(0, translate=(0.1, 0.1)),
    ]
    + _tensor_conversion_steps()
)
# Test pipeline: tensor conversion only, no augmentation.
transform_test = transforms.Compose(_tensor_conversion_steps())

### Loading datasets 

In [None]:
# Wrap both splits in ImageFolder datasets, each with its own transform
# pipeline (augmented for train, conversion-only for test).
train_set = ImageFolder(train_dir, transform=transform_train)
test_set = ImageFolder(test_dir, transform=transform_test)

In [None]:
# Cross-entropy loss shared by train() and evaluate(); it is applied to the
# logits returned by the network.
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
# Batch size and worker count shared by both loaders.
batch_size = 512
loader_workers = 4
# Training batches are reshuffled every epoch.
train_loader = DataLoader(
    train_set, batch_size=batch_size, shuffle=True, num_workers=loader_workers)
# Evaluation does not depend on sample order, so the test loader keeps a
# fixed order; this makes evaluation runs reproducible.
test_loader = DataLoader(
    test_set, batch_size=batch_size, shuffle=False, num_workers=loader_workers)
# Authoritative class count: the classes ImageFolder actually discovered.
# This replaces the earlier directory-listing estimate before any network
# is instantiated.
num_classes = len(train_loader.dataset.classes)

### Train and evaluate functions 

In [None]:
def train(net, epoch, optimizer=None):
    """Train ``net`` for one pass over the module-level ``train_loader``.

    Args:
        net: model called as ``net(x, y)`` and returning ``(logit, embeddings)``.
        epoch: epoch number, used only in the progress message.
        optimizer: optional optimizer to reuse. When omitted, a fresh Adam
            optimizer is created for this call, which also means Adam's
            running moment estimates start from zero again; pass one
            optimizer across epochs to keep them.

    Returns:
        The same ``net`` object, trained in place.
    """
    if optimizer is None:
        optimizer = Adam(net.parameters(), lr=0.003, weight_decay=0.00005)
    net.train()
    running_loss = 0.0
    mini_batch_step = 200  # report the mean loss every this many batches

    for batch_step, (x, y) in enumerate(train_loader):
        # The loader already yields tensors; just move them to the device.
        x = x.to(device)
        y = y.to(device)
        optimizer.zero_grad()

        output, _ = net(x, y)

        loss = criterion(input=output, target=y)
        loss.backward()

        optimizer.step()

        running_loss += loss.item()
        if batch_step % mini_batch_step == mini_batch_step - 1:
            log_step_loss = running_loss / mini_batch_step
            print(
                f"Train Epoch: {epoch} [{batch_step * len(x)}/{len(train_loader.dataset)} "
                f"({100 * batch_step / len(train_loader):.0f}%)]\tLoss: {log_step_loss:.6f}")
            running_loss = 0.0
    return net

In [None]:
def evaluate(net, epoch, data_loader, set_name):
    """Report loss and accuracy of ``net`` on ``data_loader``.

    Args:
        net: model called as ``net(x)`` and returning ``(logit, embeddings)``.
        epoch: accepted for call-site symmetry with ``train``; not used.
        data_loader: iterable of ``(x, y)`` tensor batches.
        set_name: label printed in front of the metrics.

    Returns:
        ``(average_loss, accuracy)`` where ``average_loss`` is the mean loss
        per sample and ``accuracy`` is a percentage. Both are 0 when the
        loader yields no samples.
    """
    net.eval()  # Required: batch-norm must use its running statistics
    num_correct = 0
    num_samples = 0
    loss_sum = 0.0
    with torch.no_grad():  # No gradients are needed for evaluation
        for x, y in data_loader:
            x = x.to(device)
            y = y.to(device)

            output, _ = net(x)

            y_pred = output.argmax(dim=1)
            loss = criterion(input=output, target=y)
            # The module-level criterion averages over the batch; weight by
            # batch size so a smaller last batch does not skew the mean.
            batch_len = y.size(0)
            loss_sum += loss.item() * batch_len
            num_correct += y_pred.eq(y).sum().item()
            num_samples += batch_len
    denominator = max(num_samples, 1)  # avoid dividing by zero on an empty loader
    average_loss = loss_sum / denominator
    accuracy = num_correct / denominator * 100
    print(
        f"{set_name} set:\tLoss: {average_loss:.6f} Accuracy: {num_correct}/{num_samples} "
        f"({accuracy:.4f} %)")
    return average_loss, accuracy

In [None]:
# Train the cosine-margin network for epochs 1..4, reporting metrics on the
# train split and then the test split after every epoch.
net = CosineMarginNet()
evaluation_sets = (("Train", train_loader), ("Test", test_loader))
for epoch in range(1, 5):
    train(net, epoch)
    for split_name, split_loader in evaluation_sets:
        evaluate(net, epoch, split_loader, set_name=split_name)

### Saving embeddings 

In [None]:
# Folder with the image crops to embed; `files` holds their paths
# (folder prefix + file name) in sorted order.
src_path = 'groups_to_cluster_from_tracker/olimpic__0-82__groups/'
files = [src_path + name for name in os.listdir(src_path)]
files.sort()

In [None]:
# Load every file and resize it to width 25, height 40 (cv2.resize takes
# (width, height)), then stack everything into one array.
images = []
for file in files:
    image = cv2.imread(file)
    # cv2.imread signals failure by returning None instead of raising; fail
    # with the offending path. Skipping the file is not an option because
    # the labels built later rely on `files` and `images` staying aligned.
    if image is None:
        raise IOError(f"Could not read image file: {file}")
    images.append(cv2.resize(image, (25, 40)))
images = np.array(images)

In [None]:
# Embed the images in chunks of at most 1024 so that any number of images
# works, including counts that are not a multiple of the chunk size.
embed_chunk_size = 1024
# Start from an empty float64 (0, 64) array so the result keeps that dtype
# and shape even when there are no images at all.
embedding_chunks = [np.empty((0, 64))]
for start in range(0, len(images), embed_chunk_size):
    batch = images[start:start + embed_chunk_size]
    # predict returns (logit, embeddings); move the embeddings to host
    # memory before converting, which is required for GPU tensors.
    embedding_chunks.append(net.predict(batch)[1].detach().cpu().numpy())
# Concatenate once at the end instead of re-copying the array per chunk.
embeddings = np.concatenate(embedding_chunks)

In [None]:
def _person_type_index(path):
    """Return the position of the first person type whose name occurs in ``path``.

    Raises ValueError when none of the names occurs in the path.
    """
    matches = [person_type in path for person_type in ["referee", "team1", "team2"]]
    return matches.index(True)


# Features: one row of embedding values per file.
X_df = pd.DataFrame(embeddings)
# Labels: 0 = referee, 1 = team1, 2 = team2, derived from each file path.
y_df = pd.Series([_person_type_index(f) for f in files])

In [None]:
# Persist the embeddings and their labels as pickles in the current working
# directory. The file names carry no extension.
X_df.to_pickle("X_df_embed")
y_df.to_pickle("y_df_embed")