# TP: Segmentation par réseau de neurones convolutionnel

L'objectif du présent TP est d'entraîner un réseau de neurone à identifier le centre de particules sur des images obtenues dans des expériences de rhéologie. Le TP est implémenté dans le langage PyTorch, et l'architecture du réseau décrite dans le document PDF attaché. On s'intéressera à plusieurs aspects de l'implémentation:
- le chargement et la visualisation des données d'entraînement (Partie 1)
- l'implémentation du réseau (Partie 2)
- l'entraînement du réseau (Partie 3)

On visualisera enfin les résultats obtenus par le réseau sur la base de test.


### Partie 1: Données

Pour entraîner le réseau de neurones, nous disposons d'une base de données de 320 images, réparties en:
- 220 images d'entraînement
- 32 images de validation
- 64 images de test

Toutes les images ont une taille de 500 pixels par 500 pixels. La vérité terrain consiste en une image contenant un masque de segmentation et en une image contenant des marqueurs du centre des particules à détecter.

Afin d'augmenter artificiellement la taille des données, une pratique courante consiste à appliquer des transformations diverses aux images: distorsion d'histogramme, crops aléatoires, etc. Le code ci-dessous permet de charger des images d'entraînement et de validation, et d'appliquer certaines transformations aux images.

*Question 1.1: Lancer le code et modifier les différentes opérations d'augmentation de données pour visualiser les images sur lesquelles le réseau est entraîné.*

In [1]:
from dataset import *
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

ModuleNotFoundError: No module named 'dataset'

In [None]:
# Paths
root_dir = './dataset'
train_dir = 'train'
val_dir = 'val'
target_dir = 'labels'

# Training set
train_dataset = SegmentationDataset(
    root_dir= root_dir,
    input_dir= train_dir,
    target_dir= target_dir,
    transform=transforms.Compose([
        RandomCrop(200),
        Distort(),
        SetTarget(),
        Normalize(),
        ToTensor()]))

trainloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size = 1,
    shuffle = True,
    num_workers = 0)



In [None]:
for i, sample in enumerate(trainloader):
                
        input_img = sample['input'][0]
        segm_true = sample['target'][0]
        
        fig, ax = plt.subplots(1, 3, figsize=(15, 5), sharex=True, sharey=True)
        ax[0].imshow(np.transpose(input_img, (2, 1, 0)), cmap = 'gray')
        ax[0].set_title('Input image')
        
        ax[1].imshow(np.transpose(segm_true[0]), cmap = 'gray')
        ax[1].set_title('Segmentation mask')
        
        ax[2].imshow(np.transpose(segm_true[1]))
        ax[2].set_title('particle markers')

        for a in ax.ravel():
            a.set_axis_off()
        plt.tight_layout()
        plt.show()
        if(i > 5):
            break

### Partie 2: Implémentation du réseau de neurone

Le réseau de neurone que nous utilisons s'appuie sur une structure de base constituée d'une couche de convolution, d'une non-linéarité de type RELU, et d'un module d'*adaptative batch normalization*, qui consiste à appliquer l'opération suivante au tenseur d'entrée:

$$
y = a BN(x) + bx,
$$

BN étant ici l'opérateur de *batch normalization*, donné par la formule mathématique suivante:

$$
BN(x) = \gamma \frac{x - \mathrm{E}[x]}{ \sqrt{\mathrm{Var}[x] + \epsilon}} + \beta
$$

*Question 2.1:* En utilisant la fonction BatchNorm2D déjà implémentée dans PyTorch, implémenter une classe permettant de définir une couche d'adaptative batch normalization en vous appuyant sur la structure de code ci-dessous.

