# Setup

In [4]:
import pandas as pd
import os
import sys

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..', 'dataset')))

data_path = '../dataset/archive/BBBC005_v1_images/BBBC005_v1_images'

def load_data_image(folder=data_path):
    
    # folder = 'dataset/archive/BBBC005_v1_images'
    img_list = os.listdir(folder)
    img_list.remove('.htaccess') # remove this file
    

    def get_num_cells(x):
        #SIMCEPImages_A13_C53_F1_s09_w2.TIF -> C53

        a = x.split('_') # e.g. ['SIMCEPImages', 'A13', 'C53', 'F1', 's09', 'w2.TIF']
        b = a[2] # e.g. C53
        num_cells = int(b[1:]) # e.g. 53
        
        return num_cells
        
    df = pd.DataFrame({'image_id': img_list})
    df['image_id'] = df[df['image_id'] != '.htaccess']
    df['nb_cells'] = df['image_id'].apply(get_num_cells)
    
    return df


# DATALOADER

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import pandas as pd
from sklearn.model_selection import train_test_split
from PIL import Image

class CustomImageDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        """
        Args:
            dataframe (pd.DataFrame): DataFrame avec deux colonnes : 'path' et 'label'.
            transform (callable, optional): Transformations à appliquer aux images.
        """
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Charger l'image
        img_path = self.dataframe.iloc[idx, 0]  # Supposons que la première colonne est 'path'
        label = self.dataframe.iloc[idx, 1]# Supposons que la deuxième colonne est 'label'
        
        image = Image.open(data_path + '/' + img_path)
        # Appliquer les transformations, si elles existent
        if self.transform:
            image = self.transform(image)
        
        label = torch.tensor(label , dtype = torch.float32)
        return image, label

df_data = load_data_image()

# Proportions pour train, valid, test
train_ratio = 0.7
valid_ratio = 0.1
test_ratio = 0.2

# Étape 1 : Diviser entre train et temp (valid+test)
train_df, temp_df = train_test_split(df_data, test_size=(1 - train_ratio), random_state=42, shuffle=True)

# Étape 2 : Diviser temp entre valid et test
valid_df, test_df = train_test_split(temp_df, test_size=(test_ratio / (test_ratio + valid_ratio)), random_state=42)

IMG_HEIGHT = 128
IMG_WIDTH = 128

transform = transforms.Compose([
    transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
    transforms.ToTensor(),
])



train_dataset = CustomImageDataset(dataframe=train_df, transform=transform)
valid_dataset = CustomImageDataset(dataframe=valid_df, transform=transform)
test_dataset = CustomImageDataset(dataframe=test_df, transform=transform)


batch_size = 16 # adapté à la taille de la mémoire
torch.manual_seed(42)
train_dataloader= DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=1, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=True)

print(f'Data Loaded')

Data Loaded


# MODEL

In [13]:
conv_layers=3 # Number of convolutional layers
stride = 1 # Stride for each convolutional layer
padding = 1 # Padding for each convolutional layer
dilation = 1 # Dilation for each convolutional layer 

filters_per_layer=[64,128,256] # Number of filters per convolutional layer
kernel_sizes=[5,10,5] # Kernel size for each convolutional layer
pool_size=2 # Pooling size
dropout_rate_conv=0.2 # Dropout rate for each convolutional layers


dense_layers=1 # Number of dense layers
dense_units=[1] # Number of units per dense layer
dropout_rate_fc = [0.5] # Dropout rate for dense layer

alpha = 0.1 # alpha of the LeakyReLU

In [14]:
import torch
import torch.nn as nn
from torchsummary import summary

