# UNets and segmentation.
 
## The question we want to answer experimentally is: To what extent the architecture of UNet is responsible for the success of segmentation?


## Please check out the founding paper: https://arxiv.org/abs/1505.04597
    

In [3]:
### standard imports
import torch
from torch import nn
import numpy as np
import pylab as pl
import matplotlib as plt
import torch.optim as optim
import os
### line below is for mac in particular. Remove if not needed
os.environ['KMP_DUPLICATE_LIB_OK']='True'
%pylab inline

Populating the interactive namespace from numpy and matplotlib


`%matplotlib` prevents importing * from pylab and numpy
  warn("pylab import has clobbered these variables: %s"  % clobbered +


In [4]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ExponentialLR
transform_train = transforms.Compose([
        transforms.ToTensor(),transforms.Resize((128, 128), interpolation=transforms.InterpolationMode.NEAREST)
    ])


## We use a dataset that is accessible within pytorch: the OxfordIIIPet dataset with segmentation of animals. 
## In the code above, we defined "transform_train" which is a simple resize operation since the images do not have the same size.
## It is also possible to use a class of transformation of the training set in order to augment the dataset, or to make it more invariant to some natural transformations.

In [5]:
### first introduce some parameters.
batch_size = 32 # you can modify.
test_batch_size = 32 # you can modify
data_augmentation = False # used to augment data with a set of transformations

In [6]:
class OxfordData(Dataset):
    def __init__(self,split = "trainval",option = True):
        self.data = datasets.OxfordIIITPet(root='.data/OxfordIIITPet', split=split, target_types="segmentation", download=True,
                               transform=transform_train)
        self.option = option
        print(len(self.data)," longueur des données")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, item):
        x,y = self.data[item]
        if data_augmentation == True:
            random_transform_1 = transforms.RandomCrop(80)
            #random_transform_2 = transforms.RandomAffine((-180, 179), scale=(0.8, 1.2))
            transform = transforms.Compose([transforms.ToTensor(),transforms.RandomApply(torch.nn.ModuleList([random_transform_1])), transforms.Resize((128, 128), interpolation=transforms.InterpolationMode.NEAREST)])
            transform2 = transforms.Compose([transforms.RandomApply(torch.nn.ModuleList([random_transform_1])), transforms.Resize((128, 128), interpolation=transforms.InterpolationMode.NEAREST)])
            x = transform2(x)
        else:
            transform = transforms.Compose([transforms.ToTensor(), transforms.Resize((128, 128), interpolation=transforms.InterpolationMode.NEAREST)])
        z = transform(y)
        z = z / torch.max(z)
        return torch.cat((x,z),dim=0)


def get_OxfordPet_loaders(batch_size= batch_size, test_batch_size=test_batch_size, perc=1.0):
    train_loader = DataLoader(
        OxfordData("trainval"), batch_size=batch_size,
        shuffle=True, num_workers=0, drop_last=True
    )

    train_eval_loader = DataLoader(
        OxfordData("trainval"),
        batch_size=test_batch_size, shuffle=False, num_workers=0, drop_last=True
    )

    test_loader = DataLoader(
        OxfordData(split = "test",option=False),
        batch_size=test_batch_size, shuffle=False, num_workers=0, drop_last=True
    )
    return train_loader, test_loader, train_eval_loader

def inf_generator(iterable):
    """Allows training with DataLoaders in a single infinite loop:
        for i, (x, y) in enumerate(inf_generator(train_loader)):
    """
    iterator = iterable.__iter__()
    while True:
        try:
            yield iterator.__next__()
        except StopIteration:
            iterator = iterable.__iter__()




## Data are four channels images. The first three ones contain the image and the last one the segmentation.

In [7]:
### On affiche le type de données.

train_loader, test_loader, train_eval_loader = get_OxfordPet_loaders(
        batch_size, test_batch_size
    )

data_gen = inf_generator(train_loader)
batches_per_epoch = len(train_loader)
test_gen = inf_generator(test_loader)
for i in range(2):
    x = data_gen.__next__()
    

figure = plt.figure(figsize=(12, 12))
cols,rows = 4,1
for i in range(0, cols * rows):
    z = x.numpy()
    label = x[:, -1, :, :].unsqueeze(1)
    index = i
    figure.add_subplot(rows, cols, i+1)
    plt.imshow(z[i, 0, :, :])
    plt.axis("off")

plt.show()

figure = plt.figure(figsize=(12, 12))
cols,rows = 4,1
for i in range(0, cols * rows):
    figure.add_subplot(rows, cols, i+1)
    plt.imshow(label[i, 0, :, :])
    
    plt.axis("off")

plt.show()



