In [None]:
%config Completer.use_jedi = False

In [None]:
%matplotlib inline

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

import torchvision.transforms.functional as TF
from torchvision import datasets, transforms

import cv2
from PIL import Image

import numpy as np
import pandas as pd
import random

import os

import matplotlib.pyplot as plt
import matplotlib.cm as cm

plt.rcParams['figure.figsize'] = [12, 8]

## A simple average filter (`cv2.blur()`) with `PyTorch`
First we will implement an average filter in `PyTorch` using an `nn.Conv2d` layer. As a reminder, a convolutional layer takes the following arguments:
* the number of input channels
* the number of output channels
* the kernel size
* the stride
* the padding
* the dilation

In [None]:
# Load the test image as a single-channel (grayscale) array.
IMAGE_PATH = '../Images/boat.png'
img = cv2.imread(IMAGE_PATH, cv2.IMREAD_GRAYSCALE)

# cv2.imread does not raise on a missing/unreadable file, it returns None;
# fail here with a clear message instead of a confusing error further down.
if img is None:
    raise FileNotFoundError('Could not read image: {}'.format(IMAGE_PATH))

plt.imshow(img, cmap=cm.gray)
plt.show()

In [None]:
class Unif_Blur(nn.Module):
    """Uniform (box) blur for single-channel images, built on one ``nn.Conv2d``.

    The convolution has one input channel, one output channel and no bias.
    It uses reflect padding of ``(kernel - 1) // 2`` and every weight is set
    to ``1 / kernel**2``, so each output value is the mean of its window.
    """

    def __init__(self, kernel):
        """Create the filter; ``kernel`` is the (odd) side length of the window."""
        super().__init__()

        # Only odd window sizes are accepted.
        assert kernel % 2 == 1

        # out_dim = [(in_dim + 2*padding - dilation*(kernel-1) - 1)/stride] + 1
        # With the default stride and dilation of 1, this padding gives
        # out_dim == in_dim.
        half_width = (kernel - 1) // 2

        box = nn.Conv2d(
            1,
            1,
            kernel_size=kernel,
            padding=half_width,
            padding_mode='reflect',
            bias=False,
        )
        # Replace the randomly initialised weights with a constant averaging kernel.
        box.weight = nn.Parameter(torch.full_like(box.weight, 1/kernel**2))

        self.filter = box

    def forward(self, x):
        """Apply the averaging convolution to ``x`` and return the result."""
        blurred = self.filter(x)
        return blurred

In [None]:
blur = Unif_Blur(5)

# nn.Conv2d works on (batch, channel, height, width) input, so add the two
# leading singleton dimensions to the (height, width) image.
img_tensor = torch.from_numpy(img)
img_tensor = torch.unsqueeze(img_tensor, 0) # add channel dim
img_tensor = torch.unsqueeze(img_tensor, 0) # add bs dim

# Plain filtering needs no autograd graph; without it the result can be
# converted to NumPy directly (no `.data` access required).
with torch.no_grad():
    torch_blur = blur(img_tensor.float())

# Drop the batch/channel dimensions using the image's own shape instead of a
# hard-coded (512, 512), so the cell works for any input size.
torch_blur = torch_blur.numpy().reshape(img.shape)

plt.imshow(torch_blur, cmap=cm.gray)
plt.show()

In [None]:
# Reference result: blur the same image with OpenCV's averaging filter using
# a 5x5 window, after converting the pixels to float64.
cv_blur = cv2.blur(img.astype('float64'), (5, 5))

plt.imshow(cv_blur, cmap=cm.gray)
plt.show()

In [None]:
# Display the raw values of the OpenCV-blurred image.
cv_blur

In [None]:
# Display the raw values of the PyTorch-blurred image so they can be compared
# with `cv_blur` (the previous name `blured_image` was never defined).
torch_blur

