In [None]:
from google.colab import drive
from __future__ import print_function, division
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from torchsummary import summary
import time

# Mount Google Drive so the project and dataset folders are reachable from Colab.
drive.mount('/content/gdrive')
# !ls '/content/gdrive/My Drive/'

# Drive root; the trailing slash matters because paths below are built by concatenation.
path = '/content/gdrive/MyDrive/'
# Project folder (the pretrained models are stored below it).
project_path = '{}{}'.format(path, 'LR Research/Coding/notebooks-Colab/gate_detection/')
# Folder holding the gate images, their masks and corners.csv.
gate_path = '{}{}'.format(path, 'TU Delft/Brightspace Courses/AE4317/Washington_OB_Race/Gate_Imgs/')

We are interested in detecting the next closest gate in the track, but the most important thing is to detect the flyable area that allows the drone to cross the gate. For this task, the bounding box should not enclose the entire gate; instead, it should cover only the inner area of the gate. The SSD network is therefore selected for this objective because of its reported high accuracy; we use a reduced variant named SSD7 that has only seven convolutional layers.

AlexNet and GoogLeNet [13], [14] showed excellent speed and performance in the ImageNet competition, and algorithms such as SSD and YOLO [4], [15] showed good performance in the detection part.

However, these algorithms were still inadequate in terms of the computing power requirement imposed on an embedded system.

When we overlay a single channel of our target (or prediction), we refer to this as a *mask* which illuminates the regions of an image where a specific class is present.

Ronneberger et al. (U-Net paper) discuss a loss weighting scheme for each pixel such that there is a higher weight at the border of segmented objects.

In [None]:
# get the CSV and the annotations
corner_landmarks = pd.read_csv(gate_path + 'corners.csv', header=None)
# show only the first rows; printing the whole frame floods the cell output
print(corner_landmarks.head(), '\n')
# pick an image
n = 200
img_name = corner_landmarks.iloc[n, 0]
# get all corners from this image: select every row whose first column holds
# the same file name. Unlike walking forward from row n, this cannot run past
# the end of the frame and also picks up rows that precede row n.
same_image = corner_landmarks.iloc[:, 0] == img_name
img_corners = corner_landmarks.loc[same_image].iloc[:, 1:].to_numpy()

# flatten row by row into (x, y) pairs
img_corners = img_corners.astype('float').reshape(-1, 2)

In [None]:
# a helper function to show an image and its corners
def show_landmarks(img, corners):
    """Display `img` with the corner points drawn on top as red dots.

    Args:
        img: image accepted by `plt.imshow`.
        corners: array indexed as `corners[:, 0]` (x) and `corners[:, 1]` (y).
    """
    xs, ys = corners[:, 0], corners[:, 1]
    plt.axis('off')
    plt.imshow(img)
    plt.scatter(xs, ys, s=100, marker='.', c='r')

# read the image picked above from the gate folder and overlay its corners
img_path = os.path.join(gate_path, img_name)
show_landmarks(io.imread(img_path), img_corners)

## The Gate Dataset

`torch.utils.data.Dataset` is an abstract class representing a dataset. The `GateDataset` should 
inherit `Dataset` and override the 
following methods:
+ `__len__` so that `len(dataset)` returns the size of the dataset.
+ `__getitem__` to support the indexing such that `dataset[i]` can be used to get $i$th sample.