In [None]:
class AdaptiveBatchNorm2d(nn.Module):

    """
    Adaptative batch normalization implementation
    """
    def __init__(self, num_features, momentum=.1, eps=1e-5, affine=True):

        """
        Class constructor

        An adaptative batch normalization layer takes as input a tensor x and outputs a tensor y.

        The shape of the input tensor is BxCxWxH where B is the number of images in each batch,
        C the number of features maps, W the width of the image and H the height of the image, 
        respectively.

        :param num_features: Number of features map
        :param momentum: Parameter used by the batch normalization layer to compute the statistics
        :param eps: Value added to the denominator for ensuring stability
        :param affine: When set to True, indicates that the batch normalization 
         layer has learnable affine parameters.

        :type num_features: int
        :type momentum: float
        :type eps: float
        :type affine: Boolean

        ..seealso:: Pytorch documentation for nn.BatchNorm2D
        """

        super(AdaptiveBatchNorm2d, self).__init__()
        self.bn = nn.BatchNorm2d(num_features, momentum, eps, affine)
        tens_a = torch.FloatTensor(1, 1, 1, 1)
        tens_b = torch.FloatTensor(1, 1, 1, 1)
        tens_a[0, 0, 0, 0] = 1
        tens_b[0, 0, 0, 0] = 0
        self.a = nn.Parameter(tens_a)
        self.b = nn.Parameter(tens_b)


    def forward(self, x):

        """
        Forward pass in the adaptative batch normalization layer
        
        .. math::
        y = a BN(x) + bx

        where BN is a batch normalization layer, and a and b are learneable parameters.

        :param x: Input tensor, with size BxCxWxH
        :type x: PyTorch tensor

        :return: Transformed tensor
        :rtype: PyTorch tensor
        """

        return self.a * x + self.b * self.bn(x)




*Question 2.2:* En utilisant les modules nn.Conv2d et nn.LeakyReLU déjà implémentés dans PyTorch, coder une classe permettant de définir une couche de convolutions dilatée en vous appuyant sur la structure de code ci-dessous. L'utilisateur devra pouvoir spécifier la profondeur des tenseurs d'entrée et de sortie, ainsi que la taille de la dilatation.

In [None]:
class BaseBlock(nn.Module):

    """
    Convolution module implementation: 2D dilated Convolution 
    followed by a batch normalization layer and a leaky RELU
    activation function.
    """

    def __init__(self, in_channels, out_channels, s):

        """
        Constructor

        :param in_channels: Shape of the input tensor
        :param out_channes: Shape of the output tensor
        :param s: Dilation scale

        :type in_channels: int
        :type out_channels: int
        :type s: int
        """

        super(BaseBlock, self).__init__()

        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, 
          padding = 2**(s-1), dilation = 2**(s-1), bias=True)

        self.conv_abn = AdaptiveBatchNorm2d(out_channels)
        self.LReLU = nn.LeakyReLU(0.2, inplace = True)


    def forward(self, x):

        """
        Forward pass in the convolution module

        :param x: Input tensor, with size BxCxWxH batch canal width height
        :type x: PyTorch tensor

        :return: Transformed tensor
        :rtype: PyTorch tensor
        """

        return self.LReLU(self.conv_abn(self.conv(x)))





*Question 2.3:* A partir des deux classes que vous venez d'implémenter, construisez le réseau de neurone proposé pour la détection des particules. L'utilisateur devra en particulier pouvoir spécifier le nombre de modules de base dans le réseau, ainsi que le nombre de canaux des tenseurs des couches intermédiaires.

In [None]:
class Net(nn.Module):

    """
    Implementation of the neural network architecture
    """

    def __init__(self, d=7, w=24):

        """
        Class constructor

        :param d: number of blocks in the architecture
        :type d: int
        """

        super(Net, self).__init__()
        self.first_layer = BaseBlock(3, w, 1)  #3 input channels
        self.intermediate_layers = nn.ModuleList([BaseBlock(w, w, s) for s in range(2, d)])   
        self.final_layer = nn.Conv2d(w, 2, kernel_size=1, bias=True)  #2 output channels
        


    def forward(self, x):

        """
        Forward pass in the convolutional network

        :param x: Input tensor with size Bx3xWxH
         (B: batch size, 3: number of channels, W: image width, H: image height)
        :type x: PyTorch tensor

        :return: Transformed tensor
        :rtype: PyTorch Tensor
        """

        x = self.first_layer(x)
        for l in self.intermediate_layers:
            x = l(x)
        x = self.final_layer(x)
        return x


    def num_flat_features(self, x):

        """
        Size of the flattened tensor

        :param x: Input tensor with size BxCxWxH
         (B: batch size, C: number of channels, W: image width, H: image height)
        :type x: PyTorch tensor

        :return: CxWxH
        :rtype: int
        """

        size = x.size()[1:] 
        num_features = 1
        for s in size:
            num_features *= s

        return num_features