## Classification
Classification is one of the most widely applied tasks in computer vision and is typically performed with convolutional neural networks (CNNs). The basic idea is to first extract features from an input image and then classify those features in order to predict the class of that image.  
In this tutorial, we will apply image classification on the MNIST dataset (handwritten numbers).  
The first task is to implement the `ConvNet` represented in the figure below.
![ConvNet.png](images/convnet.svg)

In [None]:
class ClassNet(nn.Module):
    """Skeleton of a CNN classifier: feature extractor followed by a classifier.

    Both stages are empty ``nn.Sequential`` containers that still have to be
    filled in; ``output_dim`` is accepted but not used yet.
    """

    def __init__(self, output_dim):
        super().__init__()

        self.feature_extractor = nn.Sequential(
            # TODO: add the convolutional layers of the network here
        )

        self.classifier = nn.Sequential(
            # TODO: add the classification layers here
        )

    def forward(self, x):
        """Extract features, flatten everything but the batch dim, then classify."""
        features = self.feature_extractor(x)
        flat = features.flatten(start_dim=1)
        return self.classifier(flat)

### Definition of the data loader
We can retrieve the dataset from `PyTorch` and then define our `DataLoader`.

In [None]:
# Both splits use the same preprocessing: convert to a tensor, then normalise
# with mean 0.1307 and std 0.3081 (presumably the MNIST pixel statistics).
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

# Downloaded into ./data on first use.
train_mnist = datasets.MNIST('data', train=True, download=True, transform=mnist_transform)
test_mnist = datasets.MNIST('data', train=False, download=True, transform=mnist_transform)

# Only the training data is shuffled.
train_loader = DataLoader(train_mnist, batch_size=32, shuffle=True, num_workers=2)
test_loader = DataLoader(test_mnist, batch_size=256, shuffle=False, num_workers=2)

### Definition of the setup
Here we have to define the `ConvNet` model that we will use. We also need to define our loss function and an optimizer.

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# 10 is passed as ClassNet's `output_dim` argument.
model = ClassNet(10).to(device)

# SGD with momentum over all model parameters, and the cross-entropy loss.
# NOTE(review): if the model has no layers yet, `model.parameters()` is empty
# and the optimizer constructor is expected to reject it — confirm.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
criterion = nn.CrossEntropyLoss()

In [None]:
def compute_accuracy(y_pred, y_true):
    """Return the fraction of rows whose arg-max over dim 1 equals the label.

    Args:
        y_pred: score tensor; the predicted class is its arg-max along dim 1.
        y_true: labels, one per row of ``y_pred`` (any shape that can be
            viewed as the prediction column).

    Returns:
        A 0-dim float tensor: number of correct predictions / number of rows.
    """
    top1 = y_pred.argmax(dim=1, keepdim=True)
    labels = y_true.view_as(top1)
    hits = (top1 == labels).sum()
    return hits.float() / top1.shape[0]

### Definition of a training loop
We can now define a training loop that iterates over the training data batch by batch and, for each batch, runs the forward pass, the backward pass and the optimizer update.

In [None]:
def train_one_epoch(model, optimizer, train_loader, device, criterion):
    """Train ``model`` for one pass over ``train_loader`` and print the metrics.

    Args:
        model: network to optimise; switched to training mode.
        optimizer: optimizer stepped once per batch.
        train_loader: iterable yielding ``(inputs, targets)`` batches.
        device: device the batches are moved to.
        criterion: loss callable ``criterion(outputs, targets)``; assumed to
            return the mean loss over the batch (the default reduction of
            ``nn.CrossEntropyLoss``).

    Prints the loss and accuracy (in percent) averaged over all samples.
    """
    model.train()

    loss_sum = 0.0
    acc_sum = 0.0
    num_samples = 0

    for inputs, targets in train_loader:
        # Move each batch to the device once and reuse it below.
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Weight per-batch means by the batch size so that a smaller final
        # batch does not count as much as a full one.
        batch_size = targets.shape[0]
        loss_sum += loss.item() * batch_size
        acc_sum += compute_accuracy(outputs, targets).item() * batch_size
        num_samples += batch_size

    # Guard against an empty loader.
    num_samples = max(num_samples, 1)
    print('train loss: {}, train accuracy:{}'.format(loss_sum/num_samples, 100*acc_sum/num_samples))
    