In [None]:
class GateDataset(Dataset):
    """Dataset of gate images paired with their segmentation masks.

    Every file below `root_dir` whose name starts with ``img_`` is treated as
    an image. Its mask is expected in the same folder under the same name with
    the ``img_`` prefix replaced by ``mask_``.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (str): Path to the corner-annotation CSV (read without a
                header row and kept in `gate_corner_frames`).
            root_dir (str): Directory searched recursively for images and masks.
            transform (callable, optional): Applied to every
                ``{'img': ..., 'mask': ...}`` sample before it is returned.

        Raises:
            AssertionError: If the masks found on disk do not match the images
                one-to-one.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.gate_corner_frames = pd.read_csv(csv_file, header=None)

        img_names = []
        found_masks = set()
        for root, _dirs, files in os.walk(self.root_dir):
            # keep the sub-folder in the stored name so nested files can be re-opened
            rel_dir = os.path.relpath(root, self.root_dir)
            for filename in files:
                rel_name = os.path.normpath(os.path.join(rel_dir, filename))
                if filename.startswith('img_'):
                    img_names.append(rel_name)
                elif filename.startswith('mask_'):
                    found_masks.add(rel_name)

        # os.walk yields files in arbitrary order; sort so that dataset[i]
        # refers to the same sample on every run
        img_names.sort()
        self.gate_img_names = img_names
        self.gate_mask_names = [
            os.path.join(os.path.dirname(name), 'mask_' + os.path.basename(name)[4:])
            for name in img_names
        ]
        self.cnt_img = len(self.gate_img_names)
        self.cnt_mask = len(found_masks)

        # Check names, not only counts: equal counts do not guarantee that each
        # image has its own mask. Raised explicitly (instead of `assert`) so the
        # check also runs under `python -O`, with the original exception type.
        missing = [name for name in self.gate_mask_names if name not in found_masks]
        if missing or self.cnt_img != self.cnt_mask:
            raise AssertionError("Number of masks and imgs need to be the same")

    def __len__(self):
        """Return the number of image/mask pairs."""
        return self.cnt_img

    def __getitem__(self, idx):
        """Load the `idx`-th image and mask from disk.

        Returns:
            dict: ``{'img': ndarray, 'mask': ndarray}``, or whatever
            `self.transform` returns for that dict when a transform is set.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir, self.gate_img_names[idx])
        mask_name = os.path.join(self.root_dir, self.gate_mask_names[idx])
        img = io.imread(img_name) # read as ndarray
        mask = io.imread(mask_name)
        sample = {'img': img, 'mask': mask}

        # apply transform if transform is not None
        if self.transform:
            sample = self.transform(sample)

        return sample

# initialize the dataset (no transform, so samples are the ndarrays as read from disk)
gate_dataset = GateDataset(csv_file=gate_path + 'corners.csv', root_dir=gate_path)

Now we have built the dataset for gates and masks, the `GateDataset` can iterate through the original dataset and get access to every image and mask.

In [None]:
# plot images and masks
num_of_subs = 3
# draw the indices from the actual dataset size instead of a hard-coded bound
sample_seed = np.random.randint(low=0, high=len(gate_dataset), size=num_of_subs)

# one column per sample: image in the top row, mask in the bottom row
fig, axes = plt.subplots(2, num_of_subs, dpi=100, squeeze=False)

# The loop already stops after num_of_subs samples. The former
# `if i == num_of_subs - 1: break` compared the random dataset index, not the
# loop position, and could cut the figure short.
for col, i in enumerate(sample_seed):
    sample = gate_dataset[i]
    print(i, sample['img'].shape, sample['mask'].shape)
    # show img
    ax1 = axes[0, col]
    ax1.set_title('Sample #{}'.format(i))
    ax1.axis('off')
    ax1.imshow(sample['img'])
    # show mask
    ax2 = axes[1, col]
    ax2.set_title('Sample #{}'.format(i))
    ax2.axis('off')
    ax2.imshow(sample['mask'])

## Transforms

Before we feed the data into our neural network, we need `transforms` to transform the data to `tensor` and make some data augmentations.

One issue we can see from the above is that the samples are not of the same size. Most neural networks expect the image of a fixed size. Therefore, we will need to write some preprocessing code. Let's create three transforms:

+ `Rescale`: to scale the image
+ `RandomCrop`: to crop from image randomly. This is data augmentation.
+ `ToTensor`: to convert the numpy images to torch images (**we need to swap axes**).

We will write them as *callable classes* instead of simple functions so that the parameters of the transform need not be passed every time it is called. For this, we just need to implement the `__call__` method and, if required, the `__init__` method. We can then use a transform like this:

```python
tsfm = Transform(params) # callable object like a function
transformed_sample = tsfm(sample)
```

In the preceding classification tasks, we scaled images to make them fit the input shape of the model. In *semantic segmentation*, this method would require us to re-map the predicted pixel categories back to the original-size input image. It would be very difficult to do this precisely, especially in segmented regions with different semantics. To avoid this problem, **we crop the images to set dimensions and do not scale them.** Specifically, we use the random cropping method used in image augmentation to crop the same region from input images and their labels.

