In [1]:
import tifffile
from skimage import io
from PIL import Image
import cv2
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import pandas as pd

import torch
from torch import nn
import torch.nn.functional as F

from glob import glob
import os.path as osp
import numpy as np
import random
from tqdm import tqdm

from sklearn.model_selection import train_test_split

In [2]:
BASE_PATH='input/breast-cancer-cell-segmentation/'
# Concat path
IMAGES_PATH = osp.join(BASE_PATH, 'Images')
LABELS_PATH = osp.join(BASE_PATH, 'Masks')

In [3]:
# Get all tif files
imgs_paths = glob(osp.join(IMAGES_PATH,"*.tif"))

In [15]:
# Get all mask label
masks_paths = [osp.join(LABELS_PATH, i.rsplit("/",1)[-1].split("_ccd")[0]+".TIF") for i in imgs_paths]

In [5]:
# Couple img/mask
img_mask_tuples = list(zip(imgs_paths, masks_paths))

# Random
random.shuffle(img_mask_tuples)

# Split training set 75/25
train_tuples, test_tuples = train_test_split(img_mask_tuples)

In [17]:
def get_tiff_image(path, normalized=True, resize=(512, 512)):
    """
    Read, preprocess, and optionally normalize a TIFF image.

    Parameters:
    - path (str): The file path to the TIFF image that needs to be loaded and processed.
    - normalized (bool, optional): A flag indicating whether the image should be normalized or not. Default is True.
    - resize (tuple, optional): A tuple specifying the target dimensions (width, height) for resizing the image. Default is (512, 512).

    Returns:
    - If normalized=True, the function returns the preprocessed image with pixel values scaled between 0 and 1 (normalized).
    - If normalized=False, the function returns the preprocessed image without normalization, retaining its original pixel values.
    """
    # Step 1: Read the TIFF image from the specified path
    image = io.imread(path)
    
    # Step 2: Convert the image from BGR to RGB format
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Step 3: Resize the image to the specified dimensions
    image = cv2.resize(image, resize)
    
    # Step 4: Normalize the image if 'normalized' is True
    if normalized:
        return image / 255.0
    
    # Step 5: If 'normalized' is False, return the original image
    return image


In [19]:
class BCDataset(torch.utils.data.Dataset):
    """
    Custom PyTorch dataset class for binary classification of images and masks.

    Parameters:
    - img_mask_tuples (list or tuple): A list of tuples, where each tuple contains the file paths to an image and its corresponding mask.

    Methods:
    - __init__(self, img_mask_tuples): The class constructor, initializes the dataset with the provided image-mask tuples.
    - __len__(self): Returns the total number of image-mask tuples in the dataset.
    - __getitem__(self, idx): Fetches and preprocesses the image-mask pair at the given index.
    """
    def __init__(self, img_mask_tuples):
        """
        Constructor method to initialize the dataset with image-mask tuples.

        Parameters:
        - img_mask_tuples (list or tuple): A list of tuples, where each tuple contains the file paths to an image and its corresponding mask.
        """
        self.img_mask_tuples = img_mask_tuples
        
    def __len__(self):
        """
        Returns the total number of image-mask tuples in the dataset.

        Returns:
        - int: The number of image-mask tuples in the dataset.
        """
        return len(self.img_mask_tuples)
    
    def __getitem__(self, idx):
        """
        Fetches and preprocesses the image-mask pair at the given index.

        Parameters:
        - idx (int): Index of the image-mask pair to retrieve.

        Returns:
        - torch.Tensor, torch.Tensor: A tuple containing the preprocessed image and mask as Torch Tensors.
        """

        # Step 1: Fetch the file paths of the image and mask at the given index
        img_path, mask_path = self.img_mask_tuples[idx]
        
        # Step 2: Load and preprocess the image using the 'get_tiff_image' function
        image = get_tiff_image(img_path)
        
        # Step 3: Load and preprocess the mask using the 'get_tiff_image' function with normalization disabled
        mask = get_tiff_image(mask_path, normalized=False)
        
        # Step 4: Convert the mask to a binary mask by thresholding (values > 0 become 1, otherwise 0)
        mask[mask > 0] = 1
        
        return image, mask


In [8]:
train_dataset = BCDataset(train_tuples)
test_dataset = BCDataset(test_tuples)

In [9]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=8, shuffle=False)

# UNET

In [23]:
import torch.nn.functional as F
import torch.nn as nn

# Classe DoubleConv
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()

        # Si mid_channels n'est pas spécifié, le nombre de canaux intermédiaires est égal au nombre de canaux de sortie.
        if not mid_channels:
            mid_channels = out_channels

        # Définition de la séquence de couches pour DoubleConv
        # La séquence comprend deux convolutions suivies de batch normalization (BN) et de ReLU.
        # La première convolution réduit le nombre de canaux à 'mid_channels'.
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            # La deuxième convolution ramène le nombre de canaux à 'out_channels' (nombre final de canaux de sortie).
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        # Passage avant de la couche DoubleConv
        # L'entrée 'x' passe à travers les deux convolutions suivies de BN et ReLU, puis la sortie est renvoyée.
        return self.double_conv(x)