def test(model, test_loader, device, critetion):
    """Evaluate ``model`` on ``test_loader`` and print the metrics.

    Args:
        model: network to evaluate; switched to evaluation mode.
        test_loader: iterable yielding ``(inputs, targets)`` batches.
        device: device the batches are moved to.
        critetion: loss callable ``critetion(outputs, targets)``; assumed to
            return the mean loss over the batch. (The misspelled name is kept
            so existing keyword callers keep working.)

    Prints the loss and accuracy (in percent) averaged over all samples.
    """
    # Use the loss that was passed in; previously the argument was ignored
    # and a global named `criterion` was read instead.
    criterion = critetion

    model.eval()

    loss_sum = 0.0
    acc_sum = 0.0
    num_samples = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            # Move each batch to the device once and reuse it below.
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # Weight per-batch means by the batch size so that a smaller
            # final batch does not count as much as a full one.
            batch_size = targets.shape[0]
            loss_sum += loss.item() * batch_size
            acc_sum += compute_accuracy(outputs, targets).item() * batch_size
            num_samples += batch_size

    # Guard against an empty loader.
    num_samples = max(num_samples, 1)
    print('test loss: {}, test accuracy:{}'.format(loss_sum/num_samples, 100*acc_sum/num_samples))
    
def train(num_epoch=10):
    """Run ``num_epoch`` rounds of training followed by evaluation.

    Relies on the notebook-level ``model``, ``optimizer``, ``train_loader``,
    ``test_loader``, ``device`` and ``criterion`` as they are bound at call time.
    """
    for epoch_number in range(1, num_epoch + 1):
        print('epoch: {}/{}'.format(epoch_number, num_epoch))
        train_one_epoch(model, optimizer, train_loader, device, criterion)
        test(model, test_loader, device, criterion)

In [None]:
# Train and evaluate with the default number of epochs (10).
train()

## Object detection
For object detection, the basic idea is the same as for the classification task. First we extract the features, then we perform a classification (to predict the classes) and a regression (to predict the bounding boxes).  
For this tutorial we will use the `RetinaNet` architecture. It is composed of a backbone network (which extracts the features), a Feature Pyramid Network (which merges the features of the different pyramid layers) and 2 subnetworks (classification/regression).

### Anchors
One fundamental concept with this architecture is the anchor. Anchors are pre-defined boxes with multiple scales and aspect ratios which are used as reference boxes. Anchors are assigned to a ground truth object’s box using an intersection-over-union (IoU) threshold of $0.5$, and to background if their IoU is in the interval $[0, 0.4)$.
Most of the time, we define $\sim100k$ anchors per image.  
In the following figure, we can see some anchors with different scales and aspect ratios.  
![anchors.png](images/anchors2.png)
Finally, the following figure shows how anchors (green) are assigned to ground truths (red).
![gt.png](images/anchor_box.png)

In [None]:
from models import RetinaNet, Anchors, FocalLoss, BBoxTransform, ClipBoxes, inference, RandAugmentation

### Definition of a `Dataset`
For this example, we will use a custom dataset (cards) thus we will define a `Dataset class` to retrieve the data.  
We will define a `Dataset`, a `Resizer`, an `Augmenter`, a `Normalizer` and an `UnNormalizer`.  
Finally, we will define a `collater`, because our images do not all contain the same number of objects.