### Segmentation Transform
```python
class ToTensor(object):
    def __call__(self, image, target):
        image = F.to_tensor(image)
        target = torch.as_tensor(np.array(target), dtype=torch.int64)
        return image, target
```

In [None]:
class Rescale():
    """Rescale the image and the mask in a sample to a given size."""
    def __init__(self, output_size):
        """
        Args:
            output_size (tuple or int): Desired output size.
            If tuple, output is matched to output_size. If int,
            smaller of image edges is matched to output_size keeping
            aspect ratio the same.
        """
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size
    
    def __call__(self, sample):
        """Resize `sample['img']` and `sample['mask']` to the configured size.

        Args:
            sample (dict): Holds the arrays under the keys 'img' and 'mask'.

        Returns:
            dict: ``{'img': ..., 'mask': ...}`` with both arrays resized to
            the same (height, width).
        """
        # local import: the notebook's import cell only pulls in `io` and `transform`
        from skimage.util import img_as_float

        img, mask = sample['img'], sample['mask']

        h, w = img.shape[:2]
        if isinstance(self.output_size, int):
            # match the shorter edge to output_size and keep the aspect ratio
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size
        new_h, new_w = int(new_h), int(new_w)

        rescaled_img = transform.resize(img, (new_h, new_w))
        # Masks are labels, not intensities. Interpolating or anti-aliasing them
        # produces fractional values along the mask border, which a later integer
        # cast truncates to 0. Nearest-neighbour sampling (order=0) keeps only
        # values that exist in the mask. img_as_float keeps the output a float
        # array, like the image, independent of the skimage version.
        rescaled_mask = transform.resize(img_as_float(mask), (new_h, new_w),
                                         order=0, anti_aliasing=False)

        return {'img': rescaled_img, 'mask': rescaled_mask}

class CustomToTensor():
    """Turn the 'img' and 'mask' ndarrays of a sample into float tensors."""
    def __call__(self, sample):
        # torchvision's ToTensor moves the colour axis to the front:
        # numpy image: H x W x C  ->  torch image: C x H x W
        convert = transforms.ToTensor()
        # NOTE(review): .int() truncates toward zero, so every converted mask
        # value below 1 ends up as 0 -- confirm this binarisation is intended.
        return {
            'img': convert(sample['img']).float(),
            'mask': convert(sample['mask']).int().float(),
        }

## Compose Transforms

Let's say we want to rescale the shorter side of the image to 256 and then randomly crop a square of size 224 from it, i.e. we want to compose `Rescale` and `RandomCrop` transforms. `torchvision.transforms.Compose` is a simple callable class which allows us to do this. In the cell below we use it to chain `Rescale((224, 224))` and `CustomToTensor`.



In [None]:
new_hw = 224
print("Data rescaled to new width and height:", new_hw)
# resize every sample to a new_hw x new_hw square, then convert it to tensors
rescale_then_tensor = transforms.Compose([
    Rescale((new_hw, new_hw)),
    CustomToTensor(),
])
tsfm_dataset = GateDataset(gate_path + 'corners.csv', gate_path,
                           transform=rescale_then_tensor)


In [None]:
# plot images and masks
num_of_subs = 3
# draw the indices from the actual dataset size instead of a hard-coded bound
sample_seed = np.random.randint(low=0, high=len(tsfm_dataset), size=num_of_subs)

# one column per sample: image in the top row, mask in the bottom row
fig, axes = plt.subplots(2, num_of_subs, dpi=100, squeeze=False)

# The loop already stops after num_of_subs samples. The former
# `if i == num_of_subs - 1: break` compared the random dataset index, not the
# loop position, and could cut the figure short.
for col, i in enumerate(sample_seed):
    sample = tsfm_dataset[i]
    print(i, sample['img'].shape, sample['mask'].shape)
    # show img (tensor is C x H x W, imshow wants H x W x C)
    ax1 = axes[0, col]
    ax1.set_title('Sample #{}'.format(i))
    ax1.axis('off')
    ax1.imshow(sample['img'].numpy().transpose(1, 2, 0))
    # show mask (drop the leading channel axis)
    ax2 = axes[1, col]
    ax2.set_title('Sample #{}'.format(i))
    ax2.axis('off')
    ax2.imshow(sample['mask'].squeeze(0))