### Partie 3: entraînement du réseau 

*Question 3.1:* Pour chaque batch, implémenter dans les fonctions "train" et "test" ci-dessous:
1. la propagation directe des images du batch dans le réseau (train et test)
2. la retro-propagation du gradient (train)
3. la mise à jour des poids du réseau (test)

In [None]:
def train(epoch):

    """
    Train the network

    :param epoch: current epoch
    :type epoch: int
    """

    epoch_loss = []
    for i, sample in enumerate(trainloader):
                
        # Load batch
        input_img = sample['input'].to(device)
        segm_true = sample['target'].to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward/Backward pass
        outputs = net(input_img)
        loss = criterion(outputs, segm_true)
        loss.backward()
        epoch_loss.append(loss.item())

        # Weights update
        optimizer.step()

    # Write the training loss in a .dat file
    training_loss = np.mean(np.array(epoch_loss))
    print('Training loss: ' + str(training_loss))
    with open('training_loss.dat', "a") as f:
        f.write('%d %.15f\n' % (epoch + 1, training_loss))

    # Save weights
    torch.save(net.state_dict(), os.path.join(weights_path, str(epoch) + '.pth'))
    



def validate(epoch):

    """
    Test the network on the validation set

    :param epoch: current epoch
    :type epoch: int
    """
    epoch_loss = []
    for i, sample in enumerate(validationloader):
                
        # Load batch
        input_img = sample['input'].to(device)
        segm_true = sample['target'].to(device)

        # Forward/Backward pass
        outputs = net(input_img)
        loss = criterion(outputs, segm_true)
        epoch_loss.append(loss.item())

    validation_loss = np.mean(np.array(epoch_loss))
    print('Validation loss: ' + str(validation_loss))
    with open('val_loss.dat', "a") as f:
        f.write('%d %.15f\n' % (epoch + 1, loss.item()))


*Question 3.2:* Implémenter l'entraînement du réseau et entraîner le réseau avec des valeurs de $d=5$, $d=6$ et $d=7$. Tracer la fonction de perte pour l'entraînement et la validation.

In [None]:
# Training and architecture parameters
d = 7
w = 24
dict_weights = None
batch_size = 16
start_epoch = 0
num_epochs = 80
learning_rate = 0.005
divide = 2.  
each = 20

In [None]:
# Load the training/validation data
train_dataset = SegmentationDataset(
    root_dir= root_dir,
    input_dir= train_dir,
    target_dir= target_dir,
    transform=transforms.Compose([
      RandomCrop(128),
      Distort(),
      SetTarget(),
      Normalize(),
      ToTensor()]))

trainloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size = batch_size,
    shuffle = True,
    num_workers = 0)

# Test set
val_dataset = SegmentationDataset(
    root_dir= root_dir,
    input_dir= val_dir,
    target_dir= target_dir,
    transform=transforms.Compose([
       RandomCrop(128),
       SetTarget(),
       Normalize(),
       ToTensor()]))

validationloader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size = batch_size,
    num_workers = 0)

In [None]:
# Select a device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device: ", device)
    
# Constructs the neural network
net = Net(d, w)
net.to(device)
if(dict_weights != None):
    net.load_state_dict(torch.load(dict_weights))

