# Semantic Segmentation for Car Driving Data

## Importing Data

In [1]:
import torch as tch
import numpy as np
import plotly.graph_objects as go
from pathlib import Path

In [3]:
# Loading tensor

# "%pwd" inside a string literal is NOT expanded by IPython, so the original
# expression Path("%pwd").resolve().parent evaluated to the current working
# directory itself. Path.cwd() states that explicitly.
notebook_dir = Path.cwd().resolve()
tensor_name = "training_tensor.pt"

# First candidate: the location the original expression actually pointed at.
# Second candidate: the "data" folder next to the working directory's parent,
# presumably what the `.parent` in the original was meant to reach -- TODO confirm.
candidate_paths = [
    notebook_dir / "data" / tensor_name,
    notebook_dir.parent / "data" / tensor_name,
]

# Fall back to the first candidate when nothing is found, so the resulting
# FileNotFoundError reports the same path as before.
path_tensor = next((p for p in candidate_paths if p.is_file()), candidate_paths[0])

training_tensor = tch.load(path_tensor)

FileNotFoundError: [Errno 2] No such file or directory: '/home/jupyter/sem_seg_cars_driving/cnn/data/training_dataset.pt'

In [2]:
from torch.utils.data import Dataset
class ImageMaskDataset(Dataset):
    """Dataset yielding ``(image, mask)`` pairs taken from a single tensor.

    The wrapped tensor is read as ``data[0, i]`` for the i-th image and
    ``data[1, i]`` for the mask that goes with it.
    """

    def __init__(self, data_tensor):
        """Keep a reference to ``data_tensor`` (no copy is made)."""
        self.data = data_tensor

    def __len__(self):
        """Return the number of samples, i.e. the size of the second axis."""
        n_samples = self.data.shape[1]
        return n_samples

    def __getitem__(self, index):
        """Return the image and its matching mask stored at ``index``."""
        # Entry 0 of the first axis holds the images, entry 1 the masks.
        return self.data[0, index], self.data[1, index]

In [None]:
# Split the samples (second axis of the tensor) into two datasets:
# the first 160 are used for training, the remaining ones for testing.
training_data = ImageMaskDataset(training_tensor[:, :160])
test_data = ImageMaskDataset(training_tensor[:, 160:])

In [None]:
# # Shows if everything is fine in test_data

# ind = np.random.randint(0, 40, 2)
# fig = go.Figure()
# c=0
# for i in ind :
#     # Transpose back the images to the good dimensions
#     image, mask = test_data[i]
#     fig.add_trace(go.Image(z=np.concatenate((image.permute(1, 2, 0)*255, mask.permute(1, 2, 0)*255),1), y0 = c*h_size))
# c+=1
# fig.show()

Because the model will be a custom U-Net, each batch will contain a single image, and we will not work on patches, contrary to what is described in the U-Net article.

In [None]:
# Build the DataLoader: one image per batch, reshuffled at every epoch
batch_size = 1
data_loader = tch.utils.data.DataLoader(
    dataset=training_data,
    batch_size=batch_size,
    shuffle=True,
)

# Show the length of the DataLoader, i.e. the number of batches it yields
print(len(data_loader))

## Building the CNN

### Creating the CNN

Our model is a custom U-Net. We removed the deepest level of the original architecture, i.e. one max-pooling layer, one double convolution on the way down, one upsampling, one stacking (concatenation) and one double convolution on the way up. We slightly adapted the hyperparameters where needed so that everything fits back together.

We hope that this speeds up the computation without losing too much accuracy. Since we saved some layers, we may increase the number of channels throughout the model. We will look at that in the fine-tuning part.

In [None]:
import torch.nn as nn

class DoubleConv(nn.Module):
    """Two successive (3x3 convolution -> batch norm -> ReLU) stages.

    With ``kernel_size=3`` and ``padding=1`` each convolution keeps the
    height and width of its input; only the channel count changes, from
    ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # The first stage maps in_channels -> out_channels, the second one
        # keeps out_channels; both share the same layer recipe.
        for stage_in in (in_channels, out_channels):
            stages += [
                nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        return self.conv(x)

class UNet(nn.Module):
    """Reduced U-Net with three pooling levels and skip connections.

    The encoder applies a double convolution followed by a 2x2 max-pooling
    three times, then a fourth double convolution at the bottom. The decoder
    upsamples, concatenates the matching encoder feature map along the
    channel axis and applies a double convolution, three times. A final 1x1
    convolution maps the 64 remaining channels to ``out_channels``.

    The output has the same height and width as the input. Inputs whose
    height or width is not a multiple of 8 are supported: each upsampling
    targets the exact size of the skip tensor it is concatenated with.

    Args:
        in_channels: number of channels of the input images.
        out_channels: number of channels of the output map.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Encoder: every DoubleConv keeps the spatial size.
        self.dconv_down1 = DoubleConv(in_channels, 64)
        self.dconv_down2 = DoubleConv(64, 128)
        self.dconv_down3 = DoubleConv(128, 256)
        self.dconv_down4 = DoubleConv(256, 512)

        # Stride defaults to kernel_size, so X_out = floor(X_in / 2):
        # an odd size loses its last row/column here.
        self.maxpool = nn.MaxPool2d(kernel_size=2)
        # Parameter-free bilinear upsampling (nothing to learn, unlike a
        # transposed convolution). forward() reuses its mode/align_corners
        # settings but targets the skip tensor's size instead of a fixed x2.
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Decoder: input channels = upsampled channels + skip channels.
        self.dconv_up3 = DoubleConv(256 + 512, 256)
        self.dconv_up2 = DoubleConv(128 + 256, 128)
        self.dconv_up1 = DoubleConv(128 + 64, 64)

        # 1x1 convolution: changes the channel count only.
        self.conv_last = nn.Conv2d(64, out_channels, 1)

    def _upsample_like(self, x, skip):
        """Upsample ``x`` to the height and width of ``skip``.

        When ``skip`` is exactly twice as large as ``x`` this matches the
        fixed x2 upsampling; otherwise it restores the row/column dropped by
        the max-pooling of an odd-sized map, so the concatenation that
        follows cannot fail on a size mismatch.
        """
        return nn.functional.interpolate(
            x,
            size=skip.shape[-2:],
            mode=self.upsample.mode,
            align_corners=self.upsample.align_corners,
        )

    def forward(self, x):
        """Return the output map for a batch ``x`` of shape (N, C, H, W)."""
        # Encoder: keep each pre-pooling feature map for the skip connections.
        conv1 = self.dconv_down1(x)
        x = self.maxpool(conv1)

        conv2 = self.dconv_down2(x)
        x = self.maxpool(conv2)

        conv3 = self.dconv_down3(x)
        x = self.maxpool(conv3)

        # Bottom of the "U".
        x = self.dconv_down4(x)

        # Decoder: upsample, stack with the matching encoder map, convolve.
        x = self._upsample_like(x, conv3)
        x = tch.cat([x, conv3], dim=1)
        x = self.dconv_up3(x)

        x = self._upsample_like(x, conv2)
        x = tch.cat([x, conv2], dim=1)
        x = self.dconv_up2(x)

        x = self._upsample_like(x, conv1)
        x = tch.cat([x, conv1], dim=1)
        x = self.dconv_up1(x)

        out = self.conv_last(x)
        return out
    