## Iterating through the dataset by using `DataLoader`

Let's put this all together to create a dataset with composed transforms. To summarize, every time this dataset is sampled:

 1. An image is read from the file on the fly
 2. Transforms are applied on the read image
 3. If one of the transforms is random (e.g. `RandomCrop`), data is augmented on sampling

However, we are losing a lot of features by using a simple `for` loop to iterate over the data. In particular, we are missing out on:

+ Batching the data
+ Shuffling the data
+ Load the data in parallel using `multiprocessing` workers

`torch.utils.data.DataLoader` is an iterator which provides all these features. Parameters used below should be clear. One parameter of interest is `collate_fn`. You can specify how exactly the sample need to be batched using `collate_fn`. However, default collate should work fine for most use cases.

In [None]:
# batches of 32 transformed samples, reshuffled on every pass over the dataset
dataloader = DataLoader(dataset=tsfm_dataset, shuffle=True, batch_size=32)

In [None]:
# helper function to show a batch
def show_samples_batch(sample_batched):
    """Draw the images of a batch as one grid and its masks as a second grid below.

    Args:
        sample_batched (dict): Batch holding tensors under 'img' and 'mask'.
    """
    grid_padding = 2 # default padding = 2

    def _as_hwc_grid(batch):
        # tile the batch four per row, then move channels last for imshow
        grid = utils.make_grid(batch, nrow=4, padding=grid_padding)
        return grid.numpy().transpose((1, 2, 0))

    ax1 = plt.subplot(2,1,1)
    ax1.axis('off')
    ax1.imshow(_as_hwc_grid(sample_batched['img']))

    ax2 = plt.subplot(2,1,2)
    ax2.imshow(_as_hwc_grid(sample_batched['mask']))

# for i_batch, sample_batched in enumerate(dataloader):
#     print(i_batch, sample_batched['img'].size(), sample_batched['mask'].size())
#     # observe 2nd batch and stop

#     if i_batch == 1:
#         plt.figure()
#         plt.title('Batch from dataloader')
#         show_samples_batch(sample_batched)
#         plt.axis('off')
#         plt.ioff()
#         plt.show()
#         break

## Transposed Convolution

As with `nn.Conv2d`, both the input and the kernel should be 4-D tensors.

If the stride is $s$, the padding is $s/2$ (assuming padding is an integer), and the height and width of the convolution kernel are $2s$, the transposed convolution kernel will magnify both the height and width of the input by a factor of $s$.

- Initialize upsampling to bilinear interpolation, but allow the parameters to be learned.

- Skip net with finer upsampling - make local predictions that respect global structure. 

In [None]:
# 2x2 input and 2x2 kernel, both holding 0..3, reshaped to the 4-D layout
# that nn.ConvTranspose2d expects
X = torch.arange(4.).reshape(1, 1, 2, 2)
K = torch.arange(4.).reshape(1, 1, 2, 2)
# tconv = nn.ConvTranspose2d(1, 1, kernel_size=2, padding=1, bias=False)
# tconv = nn.ConvTranspose2d(1, 1, kernel_size=2, bias=False)
tconv = nn.ConvTranspose2d(in_channels=1, out_channels=1, kernel_size=2, stride=2, bias=False)
# replace the random initial weights with the hand-made kernel
tconv.weight.data = K
tconv(X)

## Build the Fully Convolutional Neural Network

### VGG Architecture 

During training, the input to our ConvNets is a fixed-size 224 × 224 RGB image. The only preprocessing we do is subtracting the mean RGB value, computed on the training set, from each pixel. The image is passed through a stack of convolutional (conv.) layers, where we use filters with a very small receptive field: 3 × 3 (which is the smallest size to capture the notion of left/right, up/down, center). In one of the configurations we also utilise 1 × 1 convolution filters, which can be seen as a linear transformation of the input channels (followed by non-linearity). The convolution stride is fixed to 1 pixel; the spatial padding of conv. layer input is such that the spatial resolution is preserved after convolution, i.e. the padding is 1 pixel for 3 × 3 conv. layers. Spatial pooling is carried out by five max-pooling layers, which follow some of the conv. layers (not all the conv. layers are followed by max-pooling). Max-pooling is performed over a 2 × 2 pixel window, with stride 2.