Downloading https://thor.robots.ox.ac.uk/datasets/pets/images.tar.gz to .data/OxfordIIITPet/oxford-iiit-pet/images.tar.gz


100%|██████████| 791918971/791918971 [00:11<00:00, 66539989.29it/s]


Failed download. Trying https -> http instead. Downloading http://thor.robots.ox.ac.uk/datasets/pets/images.tar.gz to .data/OxfordIIITPet/oxford-iiit-pet/images.tar.gz


  0%|          | 524288/791918971 [00:00<01:49, 7251686.78it/s]


OSError: [Errno 122] Disk quota exceeded

## Question 1: Implement a UNet class:
### 1. Below are two elementary functions: one which is downsampling after convolutions, nonlinearities and maxpooling. Batchnorm is also used.
### 2. Using these two functions, construct a simple UNet:
### The downsampling path is relatively straightforward, it is simply given by the composition of contract_blocks.
### In the upsampling path, you need to concatenate the results obtained at each level in the downsampling path with the result in the corresponding expand_block.

In [None]:
def contract_block(self, in_channels, out_channels, kernel_size, padding):
    contract = nn.Sequential(
        torch.nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding),
        torch.nn.BatchNorm2d(out_channels),
        torch.nn.ReLU(),
        torch.nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding),
        torch.nn.BatchNorm2d(out_channels),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(kernel_size=kernel_size, stride=2, padding=1)
    )
    return contract


def expand_block(self, in_channels, out_channels, kernel_size, padding, output_padding=1):
    expand = nn.Sequential(torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=padding),
                           torch.nn.BatchNorm2d(out_channels),
                           torch.nn.ReLU(),
                           torch.nn.Conv2d(out_channels, out_channels, kernel_size, stride=1, padding=padding),
                           torch.nn.BatchNorm2d(out_channels),
                           torch.nn.ReLU(),
                           torch.nn.ConvTranspose2d(out_channels, out_channels, kernel_size=kernel_size, stride=2,
                                                    padding=1, output_padding=output_padding)
                           )
    return expand

In [51]:
### your code here.
class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        

    def __call__(self, x):
        # downsampling part
        result = 0
        # upsampling part       
        return result


## Question 2: Adapt the code below to implement a simple convolution model, possibly with skip connections as a ResNet like architecture with elementary blocks which are convolutions.

In [53]:
class CNET(nn.Module):
    def __init__(self, in_channels, out_channels,resnet = False):
        super().__init__()
        self.layers = nn.ModuleList()
        self.depth = 4
        self.resnet = resnet
        kernel_size = 3
        for i in range(self.depth):
            self.layers.append(self.block(in_channels,out_channels,kernel_size))
        

    def __call__(self, x):
        # downsampling part
        y = self.layers[0](x)
        for i in range(1,(self.depth)-1):
            if self.resnet == True:
                y = y + self.layers[i](y)
            else: 
                y = self.layers[i](y)
        return self.layers[-1](y)
        

    def block(self, in_channels, out_channels, kernel_size, padding = 1):
        bloc = nn.Sequential(
            torch.nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding),
            torch.nn.BatchNorm2d(out_channels),
            torch.nn.ReLU())
        return bloc


### We set up an optimizer (from published experiences, Adam seems the best default optimizer) and a scheduler to decrease the learning rate at every epoch.  

In [72]:
### In order to evaluate the results, you can use the following functions:
def dice(a,b):
    return - 2 * (torch.sum(a * b) +0.01)/(torch.sum(a) + torch.sum(b) - torch.sum(a * b) + 0.01)
def dice_on_batch(a,b):
    bs = a.shape[0]
    loss = 0
    for i in range(bs):
        loss+= dice(a[i,:,:,:], b[i,:,:,:])
    return loss

def ssd(a,b):
    return torch.sum((a - b)**2)

def performance(loader,n_iterations=20,genre = "training"):
    performance = 0
    with torch.no_grad():
        for _ in range(n_iterations):
            x = loader.__next__()
            label = x[:, -1, :, :]
            x = x[:, :3, :, :]
            x = x.to(device)
            label = label.unsqueeze(1)
            label = label.to(device)
            h = model(x)
            loss = ssd(h, label)
            performance += loss
        result = performance/n_iterations
    print("performance in "+genre+" is on a batch of "+str(n_iterations) +" : ", result)
    return result

In [73]:
#set the learning rate:
lr = 0.01

In [74]:
model = CNET(3,3,resnet = True)
optimizer = torch.optim.Adam(model.parameters(),lr=lr, betas=(0.95, 0.95))
scheduler = ExponentialLR(optimizer, gamma=0.95)

## Question 3: Implement the learning loop and test it on a UNet or CNet and show the results.

In [64]:
### your code here.