weights_path = './weights'
loss_path = './output'
if not os.path.exists(weights_path):
    os.makedirs(weights_path)
if not os.path.exists(loss_path):
    os.makedirs(loss_path)

In [None]:
# Optimizer
import torch.optim as optim
criterion = nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr = learning_rate)

In [None]:
for epoch in range(start_epoch, num_epochs):

        print("Epoch: " + str(epoch))
        if (epoch % each == 0):
            for param_group in optimizer.param_groups:
                param_group['lr'] /= divide
                
        train(epoch)
        validate(epoch)
        
train_err = np.loadtxt("training_loss.dat")[:, 1]
val_err = np.loadtxt("val_loss.dat")[:, 1]
plt.figure()
plt.plot(train_err, label='train')
plt.plot(val_err, label='val')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('Error')
plt.show()

In [None]:
train_err = np.loadtxt("training_loss.dat")[100:, 1]
val_err = np.loadtxt("val_loss.dat")[100:, 1]
plt.figure()
plt.plot(train_err, label='train')
plt.plot(val_err, label='val')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('Error')
plt.show()

In [None]:
import numpy as np
train_err = np.loadtxt("training_loss.dat")[100:, 1]
val_err = np.loadtxt("val_loss.dat")[100:, 1]
plt.figure()
plt.plot(train_err, label='train')
plt.plot(val_err, label='val')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('Error')
plt.show()
print(np.argmin(val_err))

### Partie 4: test du réseau 

*Question 4.1:* Le code ci-dessous permet de visualiser les résultats du réseau de neurones sur la base de test. En vous appuyant sur l'erreur de validation, sélectionnez un des fichiers de poids que vous avez obtenus durant l'entraînement et lancez l'évaluation. 

In [None]:
def display(img, output, target):

    img = (255*img.data.cpu().numpy()[0]).astype('int')
    output = (255*output.data.cpu().numpy()[0]).astype('int')
    target = (255*target.data.cpu().numpy()[0]).astype('int')
    
    fig, ax = plt.subplots(1,3, figsize=(10, 5), sharex=True, sharey=True)

    target=np.transpose(target, (2, 1, 0) )
    output=np.transpose(output, (2, 1, 0) )

    # Gaussian mask
    GaussMask=output[...,1]
    from skimage.feature import peak_local_max
    from skimage.filters import gaussian
    GaussMask = gaussian(GaussMask, sigma = 2)
    GaussMask = ( GaussMask / np.max(GaussMask) ) * 255
    coordinates = peak_local_max(GaussMask,min_distance=7,threshold_abs=100)

    #target segmentation
    ax[0].set_title("Segmentation mask")
    ax[0].imshow(output[...,0]>180.)

    #input image
    ax[1].set_title("Input image")
    ax[1].imshow(np.transpose(img, (2, 1, 0)))

    #Network segmentation
    ax[2].set_title("Network segmentation mask")
    ax[2].imshow(np.transpose(img, (2, 1, 0)))  #>210
    ax[2].plot(coordinates[:, 1], coordinates[:, 0], 'b.')

    for a in ax.ravel():
        a.set_axis_off()
    plt.tight_layout()
    plt.show()


In [None]:
# Paths
test_dir = 'test'

# Select a device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Parameters
dict_weights = './weights/199.pth'
batch_size = 1

# Test set
test_dataset = SegmentationDataset(
    root_dir= root_dir,
    input_dir= test_dir,
    target_dir= target_dir,
    transform=transforms.Compose([
       Crop(256),
       SetTarget(),
       Normalize(),
       ToTensor()]))

testloader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size = batch_size,
    num_workers = 0)

# Constructs the neural network
net = Net(d, w)
net.to(device)
net.load_state_dict(torch.load(dict_weights))

for i, sample in enumerate(testloader):
                
    # Load batch
    input_img = sample['input'].to(device)
    segm_true = sample['target'].to(device)

    # Forward/Backward pass
    outputs = net(input_img)

    # Display
    display(input_img, outputs, segm_true)