In [None]:
class FCNNet(nn.Module):
  """Plain encoder/decoder FCN without skip connections.

  Five 3x3 convolutions with four 2x2 max-pools halve the resolution four
  times; four stride-2 transposed convolutions double it back, and a 1x1
  convolution produces a single-channel score map.
  """
  def __init__(self):
    super(FCNNet, self).__init__()
    same3x3 = dict(kernel_size=3, stride=1, padding=1)  # keeps height and width
    self.conv1 = nn.Conv2d(3, 16, **same3x3)
    self.conv2 = nn.Conv2d(16, 32, **same3x3)
    self.conv3 = nn.Conv2d(32, 64, **same3x3)
    self.conv4 = nn.Conv2d(64, 96, **same3x3)
    self.conv5 = nn.Conv2d(96, 128, **same3x3)

    self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    double = dict(kernel_size=2, stride=2)  # doubles height and width
    self.deconv1 = nn.ConvTranspose2d(128, 96, **double)
    self.deconv2 = nn.ConvTranspose2d(96, 64, **double)
    self.deconv3 = nn.ConvTranspose2d(64, 32, **double)
    self.deconv4 = nn.ConvTranspose2d(32, 16, **double)

    self.classifier = nn.Conv2d(16, 1, kernel_size=1) # num_class = 1

  def forward(self, x):
    """Return the raw (pre-sigmoid) score map for the image batch `x`."""
    # subsampling: conv1 keeps the resolution, conv2..conv5 are each followed by a pool
    feat = F.relu(self.conv1(x))
    for conv in (self.conv2, self.conv3, self.conv4, self.conv5):
      feat = F.relu(self.maxpool(conv(feat)))

    # upsampling
    for deconv in (self.deconv1, self.deconv2, self.deconv3, self.deconv4):
      feat = F.relu(deconv(feat))

    return self.classifier(feat)

# torchsummary defaults to CUDA; fall back to the CPU when no GPU is attached
summary_device = 'cuda' if torch.cuda.is_available() else 'cpu'
summary(FCNNet().to(summary_device), (3, 224, 224), device=summary_device)

## FCN with skip layers and initialize Upsampling with bilinear

In [None]:
class FCNNet16s(nn.Module):
  """FCN with one skip connection and fixed bilinear upsampling.

  The deepest feature map is upsampled 2x, added to the conv4 feature map,
  upsampled 8x and turned into a single-channel score map by a 1x1 conv.
  """
  def __init__(self):
    super(FCNNet16s, self).__init__()
    same3x3 = dict(kernel_size=3, stride=1, padding=1)  # keeps height and width
    self.conv1 = nn.Conv2d(3, 32, **same3x3)
    self.conv2 = nn.Conv2d(32, 32, **same3x3)
    self.conv3 = nn.Conv2d(32, 128, **same3x3)
    self.conv4 = nn.Conv2d(128, 512, **same3x3)
    self.conv5 = nn.Conv2d(512, 512, **same3x3)

    self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    # parameter-free bilinear upsampling (8x for deconv16, 2x for deconv2)
    self.deconv16 = nn.Upsample(scale_factor=8, mode='bilinear')
    self.deconv2 = nn.Upsample(scale_factor=2, mode='bilinear')

    self.classifier = nn.Conv2d(512, 1, kernel_size=1) # num_class = 1

  def forward(self, x):
    """Return the raw (pre-sigmoid) score map for the image batch `x`."""
    # subsampling
    feat = F.relu(self.conv1(x))
    for conv in (self.conv2, self.conv3):
      feat = F.relu(self.maxpool(conv(feat)))
    skip4 = F.relu(self.maxpool(self.conv4(feat)))
    deepest = F.relu(self.maxpool(self.conv5(skip4)))

    # upsampling: fuse the deepest map with the conv4 skip, then go to full size
    fused = self.deconv2(deepest) + skip4
    fused = F.relu(self.deconv16(fused))
    return self.classifier(fused)