class CNN(nn.Module):
    def __init__(self, 
                 nb_conv_layers,
                 stride,
                 padding,
                 dilation, 
                 filters_per_layer, 
                 kernel_sizes,
                 pool_size, 
                 dropout_rate_conv,
                 nb_dense_layers, 
                 dense_units, 
                 dropout_rate_fc,
                 alpha,
                 img_height,
                 img_width):
        
        super(CNN, self).__init__()
        self.nb_conv_layers = nb_conv_layers
        self.nb_dense_layers = nb_dense_layers
        
        self.conv_layers = nn.ModuleList()
        self.pool_layers = nn.ModuleList()
        self.dropout_conv = nn.ModuleList()
        self.bn2 = nn.ModuleList()
        self.dense_layers = nn.ModuleList()
        self.dropout_fc = nn.ModuleList()
        self.relu = nn.LeakyReLU(negative_slope=alpha)

        # Convolutional layers
        in_channels = 1  # Nombre de canaux d'entrée (ex. : images en niveaux de gris)
        for i in range(nb_conv_layers):
            conv_layer = nn.Conv2d(in_channels=in_channels, 
                                   out_channels=filters_per_layer[i], 
                                   kernel_size=kernel_sizes[i],
                                   stride=stride,
                                   padding=padding,
                                   dilation=dilation)
            self.conv_layers.append(conv_layer)
            self.bn2.append(nn.BatchNorm2d(filters_per_layer[i]))
            self.pool_layers.append(nn.MaxPool2d(pool_size))
            self.dropout_conv.append(nn.Dropout(dropout_rate_conv))
            in_channels = filters_per_layer[i]

        # Calculate the flattened feature map size
        self.flattened_size = self._get_flattened_size(img_height, img_width, kernel_sizes, stride, padding, dilation, pool_size)

        # Dense layers
        for i in range(nb_dense_layers):
            if i == 0:
                dense_layer = nn.Linear(self.flattened_size, dense_units[i])
            else:
                dense_layer = nn.Linear(dense_units[i - 1], dense_units[i])
            self.dense_layers.append(dense_layer)
            self.dropout_fc.append(nn.Dropout(dropout_rate_fc[i]))

    def _get_flattened_size(self, height, width, kernel_sizes, stride, padding, dilation, pool_size):
        for i in range(self.nb_conv_layers):
            height = (height + 2 * padding - dilation * (kernel_sizes[i] - 1) - 1) // stride + 1
            width = (width + 2 * padding - dilation * (kernel_sizes[i] - 1) - 1) // stride + 1
            if i % 2 == 1:
                height //= pool_size
                width //= pool_size
        return height * width * filters_per_layer[-1]

    def forward(self, x):
        # Convolutional layers
        for i in range(self.nb_conv_layers):
            x = self.conv_layers[i](x)
            x = self.bn2[i](x)
            x = self.relu(x)
            if i % 2 == 1:
                x = self.pool_layers[i](x)
            x = self.dropout_conv[i](x)
            
        # Flatten
        x = x.view(x.size(0), -1)

        # Dense layers
        for i in range(self.nb_dense_layers - 1):
            x = self.dense_layers[i](x)
            x = self.relu(x)
            x = self.dropout_fc[i](x)
        x = self.dense_layers[-1](x)
        return x

model = CNN(nb_conv_layers = conv_layers,
            stride = stride,
            padding = padding,
            dilation = dilation,
            filters_per_layer = filters_per_layer,
            kernel_sizes = kernel_sizes,
            pool_size = pool_size,
            dropout_rate_conv = dropout_rate_conv,
            nb_dense_layers = dense_layers,
            dense_units = dense_units,
            dropout_rate_fc = dropout_rate_fc,
            alpha = alpha,
            img_height = IMG_HEIGHT,
            img_width = IMG_WIDTH)

print(f'Model instantiated')

model = model.to("cpu")
summary(model, (1,IMG_HEIGHT,IMG_WIDTH))

Model instantiated
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 126, 126]           1,664
       BatchNorm2d-2         [-1, 64, 126, 126]             128
         LeakyReLU-3         [-1, 64, 126, 126]               0
           Dropout-4         [-1, 64, 126, 126]               0
            Conv2d-5        [-1, 128, 119, 119]         819,328
       BatchNorm2d-6        [-1, 128, 119, 119]             256
         LeakyReLU-7        [-1, 128, 119, 119]               0
         MaxPool2d-8          [-1, 128, 59, 59]               0
           Dropout-9          [-1, 128, 59, 59]               0
           Conv2d-10          [-1, 256, 57, 57]         819,456
      BatchNorm2d-11          [-1, 256, 57, 57]             512
        LeakyReLU-12          [-1, 256, 57, 57]               0
          Dropout-13          [-1, 256, 57, 57]               0
           Linear-14

# TRAINING

In [15]:
device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
model.to(device)

print("device : ",device)

device :  mps


In [16]:
import torch.optim as optim
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.1, verbose=True)




In [17]:
import numpy as np

n_epochs = 70  # Number of epochs to train the model

def training(n_epochs, train_dataloader, valid_dataloader, model, criterion, optimizer, scheduler=None):
    train_losses, valid_losses = [], []
    valid_loss_min = np.inf  # Initialize minimum validation loss as infinity
    best_model_state = None  # Variable to store the best model state

    for epoch in range(n_epochs):
        train_loss, valid_loss = 0.0, 0.0  # Reset running losses

        # --- Training Phase ---
        model.train()  # Set model to training mode
        for data, label in train_dataloader:
            # Move data and labels to the correct device
            data = data.to(device)
            label = label.to(device)
            label = label.unsqueeze(1)  # Ensure label shape matches model output

            # Zero gradients
            optimizer.zero_grad()
            
            # Forward pass
            output = model(data)
            loss = criterion(output, label)  # Compute loss
            
            # Backward pass
            loss.backward()
            
            # Gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            
            # Optimizer step
            optimizer.step()
            
            # Accumulate training loss
            train_loss += loss.item() * data.size(0)

        # --- Validation Phase ---
        model.eval()  # Set model to evaluation mode
        with torch.no_grad():  # Disable gradient computation for validation
            for data, label in valid_dataloader:
                data = data.to(device)
                label = label.to(device)
                label = label.unsqueeze(1)

                # Forward pass
                output = model(data)
                loss = criterion(output, label)
                
                # Accumulate validation loss
                valid_loss += loss.item() * data.size(0)

        # Calculate average losses for the epoch
        train_loss /= len(train_dataloader.dataset)
        valid_loss /= len(valid_dataloader.dataset)
        train_losses.append(train_loss)
        valid_losses.append(valid_loss)

        # Print epoch statistics
        current_lr = optimizer.param_groups[0]['lr']
        print(f"Epoch: {epoch+1}, LR: {current_lr:.6e}, "
              f"Training Loss: {train_loss:.6f}, Validation Loss: {valid_loss:.6f}")

        # Check if validation loss improved
        if valid_loss <= valid_loss_min:
            print(f"Validation loss decreased ({valid_loss_min:.6f} --> {valid_loss:.6f}). Saving model ...")
            best_model_state = model.state_dict()
            torch.save(best_model_state, '../model/ccn_magique.pt')
            valid_loss_min = valid_loss

        # Step the scheduler (if any)
        if scheduler:
            scheduler.step(valid_loss)

    return train_losses, valid_losses, best_model_state

