### Mount Google Drive

Run the following cell to mount your Google Drive. You will be prompted to authorize this notebook to access your Drive files. Follow the instructions in the output to complete the authorization.

In [2]:
from google.colab import drive
drive.mount('/content/drive')

KeyboardInterrupt: 

### Explore your Google Drive

After successfully mounting, you can explore your Drive files by running shell commands like `!ls /content/drive/MyDrive/`. This will list the contents of your 'My Drive' folder. Once you know the path to your data file, please let me know its full path and type (e.g., CSV, Excel, Parquet) so I can help you load it.

In [14]:
# Example: List contents of your My Drive folder
!ls /content/drive/MyDrive/Project\ CElegans

worm_trajectories.zip


In [15]:
zip_path = "/content/drive/MyDrive/Project CElegans/worm_trajectories.zip"

In [16]:
print("Décompression en cours...")
!unzip -q "$zip_path" -d "/content/worm"
print("Terminé ! Vos données sont prêtes dans /content/worm")

Décompression en cours...
replace /content/worm/worm_trajectories/.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/__MACOSX/worm_trajectories/._.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/worm_trajectories/TERBINAFINE- (control)/.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/__MACOSX/worm_trajectories/TERBINAFINE- (control)/._.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/worm_trajectories/TERBINAFINE+/.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/__MACOSX/worm_trajectories/TERBINAFINE+/._.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/worm_trajectories/TERBINAFINE- (control)/20250311_piworm18_4/20250311_piworm18_4_seg_49.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /content/worm/worm_trajectories/TERBINAFINE- (control)/20250311_piworm18_4/20250311_piworm18_4_seg_61.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
replace /cont

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import os
import random
import numpy as np
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
from collections import defaultdict

# ==========================================
# 1. CONFIGURATION
# ==========================================
# CHOISISSEZ VOTRE ARCHITECTURE ICI : "resnet", "efficientnet", ou "densenet"
MODEL_NAME = "densenet"

# Chemin par défaut si les données sont dans le même dossier que ce script
DEFAULT_DATA_DIR = Path("worm_trajectories")
# Chemin Colab/système de fichiers temporaire si les données sont décompressées là
COLAB_DATA_DIR = Path("/worm_trajectories")

BATCH_SIZE = 32
LEARNING_RATE = 0.001
NUM_EPOCHS = 15
IMG_SIZE = 224

# Détection automatique du matériel
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
print(f"Entraînement sur : {device} avec le modèle {MODEL_NAME}")


def set_data_dir():
    """Vérifie le chemin de DATA_DIR et s'adapte à l'environnement Colab."""
    if DEFAULT_DATA_DIR.exists():
        print(f"Dossier de données trouvé : {DEFAULT_DATA_DIR}")
        return DEFAULT_DATA_DIR
    elif COLAB_DATA_DIR.exists():
        print(f"Dossier de données Colab trouvé : {COLAB_DATA_DIR}")
        return COLAB_DATA_DIR
    else:
        raise FileNotFoundError(
            f"ERREUR FATALE: Le dossier de données n'a été trouvé ni à '{DEFAULT_DATA_DIR}' (Local) ni à '{COLAB_DATA_DIR}' (Colab)."
        )

DATA_DIR = set_data_dir()

# ==========================================
# 2. PRÉPARATION DES DONNÉES
# ==========================================