# torchsummary defaults to CUDA; fall back to the CPU when no GPU is attached
summary_device = 'cuda' if torch.cuda.is_available() else 'cpu'
summary(FCNNet16s().to(summary_device), (3, 224, 224), device=summary_device)

In [None]:
class FCNNet8s(nn.Module):
  """FCN with two skip connections and fixed bilinear upsampling.

  The deepest feature map is upsampled 2x and added to the conv4 map, the sum
  is upsampled 2x and added to the conv3 map, and the result is upsampled 4x
  before a 1x1 conv produces a single-channel score map.
  """
  def __init__(self):
    super(FCNNet8s, self).__init__()
    same3x3 = dict(kernel_size=3, stride=1, padding=1)  # keeps height and width
    self.conv1 = nn.Conv2d(3, 32, **same3x3)
    self.conv2 = nn.Conv2d(32, 32, **same3x3)
    self.conv3 = nn.Conv2d(32, 128, **same3x3)
    self.conv4 = nn.Conv2d(128, 128, **same3x3)
    self.conv5 = nn.Conv2d(128, 128, **same3x3)

    self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    # parameter-free bilinear upsampling
    self.deconv4 = nn.Upsample(scale_factor=4, mode='bilinear')
    self.deconv2 = nn.Upsample(scale_factor=2, mode='bilinear')

    self.classifier = nn.Conv2d(128, 1, kernel_size=1) # num_class = 1

  def forward(self, x):
    """Return the raw (pre-sigmoid) score map for the image batch `x`."""
    def down(conv, feat):
      # convolution, 2x2 max-pool, ReLU
      return F.relu(self.maxpool(conv(feat)))

    # subsampling
    feat = F.relu(self.conv1(x))
    feat = down(self.conv2, feat)
    skip3 = down(self.conv3, feat)
    skip4 = down(self.conv4, skip3)
    deepest = down(self.conv5, skip4)

    # upsampling: merge the skips from coarse to fine, then go to full size
    fused = self.deconv2(deepest) + skip4
    fused = self.deconv2(fused) + skip3
    fused = F.relu(self.deconv4(fused))
    return self.classifier(fused)

# torchsummary defaults to CUDA; fall back to the CPU when no GPU is attached
summary_device = 'cuda' if torch.cuda.is_available() else 'cpu'
summary(FCNNet8s().to(summary_device), (3, 224, 224), device=summary_device)

In [None]:
class FCNNet8sWithBN(nn.Module):
  """FCN-8s style network with batch normalisation after each encoder convolution.

  Same skip/upsampling scheme as FCNNet8s (2x + conv4 skip, 2x + conv3 skip,
  4x, 1x1 classifier), with a wider encoder and two extra convolutions
  between conv1 and conv3.
  """
  def __init__(self):
    super(FCNNet8sWithBN, self).__init__()
    same3x3 = dict(kernel_size=3, stride=1, padding=1)  # keeps height and width
    self.conv1 = nn.Conv2d(3, 32, **same3x3)
    self.conv3 = nn.Conv2d(128, 512, **same3x3)
    self.conv4 = nn.Conv2d(512, 512, **same3x3)
    self.conv5 = nn.Conv2d(512, 512, **same3x3)

    self.conv_additional1 = nn.Conv2d(32, 128, **same3x3)
    self.conv_additional2 = nn.Conv2d(128, 128, **same3x3)

    self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    # NOTE(review): forward() reuses bn2 for both conv_additional layers and
    # bn4 for conv3/conv4/conv5, so those layers share one set of BN parameters
    # and running statistics; bn3 is never called. Left as is because the
    # notebook loads whole-model pickles of this class -- confirm before changing.
    self.bn1 = nn.BatchNorm2d(32)
    self.bn2 = nn.BatchNorm2d(128)
    self.bn3 = nn.BatchNorm2d(256)
    self.bn4 = nn.BatchNorm2d(512)

    # parameter-free bilinear upsampling
    self.deconv4 = nn.Upsample(scale_factor=4, mode='bilinear')
    self.deconv2 = nn.Upsample(scale_factor=2, mode='bilinear')

    self.classifier = nn.Conv2d(512, 1, kernel_size=1) # num_class = 1

  def forward(self, x):
    """Return the raw (pre-sigmoid) score map for the image batch `x`."""
    def down(conv, bn, feat):
      # convolution, batch norm, 2x2 max-pool, ReLU
      return F.relu(self.maxpool(bn(conv(feat))))

    # subsampling
    feat = F.relu(self.bn1(self.conv1(x)))                 # 32 channels
    feat = F.relu(self.bn2(self.conv_additional1(feat)))   # 128 channels
    feat = down(self.conv_additional2, self.bn2, feat)     # 128 channels

    skip3 = down(self.conv3, self.bn4, feat)
    skip4 = down(self.conv4, self.bn4, skip3)
    deepest = down(self.conv5, self.bn4, skip4)

    # upsampling: merge the skips from coarse to fine, then go to full size
    fused = self.deconv2(deepest) + skip4
    fused = self.deconv2(fused) + skip3
    fused = F.relu(self.deconv4(fused))
    return self.classifier(fused)

