# U-Net Implementation

![U-Net Architecture](http://openresearch.ai/uploads/default/optimized/1X/ec0ac2e2d2df8f213b916453375ccee95a254ac3_1_616x500.png)



## Install Dependencies

Just for you know, this installation cell won't work on your computer. But, all the following cells will

In [0]:
# http://pytorch.org/
from os import path
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())

accelerator = 'cu80' if path.exists('/opt/bin/nvidia-smi') else 'cpu'

!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision 
  
!pip install --no-cache-dir -I pillow

Collecting pillow
[?25l  Downloading https://files.pythonhosted.org/packages/62/94/5430ebaa83f91cc7a9f687ff5238e26164a779cca2ef9903232268b0a318/Pillow-5.3.0-cp36-cp36m-manylinux1_x86_64.whl (2.0MB)
[K    100% |████████████████████████████████| 2.0MB 17.1MB/s 
[?25hInstalling collected packages: pillow
Successfully installed pillow-5.3.0


## Setup your running folder

This cell will mount your folder to use your files like in a normal computer. **MAKE SURE** you change the path with the path of the folder you're working on your Drive.

In [0]:
# Load the Drive helper and mount
from google.colab import drive

# Mount the drive folder. This will prompt for authorization.
drive.mount('/content/drive', force_remount=True)


# Opens the project folder. IMPORTANT: Change to your route
%cd 'drive/My Drive/UNet'

Mounted at /content/drive
/content/drive/My Drive/UNet


## Modules Importation


In [0]:
import sys
import os
import numpy as np
import random
import time
import math
from matplotlib import pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms
import torchvision.transforms.functional as Ft

from PIL import Image
def register_extension(id, extension):
    Image.EXTENSION[extension.lower()] = id.upper()
Image.register_extension = register_extension
def register_extensions(id, extensions): 
    for extension in extensions:
        register_extension(id, extension)
Image.register_extensions = register_extensions

# Use GPU or not
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

## Misc Functions

This functions add supportive funcionalities.

In [0]:
def save_and_print(text):
    print(text)
    with open(file_Name, 'a') as file:
        file.write(text + '\n')
        
def time_me(*arg):
    if len(arg) != 0: 
        elapsedTime = time.time() - arg[0];
        hours = math.floor(elapsedTime / (60*60))
        elapsedTime = elapsedTime - hours * (60*60);
        minutes = math.floor(elapsedTime / 60)
        elapsedTime = elapsedTime - minutes * (60);
        seconds = math.floor(elapsedTime);
        elapsedTime = elapsedTime - seconds;
        ms = elapsedTime * 1000;
        if(hours != 0):
            return "%d hours %d minutes %d seconds" % (hours, minutes, seconds)
        elif(minutes != 0):
            return "%d minutes %d seconds" % (minutes, seconds)
        else :
            return "%d seconds %f ms" % (seconds, ms)
    else:
        return time.time()

## Definition of the Architecture

As we show in the above image, U-Net consists on a big amounts of layers. In the following 2 cells all the parts of U-Net will be implemented. Each part have its own comment.

In [0]:
class double_conv(nn.Module):
    ''' Applies (conv => BN => ReLU) two times. '''

    def __init__(self, in_ch, out_ch):
        super(double_conv, self).__init__()

        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_ch),
            # inplace is for aply ReLU to the original place, saving memory
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_ch),
            # inplace is for aply ReLU to the original place, saving memory
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.conv(x)
        return x


class inconv(nn.Module):
    ''' First Section of U-Net. '''

    def __init__(self, in_ch, out_ch):
        super(inconv, self).__init__()

        self.conv = double_conv(in_ch, out_ch)

    def forward(self, x):
        x = self.conv(x)
        return x


class down(nn.Module):
    ''' Applies a MaxPool with a Kernel of 2x2,
        then applies a double convolution pack. '''

    def __init__(self, in_ch, out_ch):
        super(down, self).__init__()

        self.mpconv = nn.Sequential(
            nn.MaxPool2d(kernel_size=2),
            double_conv(in_ch, out_ch)
        )

    def forward(self, x):
        x = self.mpconv(x)
        return x


class up(nn.Module):
    ''' Applies a Deconvolution and then applies applies a double convolution pack. '''

    def __init__(self, in_ch, out_ch, bilinear=False):
        super(up, self).__init__()
        
        # Bilinear is used to save computational cost
        if bilinear:
            self.up = nn.Upsample(
                scale_factor=2, mode='bilinear', align_corners=True)
        else:
            self.up = nn.ConvTranspose2d(
                in_ch//2, in_ch//2, kernel_size=2, stride=2)

        self.conv = double_conv(in_ch, out_ch)

    # the layers on the right are x1 and the ones on the left are x2.
    def forward(self, x1, x2):
        x1 = self.up(x1)
        diffX = x1.size()[2] - x2.size()[2]
        diffY = x1.size()[3] - x2.size()[3]
        x2 = F.pad(input=x2, pad=(diffX // 2, diffX // 2,
                                  diffY // 2, diffY // 2))
        x = torch.cat([x2, x1], dim=1)
        x = self.conv(x)
        return x


class outconv(nn.Module):
    ''' Applies the last Convolution to give an answer. '''

    def __init__(self, in_ch, out_ch):
        super(outconv, self).__init__()

        self.conv = nn.Conv2d(in_ch, out_ch, kernel_size=1)

    def forward(self, x):
        x = self.conv(x)
        return x
      

In [0]:
class UNet(nn.Module):
    ''' This Object defines the architecture of U-Net. '''

    def __init__(self, n_channels, n_classes):
        super(UNet, self).__init__()

        self.inc = inconv(n_channels, 64)
        self.down1 = down(64, 128)
        self.down2 = down(128, 256)
        self.down3 = down(256, 512)
        self.down4 = down(512, 512)
        self.up1 = up(1024, 256)
        self.up2 = up(512, 128)
        self.up3 = up(256, 64)
        self.up4 = up(128, 64)
        self.outc = outconv(64, n_classes)
        
        
    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        x = self.outc(x)
        return x

## Functions for Loading the Data

The nexts 3 cells are for the loading process. In this case we use the Dataset and Dataloader objects given by Pytorch. In the training process we will call this section

In [0]:
class BBBCDataset(Dataset):
    def __init__(self, ids, dir_data, dir_gt, extension='.png', gt_label='_mask'):

        self.dir_data = dir_data
        self.dir_gt = dir_gt
        self.extension = extension
        self.gt_label = gt_label

        # Transforms
        self.transformations = transforms.ToTensor()

        # Images IDS
        self.ids = ids

        # Calculate len of data
        self.data_len = len(self.ids)

    def __getitem__(self, index):
        # Get an ID of a specific image
        id_img = self.dir_data + self.ids[index] + self.extension
        id_gt = self.dir_gt + self.ids[index] + self.extension
        # Open Image and GroundTruth
        img = Image.open(id_img)
        gt = Image.open(id_gt)
        # Applies transformations
        img = self.transformations(img)
        gt = self.transformations(gt)

        return (img, gt)

    def __len__(self):
        return self.data_len

In [0]:
def get_dataloaders(dir_img, dir_gt, test_percent=0.2, batch_size=10):
    # Validate a correct percentage
    test_percent = test_percent/100 if test_percent > 1 else test_percent
    # Read the names of the images
    ids = [f[:-4] for f in os.listdir(dir_img)]
    # Rearrange the images
    random.shuffle(ids)
    # Calculate index of partition
    part = int(len(ids) * test_percent)
    

    # Split dataset between train and test
    train_ids = ids[part:]
    test_ids = ids[:part]

    # Create the datasets
    train_dataset = BBBCDataset(ids=train_ids, dir_data=dir_img, dir_gt=dir_gt)
    test_dataset = BBBCDataset(ids=test_ids, dir_data=dir_img, dir_gt=dir_gt)

    # Create the loaders
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    return train_loader, test_loader

## Loss functions

In [0]:
def weight(X):
    # K Classes
    K = torch.from_numpy(np.unique(X.cpu().detach().numpy())).float().to(device)
    W = X.clone()
    for k in K:
        W[X == k] = W[X == k].size()[0]/X.size()[0]
    return W

def cross_entropy(y_pred, y_true, W):
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    return torch.mean(W*(-(y_true_f*torch.log10(y_pred_f) + (1 - y_true_f)*torch.log10(1 - y_pred_f)) + 1e-8))

def hinge(y_pred, y_true, W):
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    return torch.max(torch.cuda.FloatTensor([0]), torch.mean(W*(1 - y_pred_f*y_true_f)))

def huber(y_pred, y_true, W, delta=0.5):
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    delta = torch.cuda.FloatTensor([delta])
    _abs = torch.abs(y_true - y_pred)
    if _abs.mean() < delta:
        return torch.mean(0.5*W*(y_true_f - y_pred_f)**2)
    return torch.mean(W*(delta*_abs - 0.5*delta))

def MAE(y_pred, y_true, W):
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    return torch.mean(W*torch.abs(y_true_f - y_pred_f))

def MSE(y_pred, y_true, W):
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    return torch.mean(W*(y_pred_f - y_true_f)**2)

def dice_coef(y_pred, y_true, W):
    smooth = 1
    y_true_f = y_true.view(-1)
    y_pred_f = y_pred.view(-1)
    intersection = torch.sum(y_true_f * y_pred_f)
    return -W*((2. * intersection + smooth) / (torch.sum(y_true_f) + torch.sum(y_pred_f) + smooth))

## Training the Model

The next 3 cells are for training the model. Each part is commented.

In [0]:
def train_net(net, device, loader, dir_checkpoint, optimizer,
              loss_function=dice_coef, epochs=5, run=""):
    ''' Train the CNN. '''
    loss_results = []
    for epoch in range(epochs):
        save_and_print('\nStarting epoch {}/{}.'.format(epoch + 1, epochs))

        net.train()
        train_loss = 0
        cont = 0
        time_var = time_me()
        for batch_idx, (data, gt) in enumerate(loader):

            # Use GPU or not
            data, gt = data.to(device, dtype=torch.float), gt.to(device, dtype=torch.float)

            optimizer.zero_grad()

            # Forward
            predictions = net(data)

            # To calculate Loss
            pred_probs = torch.sigmoid(predictions)
            pred_probs_flat = pred_probs.view(-1)
            gt_flat = gt.view(-1)

            # Loss Calculation
            W = weight(gt_flat)
            loss = loss_function(pred_probs, gt, W).to(device, dtype=torch.float)
            train_loss += loss.item()
            cont += 1

            # Backpropagation
            loss.backward()
            optimizer.step()

            save_and_print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch+1, batch_idx * len(data), len(loader.dataset),
                100. * batch_idx / len(loader), loss.item()))

        train_loss /= cont
        save_and_print('\nAverage Training Loss: ' + str(train_loss))
        save_and_print('Train Time: It tooks '+time_me(time_var)+' to finish the epoch.')
        loss_results.append(train_loss)
    print(loss_results)
    
    # Save the weights
    #torch.save(net.state_dict(), dir_checkpoint + 'weights'+run+'.pth')
        
    return train_loss

In [0]:
def test_net(net, device, loader, loss_function=dice_coef):
    ''' Test the CNN '''
    net.eval()
    test_loss = 0
    cont = 0
    time_var = time_me()
    with torch.no_grad():
        for data, gt in loader:

            # Use GPU or not
            data, gt = data.to(device, dtype=torch.float), gt.to(device, dtype=torch.float)

            # Forward
            predictions = net(data)

            # To calculate Loss
            pred_probs = torch.sigmoid(predictions)
            
            # Loss Calculation
            W = weight(gt.view(-1))
            test_loss += loss_function(pred_probs, gt, W).item()
            cont += 1

    test_loss /= cont
    save_and_print('\nTest set: Average loss: ' + str(test_loss))
    save_and_print('Test time: It tooks ' + time_me(time_var) + ' to finish the Test.')
    return test_loss

In [0]:
def setup_and_run_train(load=False, test_perc=0.2, batch_size=10,
                        loss_function=dice_coef, optimizer=optim.Adam, epochs=5,
                        lr=0.1, run=""):

    # Use GPU or not
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # Create the model
    net = UNet(n_channels=1, n_classes=1).to(device)

    # Load old weights
    if load:
        net.load_state_dict(torch.load(load))
        save_and_print('Model loaded from {}'.format(load))

    # Location of the images to use
    dir_img = 'data/original/'
    dir_gt = 'data/gt/'
    dir_checkpoint = 'checkpoints/'

    # Load the dataset
    train_loader, test_loader = get_dataloaders(dir_img, dir_gt,
                                                test_perc, batch_size)
    # Optimizer
    optimizer = optimizer(net.parameters(), lr=lr)
    
    # Pretty print of the run
    save_and_print('''
    Starting training:
        Epochs: {}
        Batch size: {}
        Learning rate: {}
        Training size: {}
        Testing size: {}
        CUDA: {}
    '''.format(epochs, batch_size, lr, len(train_loader.dataset),
               len(test_loader.dataset), str(use_cuda)))

    # Run the training and testing
    try:
        time_var = time_me()
        train_loss = train_net(net=net,
                  epochs=epochs,
                  device=device,
                  dir_checkpoint=dir_checkpoint,
                  loader=train_loader,
                  optimizer=optimizer,
                  loss_function=loss_function,
                  run = run)
        test_loss = test_net(net=net, device=device, loader=test_loader, loss_function=loss_function)
        save_and_print('\nRun time: It tooks ' + time_me(time_var) + ' to finish the run.')
        return net
    except KeyboardInterrupt:
        torch.save(net.state_dict(), 'INTERRUPTED.pth')
        save_and_print('Saved interrupt')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

## Result

In [0]:
def predict_imgs(net, device, loader, optim_name, loss_function_name, show=False):
    with torch.no_grad():
        i = 0
        for batch_idx, (data, gt) in enumerate(loader):
            # Use GPU or not
            data, gt = data.to(device, dtype=torch.float), gt.to(device, dtype=torch.float)
            
            if show:
                # Shows original image
                data_img = transforms.ToPILImage()(data.squeeze(0).cpu()).convert('RGB')
                fig=plt.figure(figsize=(20, 20))
                fig.add_subplot(2, 2, 1)
                plt.imshow(data_img)
            
            # Forward
            predictions = net(data)

            # Apply sigmoid
            pred_probs = torch.sigmoid(predictions).squeeze(0)
            
            # Shows prediction
            if show:
                fig_name = "outputs/{}_{}{}.png"
                # Shows prediction
                pred = transforms.ToPILImage()(predictions.squeeze(0).cpu()).convert('RGB')
                fig.add_subplot(2, 2, 2)
                plt.savefig(fig_name.format(optim_name, loss_function_name, i))
                plt.imshow(pred)
                # Shows prediction probability
                pred_p = transforms.ToPILImage()(pred_probs.cpu()).convert('RGB')
                fig.add_subplot(2, 2, 3)
                plt.imshow(pred_p)
                plt.savefig(fig_name.format(optim_name, loss_function_name, i))
                # Shows gt
                gt_img = transforms.ToPILImage()(gt.squeeze(0).cpu()).convert('RGB')
                fig.add_subplot(2, 2, 4)
                plt.imshow(gt_img)
                plt.savefig(fig_name.format(optim_name, loss_function_name, i))
                plt.show()
            
            i += 1;

def get_predloader(dir_img, dir_gt, batch_size=1):
    # Read the names of the images
    ids = [f[:-4] for f in os.listdir(dir_img)]
    # Rearrange the images
    random.shuffle(ids)
    # Calculate index of partition
    ids_pred = ids[:10]

    # Create the datasets
    pred_dataset = BBBCDataset(ids=ids_pred, dir_data=dir_img, dir_gt=dir_gt)

    # Create the loaders
    pred_loader = DataLoader(pred_dataset, batch_size=batch_size, shuffle=True)

    return pred_loader

def predict(net, optim_name, loss_function_name):
    # Use GPU or not
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # Location of the images to use
    dir_pred = 'data/original/'
    dir_gt = 'data/gt/'

    # Load the dataset
    pred_loader = get_predloader(dir_pred, dir_gt)

    # Run the prediction
    predict_imgs(net=net,
                 device=device,
                 loader=pred_loader,
                 optim_name=optim_name,
                 loss_function_name=loss_function_name,
                 show=True)

## Running the Training

Now, we will run the training. You can change this hyperparameters to see how affects the results.

### Only one run

In [0]:
file_Name = "Prueba.txt"

# Definition of the loss functions
loss_functions_names = ['Cross Entropy', 'Hinge', 'Huber', 'MAE', 'MSE',
                        'DiceCoefficient']
loss_functions = [cross_entropy, hinge, huber, MAE, MSE, dice_coef]

# Definition of the optimizers
optimizers_names = ['AdaGrad', 'Adadelta', 'Adam', 'Adamax', 'ASGD',
                    'RMSprop', 'Rprop', 'SGD']
optimizers = [optim.Adagrad, optim.Adadelta, optim.Adam, optim.Adamax,
              optim.ASGD, optim.RMSprop, optim.Rprop, optim.SGD]

_epochs = 20
alpha = 0.0001

if torch.cuda.is_available():
    torch.cuda.manual_seed_all(0)
else:
    torch.manual_seed(0)

for optimizer, optimizer_name in zip(optimizers, optimizers_names):
    save_and_print(optimizer_name)
    
    for loss_f, loss_f_name in zip(loss_functions, loss_functions_names):
        save_and_print(loss_f_name)
        net = setup_and_run_train(load = False,
                                  test_perc = 0.2,
                                  batch_size = 10,
                                  loss_function=loss_f,
                                  optimizer=optimizer,
                                  epochs = _epochs,
                                  lr = alpha)
        predict(net, optimizer_name, loss_f_name)

AdaGrad
Cross Entropy

    Starting training:
        Epochs: 20
        Batch size: 10
        Learning rate: 0.0001
        Training size: 144
        Testing size: 36
        CUDA: True
    

Starting epoch 1/20.

Average Training Loss: 0.17273051639397938
Train Time: It tooks 36 seconds 759.271622 ms to finish the epoch.

Starting epoch 2/20.

Average Training Loss: 0.15374727149804432
Train Time: It tooks 22 seconds 47.223568 ms to finish the epoch.

Starting epoch 3/20.

Average Training Loss: 0.1467400868733724
Train Time: It tooks 22 seconds 56.663990 ms to finish the epoch.

Starting epoch 4/20.

Average Training Loss: 0.1425210217634837
Train Time: It tooks 22 seconds 136.067390 ms to finish the epoch.

Starting epoch 5/20.

Average Training Loss: 0.13957193990548453
Train Time: It tooks 22 seconds 127.024174 ms to finish the epoch.

Starting epoch 6/20.

Average Training Loss: 0.13757597506046296
Train Time: It tooks 22 seconds 119.784594 ms to finish the epoch.

Starting ep

RuntimeError: ignored

### Several runs - For result reports

In [0]:
# file_Name = "USM-Vainilla.txt"
# runs = 5
# acum_train = 0
# acum_test = 0
# for i in range(runs):
#     save_and_print('-'*10 + 'Start run {}'.format(i+1) + '-'*10)
#     train_loss, test_loss = setup_and_run_train(load = False,
#               test_perc = 0.2,
#               batch_size = 10,
#               epochs = 20,
#               lr = 0.01, run=str(i+1))
#     acum_train += train_loss
#     acum_test += test_loss

# acum_train /= runs
# acum_test /= runs

# save_and_print('\nAfter '+str(runs)+' runs: \n\tAverage Train Loss: '+str(acum_train)+'\n\tAverage Test Loss: '+str(acum_test))

## Seeing the results

In the following charts it's the code that uses the trained model and predict some samples.

In [0]:
def predict_imgs(net, device, loader, show=False):
    with torch.no_grad():
        for batch_idx, (data, gt) in enumerate(loader):
   
            # Use GPU or not
            data, gt = data.to(device, dtype=torch.float), gt.to(device, dtype=torch.float)
            
            
            if show:
                # Shows original image
                data_img = transforms.ToPILImage()(data.squeeze(0).cpu()).convert('RGB')
                fig=plt.figure(figsize=(20, 20))
                fig.add_subplot(1, 4, 1)
                plt.imshow(data_img)
            
            # Forward
            predictions = net(data)

            # Apply sigmoid
            pred_probs = torch.sigmoid(predictions).squeeze(0)
            
            # Shows prediction
            if show:
                # Shows prediction
                pred = transforms.ToPILImage()(predictions.squeeze(0).cpu()).convert('RGB')
                fig.add_subplot(1, 4, 2)
                plt.imshow(pred)
                # Shows prediction probability
                pred_p = transforms.ToPILImage()(pred_probs.cpu()).convert('RGB')
                fig.add_subplot(1, 4, 3)
                plt.imshow(pred_p)
                # Shows gt
                gt_img = transforms.ToPILImage()(gt.squeeze(0).cpu()).convert('RGB')
                fig.add_subplot(1, 4, 4)
                plt.imshow(gt_img)
                plt.show()

In [0]:
def get_predloader(dir_img, dir_gt, batch_size=1):
    # Read the names of the images
    ids = [f[:-4] for f in os.listdir(dir_img)]
    # Rearrange the images
    random.shuffle(ids)
    # Calculate index of partition
    ids_pred = ids[:10]

    # Create the datasets
    pred_dataset = BBBCDataset(ids=ids_pred, dir_data=dir_img, dir_gt=dir_gt)

    # Create the loaders
    pred_loader = DataLoader(pred_dataset, batch_size=batch_size, shuffle=True)

    return pred_loader

def predict(load='checkpoints/CP1.pth'):

    # Use GPU or not
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # Create the model
    net = UNet(n_channels=1, n_classes=1).to(device)

    # Load trained weights
    net.load_state_dict(torch.load(load))
    print('Model loaded from {}'.format(load))

    # Location of the images to use
    dir_pred = 'data/original/'
    dir_gt = 'data/gt/'

    # Load the dataset
    pred_loader = get_predloader(dir_pred, dir_gt)

    # Run the prediction
    predict_imgs(net=net,
                device=device,
                loader=pred_loader,
                show=True)

In [0]:
predict(load='checkpoints/weights.pth')