class WormDataset(Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform
        self.classes = sorted([d.name for d in DATA_DIR.iterdir() if d.is_dir()])
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        print(f"Classes détectées : {self.class_to_idx}")

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        label_str = img_path.parent.parent.name
        if label_str not in self.class_to_idx:
            raise ValueError(f"Label '{label_str}' non trouvé dans le mapping de classe: {self.class_to_idx}")
        label = self.class_to_idx[label_str]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

def split_dataset_by_worm(root_dir, val_split=0.2):
    """Sépare les données par ver pour éviter le Data Leakage."""
    train_files = []
    val_files = []

    for class_dir in root_dir.iterdir():
        if not class_dir.is_dir(): continue
        worm_dirs = [d for d in class_dir.iterdir() if d.is_dir()]

        if not worm_dirs:
            continue

        random.shuffle(worm_dirs)
        split_idx = int(len(worm_dirs) * (1 - val_split))
        train_worms = worm_dirs[:split_idx]
        val_worms = worm_dirs[split_idx:]

        for w in train_worms:
            train_files.extend(list(w.glob("*.png")))
        for w in val_worms:
            val_files.extend(list(w.glob("*.png")))

    print(f"Split terminé : {len(train_files)} images d'entraînement, {len(val_files)} images de validation.")
    if len(train_files) == 0 or len(val_files) == 0:
         raise ValueError("ERREUR: Le split n'a produit aucune image.")
    return train_files, val_files

# ==========================================
# 3. INITIALISATION DU MODÈLE
# ==========================================

def initialize_model(model_name, num_classes, use_pretrained=True):
    model = None

    if model_name == "resnet":
        model = models.resnet18(pretrained=use_pretrained)
        num_ftrs = model.fc.in_features
        model.fc = nn.Linear(num_ftrs, num_classes)
    elif model_name == "efficientnet":
        try:
            model = models.efficientnet_b0(pretrained=use_pretrained)
            num_ftrs = model.classifier[1].in_features
            model.classifier[1] = nn.Linear(num_ftrs, num_classes)
        except:
             print("Fallback to ResNet due to torchvision version.")
             return initialize_model("resnet", num_classes, use_pretrained)
    elif model_name == "densenet":
        model = models.densenet121(pretrained=use_pretrained)
        num_ftrs = model.classifier.in_features
        model.classifier = nn.Linear(num_ftrs, num_classes)
    else:
        print("Modèle invalide.")
        exit()

    return model

# ==========================================
# 4. ÉVALUATION PAR VOTE (Nouveau)
# ==========================================

def evaluate_by_vote(model, dataset, device):
    """
    Regroupe les prédictions par ID de ver et applique un vote majoritaire.
    """
    print("\n--- DÉBUT DE L'ÉVALUATION PAR VOTE (Worm-Level) ---")
    model.eval()

    # Dictionnaire pour stocker les votes : {worm_id: {'votes': [], 'true_label': int}}
    worm_results = defaultdict(lambda: {'votes': [], 'true_label': None})

    # On n'utilise pas de DataLoader mélangé ici pour pouvoir mapper fichier -> ver facilement
    # Mais le dataset est accessible par index
    loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False)

    all_preds_raw = []

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(loader):
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)

            # Récupérer les infos fichiers pour ce batch
            # Le loader charge dans l'ordre du dataset
            start_idx = i * BATCH_SIZE
            end_idx = start_idx + inputs.size(0)
            batch_files = dataset.file_list[start_idx:end_idx]

            for f_path, pred, label in zip(batch_files, predicted.cpu().numpy(), labels.numpy()):
                # Structure: .../LABEL/WORM_ID/image.png
                worm_id = f_path.parent.name
                worm_results[worm_id]['votes'].append(pred)
                worm_results[worm_id]['true_label'] = label

    # Agrégation des votes
    final_preds = []
    final_labels = []
    worm_ids = []

    correct_worms = 0
    total_worms = 0

    for w_id, data in worm_results.items():
        votes = data['votes']
        true_label = data['true_label']

        # Vote majoritaire
        # Si la moyenne > 0.5, alors la classe prédite est 1, sinon 0
        vote_score = sum(votes) / len(votes)
        final_pred = 1 if vote_score >= 0.5 else 0

        final_preds.append(final_pred)
        final_labels.append(true_label)
        worm_ids.append(w_id)

        if final_pred == true_label:
            correct_worms += 1
        total_worms += 1

    # Affichage des résultats
    accuracy = correct_worms / total_worms if total_worms > 0 else 0
    print(f"\n>>> PRÉCISION PAR VER (VOTE MAJORITAIRE) : {accuracy:.2%}")
    print(f"Total Vers testés : {total_worms}")

    print("\nRapport de Classification (Niveau Ver) :")
    print(classification_report(final_labels, final_preds, target_names=dataset.classes))

    return accuracy

# ==========================================
# 5. PIPELINE PRINCIPAL
# ==========================================

def train_model():
    try:
        train_files, val_files = split_dataset_by_worm(DATA_DIR)
    except ValueError as e:
        print(f"Échec : {e}")
        return

    data_transforms = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    train_dataset = WormDataset(train_files, transform=data_transforms)
    val_dataset = WormDataset(val_files, transform=data_transforms)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    num_classes = len(train_dataset.classes)
    print(f"Chargement de {MODEL_NAME}...")
    model = initialize_model(MODEL_NAME, num_classes, use_pretrained=True)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)

    history = {'train_acc': [], 'val_acc': [], 'train_loss': [], 'val_loss': []}

    print(f"\n--- Début de l'entraînement ({num_classes} classes) ---")
    for epoch in range(NUM_EPOCHS):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_dataset)
        epoch_acc = correct / total

        # Validation classique (Segment Level)
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_epoch_loss = val_loss / len(val_dataset)
        val_acc = val_correct / val_total

        history['train_acc'].append(epoch_acc)
        history['val_acc'].append(val_acc)
        history['train_loss'].append(epoch_loss)
        history['val_loss'].append(val_epoch_loss)

        print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {epoch_loss:.4f} | Val Acc (Segment): {val_acc:.4f}")

    # Sauvegarde
    torch.save(model.state_dict(), f"worm_classifier_{MODEL_NAME}.pth")

    # APPEL DE L'ÉVALUATION PAR VOTE À LA FIN
    evaluate_by_vote(model, val_dataset, device)

    # Graphiques
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_acc'], label='Train')
    plt.plot(history['val_acc'], label='Val (Seg)')
    plt.title('Précision (Segment)')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(history['train_loss'], label='Train')
    plt.plot(history['val_loss'], label='Val')
    plt.title('Loss')
    plt.legend()
    plt.savefig(f'training_history_{MODEL_NAME}.png')

if __name__ == "__main__":
    train_model()

Entraînement sur : cuda avec le modèle densenet


FileNotFoundError: ERREUR FATALE: Le dossier de données n'a été trouvé ni à 'worm_trajectories' (Local) ni à '/worm_trajectories' (Colab).