# torchsummary defaults to CUDA; fall back to the CPU when no GPU is attached
summary_device = 'cuda' if torch.cuda.is_available() else 'cpu'
summary(FCNNet8sWithBN().to(summary_device), (3, 224, 224), device=summary_device)

## Test BCE Criterion

In [None]:
# Small sanity check of the criterion used for training: compute the loss on
# two random logits/targets and back-propagate through it.
loss = nn.BCEWithLogitsLoss()
# two random logits; requires_grad so that backward() can reach them
# NOTE(review): `input` shadows the Python builtin; the name is kept because
# the next cell reads it.
input = torch.randn(2, requires_grad=True)
# two random targets filled in place by random_(2)
target = torch.empty(2).random_(2)
output = loss(input, target)
output.backward()
print(input, target, output)

In [None]:
# Manual binary cross-entropy, to compare with `output` from the cell above:
#   mean over elements of -(t * log(sigmoid(x)) + (1 - t) * log(1 - sigmoid(x)))
# where log(1 - sigmoid(x)) == logsigmoid(-x). The earlier expression dropped the
# target terms, so it only matched the criterion when every target was 1.
-(target * F.logsigmoid(input) + (1 - target) * F.logsigmoid(-input)).mean()

## Training Model

Load transformed dataset consisting of gate images and masks and train the FCN model.

In [None]:
torch.cuda.empty_cache()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Build the model unconditionally: creating it only when CUDA is available left
# `fcn_model` undefined on a CPU runtime, and the optimizer line then failed.
fcn_model = FCNNet8sWithBN().to(device)

loss_fn = nn.BCEWithLogitsLoss().to(device)
optimizer = torch.optim.Adam(fcn_model.parameters(), lr=1e-4)
# prefix of the checkpoint files; the epoch number and extension are appended on save
model_path = os.path.join(project_path, 'pretrained_models/fcn8s_wth_bn')

dataloader = DataLoader(tsfm_dataset,
                        batch_size=8,
                        shuffle=True)

In [None]:
epochs = 801

def train():
    """Train `fcn_model` on `dataloader` for epochs 1 .. epochs-1.

    Uses the notebook-level `fcn_model`, `optimizer`, `loss_fn`, `dataloader`,
    `device` and `model_path`. After every epoch the mean batch loss of that
    epoch is printed; every 50th epoch the whole model is saved to
    ``model_path + '_<epoch>_epochs.pt'``.
    """
    fcn_model.train()
    for epoch in range(1, epochs):
        running_loss, num_batches = 0.0, 0
        for batch in dataloader:
            # Move the batch to wherever the model lives. The former
            # CUDA-only branch left inputs/labels undefined on a CPU runtime.
            inputs = batch['img'].to(device)
            labels = batch['mask'].to(device)

            optimizer.zero_grad()
            outputs = fcn_model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        # mean over the epoch instead of the last batch only; max() guards an empty loader
        print("epoch{}, loss: {}".format(epoch, running_loss / max(num_batches, 1)))

        if epoch % 50 == 0:
            # make sure the checkpoint folder exists before the first save
            os.makedirs(os.path.dirname(model_path), exist_ok=True)
            torch.save(fcn_model, model_path + '_{}_epochs.pt'.format(epoch))