In [18]:
print(f'Start Training')



train_losses, valid_losses , best_model_state = training(n_epochs, train_dataloader, valid_dataloader, model, criterion, optimizer,scheduler)

Start Training


KeyboardInterrupt: 

# TEST

In [19]:
def evaluation(model, test_dataloader, criterion):

    # initialize lists to monitor test loss and accuracy
    test_loss = 0.0
    error = 0.0

    model.eval() # prep model for evaluation
    for data, label in test_dataloader:
        data = data.to(device=device, dtype=torch.float32)
        label = label.to(device=device, dtype=torch.float32)
        
        # unsqueeze the label
        label = label.unsqueeze(0)
        
        with torch.no_grad():
            output = model(data) # forward pass: compute predicted outputs by passing inputs to the model
            loss = criterion(output,label)
        error += torch.abs(output - label)
        test_loss += loss.item()*data.size(0)

    # calculate and print avg test loss
    test_loss = test_loss/test_dataset.__len__()
    mean_error = error/test_dataset.__len__()
    print('test Loss: {:.6f}\n'.format(test_loss))
    print('Mean Error of Prediction: {:.6f}'.format(mean_error.item()))

In [21]:
best_model_state = torch.load('../model/ccn_model_magique.pt' , map_location=device)

model.load_state_dict(best_model_state)
evaluation(model,test_dataloader,criterion)

  best_model_state = torch.load('../model/ccn_model_magique.pt' , map_location=device)


KeyboardInterrupt: 

In [None]:
def l_plot(model, test_dataloader):

    # initialize lists to monitor test loss and accuracy
    y_prediction = []
    x_label = []
    model.eval() # prep model for evaluation
    for data, label in test_dataloader:
        data = data.to(device=device, dtype=torch.float32)
        label = label.to(device=device, dtype=torch.float32)
        
        # unsqueeze the label
        label = label.unsqueeze(0)
        
        with torch.no_grad():
            output = model(data) # forward pass: compute predicted outputs by passing inputs to the model
            y_prediction.append(output.cpu().numpy())
            x_label.append(label.cpu().numpy())

    return y_prediction , x_label

In [None]:
y_prediction , x_label = l_plot(model, test_dataloader)

  best_state = torch.load("../model/cnn_model_magique.pt")


FileNotFoundError: [Errno 2] No such file or directory: '../model/cnn_model_magique.pt'

In [None]:
y_prediction = np.array(y_prediction)
x_label = np.array(x_label)


y_prediction = y_prediction.ravel()
x_label = x_label.ravel()

import matplotlib.pyplot as plt


x = np.linspace(0 , 120 , 10)
y =x

plt.scatter(x_label , y_prediction)
plt.plot(x,y,c='r')

In [None]:
model.eval() # prep model for evaluation
i = 0
for data, label in test_dataloader:
    data = data.to(device=device, dtype=torch.float32)
    label = label.to(device=device, dtype=torch.float32)
    
    # unsqueeze the label
    label = label.unsqueeze(0)
    
    with torch.no_grad():
        output = model(data) # forward pass: compute predicted outputs by passing inputs to the model
        loss = criterion(output,label)
        
        if i % 100 == 0 :
            print(f'True value {label.item()} and the value predict {output.item()}')
        i+=1

In [None]:
import torch
import torch.onnx

image , label = next(iter(train_dataloader))

example_input = image[0].unsqueeze(0)
model = model.cpu()
model.eval()

# Chemin d'exportation pour le modèle ONNX
onnx_path = "/kaggle/working/cobalt.onnx"

# Exporter le modèle en ONNX
torch.onnx.export(
    model,                    # Le modèle PyTorch
    example_input,            # Exemple d'entrée
    onnx_path,                # Destination du fichier ONNX
    export_params=True,       # Inclure les poids dans le fichier ONNX
    opset_version=11,         # Version ONNX (généralement >= 11)
    do_constant_folding=True, # Activer l'optimisation des constantes
    input_names=['input'],    # Nom des entrées du modèle
    output_names=['output'],  # Nom des sorties du modèle
    dynamic_axes={            # Dimensions dynamiques pour gérer plusieurs tailles d'entrée
        'input': {0: 'batch_size'}, 
        'output': {0: 'batch_size'}
    }
)

print(f"Modèle exporté avec succès vers {onnx_path}")