In [None]:
class CSVDataset(Dataset):
    """Object-detection dataset backed by a header-less CSV annotation file.

    Every CSV row describes one box: column 0 is the image path, column 3 the
    class label (lower-cased on load) and columns 4-7 the corners
    ``x1, y1, x2, y2``. Rows that share an image path are grouped together.
    """

    def __init__(self, annot, transform=None, ratio=None):
        """
        Args:
            annot: path of the CSV annotation file.
            transform: optional callable applied to every sample dict.
            ratio: optional fraction; when truthy, only the first
                ``round(number_of_images * ratio)`` images are kept.
        """
        self.annot = annot
        self.transform = transform

        self.image_data, labels = self.read_annotations()

        # Sorted unique labels; a label's position in this list is its class index.
        self.classes = sorted(set(labels))

        self.image_names = list(self.image_data.keys())
        if ratio:
            self.image_names = self.image_names[:round(len(self.image_names)*ratio)]

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_names)

    def __getitem__(self, idx):
        """Return the sample dict for image number ``idx``.

        The dict holds the RGB PIL image (``'image'``), an ``(n, 5)`` tensor
        of boxes (``'annotation'``), the image path (``'idx'``) and a scale
        tensor of ones (``'scale'``). ``transform`` is applied if set.
        """
        sample = {
            'image': self.load_image(idx),
            'annotation': self.load_annotations(idx),
            'idx': self.image_names[idx],
            'scale': torch.ones((2)),
        }

        if self.transform:
            sample = self.transform(sample)

        return sample

    def num_classes(self):
        """Number of distinct class labels."""
        return len(self.classes)

    def index_to_class(self, idx):
        """Class label stored at class index ``idx``."""
        return self.classes[idx]

    def label_to_index(self, label):
        """Class index of ``label``; raises ``ValueError`` for unknown labels."""
        return self.classes.index(label)

    def load_annotations(self, index):
        """Return the boxes of image ``index`` as an ``(n, 5)`` float tensor.

        Each row is ``[x1, y1, x2, y2, class_index]`` with the coordinates
        truncated to integers. An image without boxes yields shape ``(0, 5)``.
        """
        annotation_list = self.image_data[self.image_names[index]]

        if len(annotation_list) == 0:
            return torch.zeros((0, 5))

        # Build all rows first and create the tensor once, instead of growing
        # it with torch.cat on every box.
        rows = [
            [
                int(annot['x1']),
                int(annot['y1']),
                int(annot['x2']),
                int(annot['y2']),
                self.classes.index(annot['label']),
            ]
            for annot in annotation_list
        ]
        return torch.tensor(rows, dtype=torch.get_default_dtype())

    def load_image(self, index):
        """Open image ``index`` and return it converted to RGB."""
        # convert() returns a new, fully loaded image, so the file can be
        # closed as soon as the conversion is done.
        with Image.open(self.image_names[index]) as image_file:
            return image_file.convert('RGB')

    def read_annotations(self):
        """Parse the CSV file.

        Returns:
            A pair ``(annotations, classes)``: ``annotations`` maps each image
            path to its list of ``{'x1', 'y1', 'x2', 'y2', 'label'}`` dicts,
            and ``classes`` lists the label of every row (with repetitions).
        """
        with open(self.annot, newline='') as csvfile:
            frame = pd.read_csv(csvfile, header=None)

        annotations = {}
        classes = []
        # The file has no header, so columns are addressed by position.
        for row in frame.itertuples(index=False, name=None):
            image_name = row[0]
            label = row[3].lower()

            classes.append(label)
            annotations.setdefault(image_name, []).append(
                {'x1': row[4], 'y1': row[5], 'x2': row[6], 'y2': row[7], 'label': label}
            )

        return annotations, classes