if __name__ == '__main__':
    train()

## Visualizing the output of the model

In [None]:
model_path = os.path.join(project_path, 'pretrained_models/fcn8s_wth_bn')
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
# NOTE(review): torch.load unpickles a whole model object here -- only load
# checkpoint files you created yourself.
load_device = 'cuda' if torch.cuda.is_available() else 'cpu'
fcn_pretrained = torch.load(model_path + '_{}_epochs.pt'.format(500),
                            map_location=load_device)
num_of_subs = 3
# draw the indices from the actual dataset size instead of a hard-coded bound
sample_seed = np.random.randint(low=0, high=len(tsfm_dataset), size=num_of_subs)

In [None]:
# plot images, masks and model outputs (one column per sample)
fig, axes = plt.subplots(3, num_of_subs, dpi=200, squeeze=False)
# feed the inputs to whatever device the loaded model lives on instead of assuming CUDA
model_device = next(fcn_pretrained.parameters()).device

# NOTE(review): the model is used in whatever train/eval mode it was loaded in;
# with BatchNorm layers this changes the output -- confirm which mode is intended.
for col, i in enumerate(sample_seed):
    sample = tsfm_dataset[i]
    print(i, sample['img'].shape, sample['mask'].shape)
    # show img
    ax1 = axes[0, col]
    ax1.axis('off')
    ax1.imshow(sample['img'].numpy().transpose(1, 2, 0))
    # show mask
    ax2 = axes[1, col]
    ax2.axis('off')
    ax2.imshow(sample['mask'].squeeze(0))
    # show model output (raw scores); no gradients are needed for plotting
    ax3 = axes[2, col]
    ax3.axis('off')
    with torch.no_grad():
        scores = fcn_pretrained(sample['img'].unsqueeze(0).to(model_device))
    output = scores.cpu().squeeze().numpy()
    ax3.imshow(output)

In [None]:
model_path = os.path.join(project_path, 'pretrained_models/fcn16s')
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
# NOTE(review): torch.load unpickles a whole model object here -- only load
# checkpoint files you created yourself.
load_device = 'cuda' if torch.cuda.is_available() else 'cpu'
fcn_pretrained = torch.load(model_path + '_{}_epochs.pt'.format(500),
                            map_location=load_device)
# plot masks and model outputs (one column per sample)
fig, axes = plt.subplots(2, num_of_subs, dpi=200, squeeze=False)

for col, i in enumerate(sample_seed):
    sample = tsfm_dataset[i]
    print(i, sample['img'].shape, sample['mask'].shape)
    # show mask
    ax2 = axes[0, col]
    ax2.axis('off')
    ax2.imshow(sample['mask'].squeeze(0))
    # show model output (raw scores); no gradients are needed for plotting
    ax3 = axes[1, col]
    ax3.axis('off')
    with torch.no_grad():
        scores = fcn_pretrained(sample['img'].unsqueeze(0).to(load_device))
    output = scores.cpu().squeeze().numpy()
    ax3.imshow(output)

In [None]:
model_path = os.path.join(project_path, 'pretrained_models/fcn8s')
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime too.
# NOTE(review): torch.load unpickles a whole model object here -- only load
# checkpoint files you created yourself.
load_device = 'cuda' if torch.cuda.is_available() else 'cpu'
fcn_pretrained = torch.load(model_path + '_{}_epochs.pt'.format(500),
                            map_location=load_device)
# plot masks and model outputs (one column per sample)
fig, axes = plt.subplots(2, num_of_subs, dpi=200, squeeze=False)

for col, i in enumerate(sample_seed):
    sample = tsfm_dataset[i]
    print(i, sample['img'].shape, sample['mask'].shape)
    # show mask
    ax2 = axes[0, col]
    ax2.axis('off')
    ax2.imshow(sample['mask'].squeeze(0))
    # show model output (raw scores); no gradients are needed for plotting
    ax3 = axes[1, col]
    ax3.axis('off')
    with torch.no_grad():
        scores = fcn_pretrained(sample['img'].unsqueeze(0).to(load_device))
    output = scores.cpu().squeeze().numpy()
    ax3.imshow(output)