# Classe Down
# Le downscaling ou réduction d'échelle est un processus de 
# diminution de la résolution spatiale d'une image ou d'une représentation numérique.
# Cela signifie qu'une image de départ, qui peut être de grande taille, est réduite 
# en taille en diminuant le nombre de pixels qu'elle contient.
# Le résultat est une image plus petite avec une résolution spatiale inférieure, ce 
# qui permet de réduire la complexité du traitement et de conserver uniquement les informations les plus importantes de l'image.
class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()

        # Utilisation de MaxPool suivi de DoubleConv pour le downscaling
        # MaxPool2d effectue un échantillonnage vers le bas en utilisant une fenêtre de 2x2 avec un décalage de 2 pixels.
        # Il réduit la taille spatiale de l'entrée de moitié.
        # Ensuite, l'objet DoubleConv est utilisé pour appliquer deux convolutions pour extraire des fonctionnalités.
        # DoubleConv est défini dans une autre classe et comprend deux convolutions, suivies de batch normalization et ReLU.
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        # Passage avant de la couche Down
        # L'entrée 'x' est soumise à la séquence de MaxPool et DoubleConv, puis la sortie est renvoyée.
        return self.maxpool_conv(x)
        
# Classe Up
# L'upscaling ou augmentation d'échelle est le processus inverse du downscaling.
# Il consiste à augmenter la résolution spatiale d'une image ou d'une représentation numérique.
# Cela est généralement fait en utilisant des techniques d'interpolation pour estimer les valeurs
# des nouveaux pixels ajoutés à l'image agrandie. L'upscaling est souvent utilisé pour améliorer
# la résolution d'une image, mais il est important de noter que cela ne restaure pas les détails perdus lors du downscaling.
class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # Si bilinear=True, utilise les convolutions normales pour réduire le nombre de canaux
        if bilinear:
            # Upsample avec le mode bilinéaire pour augmenter la taille spatiale de l'entrée de 2 fois
            # et align_corners=True pour assurer l'alignement des coins pendant l'interpolation.
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

            # DoubleConv avec 'in_channels' en entrée, 'out_channels' en sortie et 'in_channels // 2' pour le nombre de canaux intermédiaires.
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            # Si bilinear=False, utilise la transposition de convolution pour augmenter la taille spatiale de l'entrée de 2 fois.
            self.up = nn.ConvTranspose2d(in_channels , in_channels // 2, kernel_size=2, stride=2)

            # DoubleConv avec 'in_channels' en entrée et 'out_channels' en sortie.
            self.conv = DoubleConv(in_channels, out_channels)


    def forward(self, x1, x2):
        # Upsample de x1 à la taille de x2 et concaténation des deux
        x1 = self.up(x1)

        # L'entrée est de la forme CHW, calcul des différences de taille spatiale entre x2 et x1.
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        # Application de padding pour que les deux tensors aient les mêmes dimensions.
        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])

        # Concaténation le long de la dimension des canaux (dim=1) pour fusionner les informations des deux entrées.
        x = torch.cat([x2, x1], dim=1)

        # Passage avant de la couche Up, c'est-à-dire la couche de convolution double pour combiner les caractéristiques.
        return self.conv(x)


# Classe UNet
class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=True):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        # Définition des couches du modèle UNet
        # L'architecture UNet comprend une partie de descente (downsampling) et une partie d'ascension (upsampling).

        # Couche d'entrée
        self.inc = DoubleConv(n_channels, 64)

        # Couches de descente (downsampling)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)

        # Couches d'ascension (upsampling)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)

        # Couche de sortie
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        # Passage avant du modèle UNet en utilisant les couches définies

        # Couche d'entrée
        x1 = self.inc(x)

        # Couches de descente (downsampling)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)

        # Couches d'ascension (upsampling)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)

        # Couche de sortie
        logits = self.outc(x)
        return logits

In [11]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = UNet(n_channels=3, n_classes=1)
model.to(device);

In [12]:
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.02, momentum=0.5)

def dice_loss(inputs, target):
    inputs = torch.sigmoid(inputs)
    smooth = 1.
    iflat = inputs.contiguous().view(-1)
    tflat = target.contiguous().view(-1)
    intersection = (iflat * tflat).sum()
    loss = 1 - ((2. * intersection + smooth) / (iflat.sum() + tflat.sum() + smooth))
    return loss

# Training

In [13]:
for epoch in tqdm(range(100)):
    for images, labels in iter(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        images = images.permute(0,3,2,1).float()
        labels = labels.permute(0,3,2,1).float()
        labels = labels.sum(1,keepdim=True).bool().float()

        output = model(images)
        
        # process output by a threshold
        #output = torch.sigmoid(output)
        #output[output >= 0.5] = 1
        #output[output < 0.5] = 0

        loss = criterion(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

  2%|███▎                                                                                                                                                                  | 2/100 [2:27:56<120:49:20, 4438.37s/it]


KeyboardInterrupt: 

In [None]:
# process output by a threshold
output = torch.sigmoid(output)
output[output >= 0.5] = 1
output[output < 0.5] = 0

In [None]:

batch_index=0
print("image")
plt.imshow(images.cpu().detach().numpy()[batch_index,0])
plt.show()
print("output")
plt.imshow(output.cpu().detach().numpy()[batch_index,0])
plt.show()
print("label")
plt.imshow(labels.cpu().detach().numpy()[batch_index,0])
plt.show()