In [None]:
class Resizer(object):
    """Resize a sample's PIL image to a fixed size and rescale its boxes.

    The image is returned as a tensor and ``sample['scale']`` is set to the
    ``[x_scale, y_scale]`` factors that were applied to the box coordinates.
    """

    def __init__(self, dims=(224, 224)):
        # `dims` is passed unchanged to TF.resize, which reads a two-element
        # size as (height, width).
        self.dims = dims

    def __call__(self, sample):
        image, annots = sample['image'], sample['annotation']

        # PIL reports the size as (width, height).
        w, h = image.size

        image = TF.resize(image, self.dims)
        image = TF.to_tensor(image)

        # x coordinates scale with the width ratio and y coordinates with the
        # height ratio. The previous code divided dims[0] (the target height)
        # by the width, which is only right for square targets.
        out_h, out_w = self.dims[0], self.dims[1]
        scale = torch.tensor([out_w/w, out_h/h])

        # Columns are x1, y1, x2, y2 (the boxes are modified in place).
        annots[:, 0] *= scale[0]
        annots[:, 1] *= scale[1]
        annots[:, 2] *= scale[0]
        annots[:, 3] *= scale[1]

        sample['image'] = image
        sample['annotation'] = annots
        sample['scale'] = scale

        return sample

In [None]:
class Augmenter(object):
    """Randomly augment a detection sample (tensor image plus box annotations).

    With ``ra=False`` the image is flipped horizontally with probability
    ``flip_x``. With ``ra=True`` a ``RandAugmentation`` operation is applied
    with probability ``prob_a`` instead.
    """

    def __init__(self, ra=False):
        self.augment = RandAugmentation()
        self.ra = ra

    def __call__(self, sample, flip_x=0.5, prob_a=0.5):
        """Possibly augment ``sample`` (updated in place) and return it."""
        image, annots = sample['image'], sample['annotation']

        if self.ra:
            if random.random() < prob_a:
                self._rand_augment(sample, image, annots)
        elif random.random() < flip_x:
            self._horizontal_flip(sample, image, annots)

        return sample

    def _rand_augment(self, sample, image, annots):
        """Apply ``self.augment`` to the image and update the boxes to match."""
        pil_image = TF.to_pil_image(image)
        augmented, method = self.augment(pil_image)

        op, val = method['method'], method['value']

        boxes = annots[:, :4].clone()

        # NOTE(review): `augmentations` is not imported anywhere in the
        # visible notebook, so these three branches would raise NameError —
        # confirm where the module lives and import it.
        if op == 'rotate':
            boxes = augmentations.rotate_boxes(pil_image, boxes, val)
        elif op == 'flip':
            boxes = augmentations.flip_boxes(pil_image, boxes)
        elif op == 'mirror':
            boxes = augmentations.mirror_boxes(pil_image, boxes)

        # Re-attach the class column to the (possibly transformed) boxes.
        new_boxes = torch.zeros((boxes.shape[0], 5))
        new_boxes[:, :4] = boxes
        new_boxes[:, 4] = annots[:, 4]

        sample['image'] = TF.to_tensor(augmented)
        sample['annotation'] = new_boxes

    def _horizontal_flip(self, sample, image, annots):
        """Mirror the image left-right and flip the x coordinates of the boxes."""
        flipped = TF.hflip(TF.to_pil_image(image))
        w, h = flipped.size

        # Copy both columns first because `annots` is overwritten in place.
        left = annots[:, 0].clone()
        right = annots[:, 2].clone()

        # After mirroring, the old right edge becomes the new left edge.
        annots[:, 0] = w - right
        annots[:, 2] = w - left

        sample['image'] = TF.to_tensor(flipped)
        sample['annotation'] = annots