# Instantiate the network with 3 input channels and 3 output channels
# (presumably RGB images and 3 segmentation classes -- TODO confirm).
unet_model = UNet(in_channels = 3, out_channels = 3)

We had a problem in the concatenation steps: the sizes did not match. After investigating, we found that at one step the height became odd, so the division by 2 made us lose a row of pixels on the height, which then created an offset after the multiplication by 2. Instead of cropping twice inside the model, at each stacking step with an issue, we found it better to crop 2 rows of pixels from the height of each image when generating the dataset, so that the height is divisible by 2 more times (more factors of 2 in its prime factorization).

This way, we could also switch our custom U-Net model back to the usual one if needed, without changing the dataset any further.

In [None]:
# # Testing the architecture
# unet_model.eval()

# image, mask = test_data[0]

# with tch.no_grad():
#     predictions = unet_model(image.unsqueeze(0))
# fig = go.Figure()
# fig.add_trace(go.Image(z=np.concatenate((image.squeeze(0).permute(1, 2, 0)*255, (predictions.squeeze(0)).permute(1, 2, 0)*255),1)))

### Training

In [None]:
# Training setup: number of epochs, loss function (criterion) and optimizer
n_epoch = 10
criterion = nn.CrossEntropyLoss()
optimizer = tch.optim.Adam(unet_model.parameters(), lr=1e-3)

# Put the model in training mode before iterating over the data
unet_model.train()

for epoch in range(n_epoch):
    # Sum of the per-batch losses seen during this epoch
    running_loss = 0.0

    for image, mask in data_loader:
        # Gradients accumulate by default: clear them before each step
        optimizer.zero_grad()

        # Forward pass, then loss between the prediction and the mask
        pred = unet_model(image)
        loss = criterion(pred, mask)

        # Backward pass, then weight update
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean loss over the batches of this epoch
    print(f"Epoch [{epoch+1}/{n_epoch}], Loss: {running_loss/len(data_loader)}")


In [None]:
# PATH = Path("%pwd").resolve().parent / 'unet_model.pt'

In [None]:
# Model saving
# tch.save(unet_model.state_dict(), PATH)

In [None]:
# # Model Loading
# unet_model = UNet(in_channels = 3, out_channels = 3)
# unet_model.load_state_dict(tch.load(PATH))

In [None]:
# unet_model.eval()

# image, mask = test_data[1]

# with tch.no_grad():
#     predictions = unet_model(image.unsqueeze(0))
# fig = go.Figure()
# fig.add_trace(go.Image(z=np.concatenate((image.squeeze(0).permute(1, 2, 0)*255, (predictions.squeeze(0)).permute(1, 2, 0)*255),1)))

### Evaluation

In [None]:
# Display the number of samples in the test dataset
len(test_data)

In [None]:
# Evaluation on the test set: count correctly classified pixels
unet_model.eval()  # use the BatchNorm running statistics, not per-batch ones

total_correct = 0
total_samples = 0
with tch.no_grad():  # no gradients needed for evaluation
    for i in range(len(test_data)):
        # The dataset returns one (image, mask) pair per integer index
        image, mask = test_data[i]

        # The model expects a batch axis: (C, H, W) -> (1, C, H, W)
        outputs = unet_model(image.unsqueeze(0))

        # Most likely class per pixel: (1, C, H, W) -> (H, W)
        predicted = outputs.argmax(dim=1).squeeze(0)

        # NOTE(review): assumes a mask with one more axis than `predicted`
        # is channel-first with one channel per class, so its argmax gives
        # the class index; otherwise the mask is taken to already hold
        # class indices -- TODO confirm against the dataset generation.
        target = mask.argmax(dim=0) if mask.dim() == predicted.dim() + 1 else mask

        total_correct += (predicted == target).sum().item()
        # Count every compared pixel so that the accuracy is a true ratio
        total_samples += target.numel()

In [None]:
# Compute the accuracy: ratio of the correct-prediction count to the
# sample count accumulated by the evaluation loop
accuracy = total_correct / total_samples
print(f"Accuracy on test set: {accuracy}")