In [None]:
class Normalizer(object):
    """Normalise ``sample['image']`` with ``TF.normalize``.

    The default ``mean``/``std`` are presumably the usual ImageNet channel
    statistics — confirm against the backbone's pretraining.
    """

    def __init__(self, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
        self.mean, self.std = mean, std

    def __call__(self, sample):
        """Replace the sample's image by its normalised version and return the sample."""
        sample['image'] = TF.normalize(sample['image'], self.mean, self.std)
        return sample

class UnNormalizer(object):
    """Undo a per-channel normalisation: ``channel * std + mean``.

    The tensor is modified IN PLACE and also returned; pass a clone if the
    normalised values are still needed.
    """

    def __init__(self, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
        self.mean, self.std = mean, std

    def __call__(self, tensor):
        """Denormalise ``tensor`` along its first dimension, in place."""
        for plane, mu, sigma in zip(tensor, self.mean, self.std):
            plane.mul_(sigma)
            plane.add_(mu)

        return tensor

In [None]:
def collater(data):
    """Collate sample dicts into one batch, padding the annotations.

    Images are stacked into a single tensor. Each sample's ``(n_i, 5)``
    annotation tensor is padded with rows of -1 up to the largest ``n_i`` in
    the batch (at least one row), giving a ``(batch, max_n, 5)`` tensor.
    Scales and ids are returned as plain lists.
    """
    annots = [sample['annotation'] for sample in data]

    longest = max(boxes.shape[0] for boxes in annots)

    # Always keep at least one padding row so the annotation tensor is
    # never empty along dimension 1.
    padded = torch.full((len(annots), max(longest, 1), 5), -1.0)
    for row, boxes in enumerate(annots):
        count = boxes.shape[0]
        if count > 0:
            padded[row, :count, :] = boxes

    return {
        'image': torch.stack([sample['image'] for sample in data]),
        'annotation': padded,
        'scale': [sample['scale'] for sample in data],
        'idx': [sample['idx'] for sample in data],
    }

In [None]:
def draw_caption(image, box, caption):
    """Write ``caption`` in white onto ``image``, 10 pixels above the box's first corner.

    ``box`` is any sequence whose first two entries are the x and y of that corner.
    """
    corners = np.asarray(box).astype(int)
    origin = (corners[0], corners[1] - 10)
    cv2.putText(image, caption, origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

def draw_bb(data, classes):
    """Display a normalised image with its boxes and class names drawn on it.

    Args:
        data: dict with ``'image'`` (a normalised, channel-first image tensor)
            and ``'annotation'`` (rows of ``[x1, y1, x2, y2, class_index]``).
        classes: sequence mapping a class index to its name.
    """
    image_tensor = data['image']
    annotations = data['annotation']

    # UnNormalizer works in place, so denormalise a detached CPU copy; the
    # caller's tensor stays untouched and repeated calls give the same picture.
    image = UnNormalizer()(image_tensor.detach().cpu().clone())
    image = np.clip(255 * image.numpy(), 0, 255)
    # Channel-first -> channel-last for OpenCV and matplotlib.
    image = np.transpose(image, (1, 2, 0))

    # A contiguous uint8 copy is needed for the OpenCV drawing calls.
    image = image.astype(np.uint8).copy()

    for i in range(annotations.shape[0]):
        label = int(annotations[i, 4])
        # Rows filled with -1 are padding added by `collater`, not real boxes.
        if label < 0:
            continue

        x1 = int(annotations[i, 0])
        y1 = int(annotations[i, 1])
        x2 = int(annotations[i, 2])
        y2 = int(annotations[i, 3])

        image = cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        draw_caption(image, (x1, y1, x2, y2), classes[label])

    plt.imshow(image)
    plt.show()

In [None]:
# Every image is resized to 512x512 and normalised; only the training
# pipeline adds the random augmentation step.
train_transform = transforms.Compose([Resizer((512, 512)), Augmenter(), Normalizer()])
test_transform = transforms.Compose([Resizer((512, 512)), Normalizer()])

train_dataset = CSVDataset('data/cards/train_labels.csv', transform=train_transform)
test_dataset = CSVDataset('data/cards/test_labels.csv', transform=test_transform)

# NOTE: this rebinds `train_loader` / `test_loader`, which held the MNIST
# loaders earlier in the notebook. `collater` pads the variable number of
# boxes per image.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=2, collate_fn=collater)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=2, collate_fn=collater)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Detection loss and anchor generator, both imported from the local `models`
# module. NOTE: `criterion` was the MNIST cross-entropy loss earlier in the
# notebook and is rebound here.
criterion = FocalLoss(alpha=0.95, gamma=2, weight=1., device=device)
anchors = Anchors(ratios=[0.625, 1.0, 1.6], scales=[0.803, 1.017, 1.312], device=device)
# Helpers passed to `inference` at prediction time; presumably they decode the
# regression output into boxes and clip them to the image — confirm in `models`.
regressBoxes = BBoxTransform()
clipBoxes = ClipBoxes()

In [None]:
# NOTE: `model` and `optimizer` were the MNIST classifier and its SGD
# optimizer earlier in the notebook; they are rebound here.
model = RetinaNet('resnet101', num_classes=len(train_dataset.classes), pretrained=True)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, verbose=True)

# Maximum gradient norm enforced before every optimizer step.
GRAD_CLIP_NORM = 0.1


def run_detection_epoch(model, loader, criterion, anchors, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean total loss.

    When ``optimizer`` is given the model is trained: backward pass, gradient
    clipping and one optimizer step per batch. Without an optimizer the model
    is only evaluated, with gradients disabled.

    Returns:
        Mean classification loss plus mean regression loss over the batches.
    """
    training = optimizer is not None
    if training:
        model.train()
        optimizer.zero_grad()
    else:
        model.eval()

    class_loss = []
    reg_loss = []

    with torch.set_grad_enabled(training):
        for data in loader:
            imgs = data['image'].float().to(device)

            classification, regression = model(imgs)
            anchrs = anchors(imgs)
            classification_loss, regression_loss = criterion(classification, regression, anchrs, data['annotation'].to(device))

            classification_loss = classification_loss.mean()
            regression_loss = regression_loss.mean()

            class_loss.append(classification_loss.item())
            reg_loss.append(regression_loss.item())

            if training:
                loss = classification_loss + regression_loss
                loss.backward()

                torch.nn.utils.clip_grad_norm_(model.parameters(), GRAD_CLIP_NORM)

                optimizer.step()
                optimizer.zero_grad()

    return np.mean(class_loss) + np.mean(reg_loss)


num_epochs = 100
for i in range(num_epochs):
    print('epoch: {}/{}'.format(i+1, num_epochs))

    loss = run_detection_epoch(model, train_loader, criterion, anchors, device, optimizer)
    print('Training loss: {}'.format(loss))

    # The held-out loader is used as the validation set; its loss drives
    # the learning-rate schedule.
    loss = run_detection_epoch(model, test_loader, criterion, anchors, device)
    print('Validation loss: {}'.format(loss))
    scheduler.step(loss)

In [None]:
# One image per batch, in dataset order, for visual inspection of predictions.
predict_loader = DataLoader(test_dataset, num_workers=1, collate_fn=collater, batch_size=1, shuffle=False)
# Keep the iterator so that each run of the next cell shows the following image.
iter_loader = iter(predict_loader)

In [None]:
# Only detections with at least this score are drawn.
DISPLAY_SCORE_THRESHOLD = 0.2

model.eval()

# Each run of this cell consumes the next image of `iter_loader`
# (StopIteration is raised once the loader is exhausted).
data = next(iter_loader)
with torch.no_grad():

    # Named `image_batch` rather than `img` so the NumPy image loaded at the
    # top of the notebook (still read by the blur cells) is not overwritten.
    image_batch = data['image'][0].float().to(device).unsqueeze(dim=0)
    classification, regression = model(image_batch)
    anchrs = anchors(image_batch)

    scores, labels, boxes = inference(classification, regression, image_batch, anchrs, regressBoxes, clipBoxes, device, score_threshold=0.05, nms_threshold=0.2)

    # Keep only the confident detections.
    idxs = np.where(scores.cpu()>=DISPLAY_SCORE_THRESHOLD)
    boxes = boxes[idxs]
    labels = labels[idxs].view((boxes.shape[0], 1))

    # Rows of [x1, y1, x2, y2, class_index], the layout draw_bb expects.
    annotations = torch.cat((boxes, labels), 1)

    draw_bb({'image': data['image'][0], 'annotation': annotations}, train_dataset.classes)