In [None]:
!pip install segmentation-models-pytorch
!pip install lightning albumentations

In [None]:
# %% [code] Cellule 1 : Import des bibliothèques nécessaires
import os
import glob
import json
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import albumentations as A
# Vous pouvez ajouter d'autres imports d'albumentations si besoin

import segmentation_models_pytorch as smp

# Select the compute device: the CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Utilisation de l'appareil :", device)


In [None]:
# %% [code] : Classe dataset adaptée sans fichier JSON
class Sentinel2Dataset(Dataset):
    """
    Patch-based dataset for binary road segmentation.

    Every image/mask pair found under ``<root_dir>/<split>/images`` and
    ``<root_dir>/<split>/masks`` is tiled with a sliding window of
    ``patch_size`` x ``patch_size`` pixels moved by ``stride`` pixels.
    Windows do not overlap when ``stride == patch_size`` (the default) and
    windows that would cross the image border are dropped.

    Images and masks are paired by position in the sorted file listings, so
    both folders must sort in the same order.
    """

    def __init__(self, root_dir, split='train', transform=None, patch_size=512, stride=512):
        """
        Index every patch of the requested split.

        Args:
            root_dir (str): Dataset root (e.g. "/kaggle/input/mon_dataset").
            split (str): Sub-folder of ``root_dir`` to read ('train', 'valid' or 'test').
            transform: Optional albumentations-style callable invoked as
                ``transform(image=..., mask=...)``.
            patch_size (int): Edge length of the square patches, in pixels.
            stride (int): Step of the sliding window, in pixels.
        """
        self.transform = transform
        self.patch_size = patch_size
        self.stride = stride

        # Image and mask folders of the requested split
        self.image_dir = os.path.join(root_dir, split, "images")
        self.mask_dir = os.path.join(root_dir, split, "masks")

        # Sorted listings: the i-th image is paired with the i-th mask
        self.image_files = sorted(glob.glob(os.path.join(self.image_dir, "*")))
        self.mask_files = sorted(glob.glob(os.path.join(self.mask_dir, "*")))

        assert len(self.image_files) == len(self.mask_files), "Le nombre d'images et de masques ne correspond pas!"

        # One entry per patch: (image_path, mask_path, x, y) with (x, y) the top-left corner
        self.patches = []
        for img_path, mask_path in zip(self.image_files, self.mask_files):
            img = cv2.imread(img_path)
            if img is None:
                # Unreadable / non-image file: skipped on purpose (best effort)
                continue
            h, w = img.shape[:2]
            # Bounding the ranges keeps only windows that fit entirely inside the image
            for y in range(0, h - self.patch_size + 1, self.stride):
                for x in range(0, w - self.patch_size + 1, self.stride):
                    self.patches.append((img_path, mask_path, x, y))

        print(f"Nombre de patchs pour le split {split} : {len(self.patches)}")

    def __len__(self):
        """Return the total number of indexed patches."""
        return len(self.patches)

    def __getitem__(self, index):
        """
        Load one patch.

        Returns:
            tuple: ``(image, mask)`` float tensors; the image is (C, H, W)
            scaled by 1/255, the mask is (1, H, W) scaled by 1/255.

        Raises:
            FileNotFoundError: If the image or the mask cannot be read.
            ValueError: If image and mask do not have the same height/width.
        """
        img_path, mask_path, x, y = self.patches[index]

        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Impossible de lire l'image : {img_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Impossible de lire le masque : {mask_path}")
        if mask.shape[:2] != image.shape[:2]:
            # A smaller mask would otherwise produce a silently truncated patch
            raise ValueError(
                f"Dimensions différentes entre l'image {img_path} {image.shape[:2]} "
                f"et le masque {mask_path} {mask.shape[:2]}"
            )
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Cut the patch out of both arrays
        rows = slice(y, y + self.patch_size)
        cols = slice(x, x + self.patch_size)
        image_patch = image[rows, cols]
        mask_patch = mask[rows, cols]

        # Scale the mask from [0, 255] to [0, 1] before augmentation
        mask_patch = mask_patch.astype(np.float32) / 255.0

        # Augment image and mask together so that they stay aligned
        if self.transform:
            augmented = self.transform(image=image_patch, mask=mask_patch)
            image_patch = augmented['image']
            mask_patch = augmented['mask']

        # Scale the image to [0, 1] and move channels first: (H, W, C) -> (C, H, W)
        image_patch = image_patch.astype(np.float32) / 255.0
        image_patch = np.transpose(image_patch, (2, 0, 1))
        image_patch = torch.tensor(image_patch, dtype=torch.float)

        # Add a leading channel axis to the mask: (H, W) -> (1, H, W)
        mask_patch = torch.tensor(mask_patch, dtype=torch.float).unsqueeze(0)

        return image_patch, mask_patch

     

In [None]:
# %% [code] Cellule 3 : Définition de la fonction d'augmentation
def transformer(p=0.5):
    """
    Build the albumentations augmentation pipeline.

    Args:
        p (float): Probability that the composed pipeline is applied at all.

    Returns:
        An ``A.Compose`` chaining a random resized crop, a small
        shift/scale/rotate and four flip/transposition/90-degree-rotation steps.
    """
    # Random crop resized to 512x512, applied to 20% of the samples
    random_crop = A.RandomResizedCrop(height=512, width=512, p=0.2)

    # Small affine jitter; uncovered pixels are filled with 0 in image and mask
    affine_jitter = A.ShiftScaleRotate(
        shift_limit=0.0625,
        scale_limit=0.05,
        rotate_limit=10,
        interpolation=2,  # bicubic interpolation
        border_mode=0,  # constant border
        value=0,
        mask_value=0,
        p=0.5,
    )

    # Flips, transposition and random 90-degree rotations, each with probability 0.5
    orientation_steps = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.Transpose(p=0.5),
        A.RandomRotate90(p=0.5),
    ]

    return A.Compose([random_crop, affine_jitter] + orientation_steps, p=p)


In [None]:
# %% [code] : Création des datasets et DataLoaders sans fichier JSON
# Root folder of the Kaggle dataset
dataset_root = "/kaggle/input/deepglobe/deepglobe"  # Adjust this path to your environment

# Training patches are augmented; validation patches are left untouched
train_dataset = Sentinel2Dataset(root_dir=dataset_root, split='train', transform=transformer(p=0.5))
valid_dataset = Sentinel2Dataset(root_dir=dataset_root, split='valid', transform=None)

# DataLoaders (batch size 8); only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
valid_loader = DataLoader(valid_dataset, batch_size=8, shuffle=False, num_workers=4)


In [None]:
# %% [code] Cell 5: U-Net++ model with a ResNet50 encoder
# Build the model with segmentation_models_pytorch
model = smp.UnetPlusPlus(
    encoder_name="resnet50",        # Encoder backbone
    encoder_weights="imagenet",       # Encoder weights pretrained on ImageNet
    in_channels=3,                    # RGB input
    classes=1,                        # Binary segmentation (single output channel)
)

model.to(device)


In [None]:
# %% [code] Cell 6: loss function and optimizer
criterion = nn.BCEWithLogitsLoss()  # Binary segmentation loss applied to raw logits
optimizer = optim.Adam(model.parameters(), lr=1e-4)


In [None]:
# %% [code] Cellule 7 : Fonctions de calcul des métriques
def compute_metrics(outputs, masks, threshold=0.5):
    """
    Count true positives, false positives and false negatives for one batch.

    Args:
        outputs (tensor): raw model outputs (logits), shape (B, 1, H, W)
        masks (tensor): ground-truth masks, shape (B, 1, H, W)
        threshold (float): cut-off applied to the sigmoid probabilities

    Returns:
        tuple: (TP, FP, FN) as Python floats, summed over the whole batch.
    """
    # Logits -> probabilities -> hard 0/1 predictions
    predicted = (torch.sigmoid(outputs) > threshold).float()
    target = masks.float()

    # Complements: pixels predicted as background / labelled as background
    predicted_background = 1 - predicted
    target_background = 1 - target

    counts = (
        (predicted * target).sum(),              # true positives
        (predicted * target_background).sum(),   # false positives
        (predicted_background * target).sum(),   # false negatives
    )
    return tuple(count.item() for count in counts)

def calculate_epoch_metrics(TP, FP, FN, eps=1e-6):
    """
    Turn accumulated TP / FP / FN counts into IoU, precision, recall and F1.

    ``eps`` is added to every denominator so that empty counts yield 0
    instead of a division by zero.

    Returns:
        tuple: (IoU, precision, recall, f1)
    """
    union = TP + FP + FN + eps
    predicted_positive = TP + FP + eps
    actual_positive = TP + FN + eps

    iou = TP / union
    precision = TP / predicted_positive
    recall = TP / actual_positive

    f1_numerator = 2 * precision * recall
    f1_denominator = precision + recall + eps
    return iou, precision, recall, f1_numerator / f1_denominator


In [None]:
# %% [code] Cell 8: training and validation loop with per-epoch metric logging
num_epochs = 8  # Number of epochs (tunable)
metrics_history = []   # One dict of metrics per epoch

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_TP, train_FP, train_FN = 0, 0, 0
    
    for images, masks in train_loader:
        images = images.to(device)
        masks = masks.to(device).float()  # Float targets for the loss
        
        optimizer.zero_grad()
        outputs = model(images)  # Output of shape (B, 1, H, W)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        # Weight the batch loss by the batch size so that the division by the
        # dataset size below gives a per-sample average
        train_loss += loss.item() * images.size(0)
        TP, FP, FN = compute_metrics(outputs, masks)
        train_TP += TP
        train_FP += FP
        train_FN += FN
        
    train_loss /= len(train_loader.dataset)
    train_IoU, train_precision, train_recall, train_f1 = calculate_epoch_metrics(train_TP, train_FP, train_FN)
    
    # Validation phase: eval mode, no gradient tracking
    model.eval()
    valid_loss = 0.0
    valid_TP, valid_FP, valid_FN = 0, 0, 0
    with torch.no_grad():
        for images, masks in valid_loader:
            images = images.to(device)
            masks = masks.to(device).float()
            outputs = model(images)
            loss = criterion(outputs, masks)
            valid_loss += loss.item() * images.size(0)
            TP, FP, FN = compute_metrics(outputs, masks)
            valid_TP += TP
            valid_FP += FP
            valid_FN += FN
            
    valid_loss /= len(valid_loader.dataset)
    valid_IoU, valid_precision, valid_recall, valid_f1 = calculate_epoch_metrics(valid_TP, valid_FP, valid_FN)
    
    print(f"Epoch [{epoch+1}/{num_epochs}] Train Loss: {train_loss:.4f} | Val Loss: {valid_loss:.4f}")
    print(f"Train - IoU: {train_IoU:.4f}, Précision: {train_precision:.4f}, Rappel: {train_recall:.4f}, F1: {train_f1:.4f}")
    print(f"Valid - IoU: {valid_IoU:.4f}, Précision: {valid_precision:.4f}, Rappel: {valid_recall:.4f}, F1: {valid_f1:.4f}\n")
    
    # Record this epoch's metrics
    metrics_history.append({
         'epoch': epoch+1,
         'train_loss': train_loss,
         'valid_loss': valid_loss,
         'train_IoU': train_IoU,
         'train_precision': train_precision,
         'train_recall': train_recall,
         'train_f1': train_f1,
         'valid_IoU': valid_IoU,
         'valid_precision': valid_precision,
         'valid_recall': valid_recall,
         'valid_f1': valid_f1,
    })

# Save the metric history to a CSV file
metrics_df = pd.DataFrame(metrics_history)
metrics_df.to_csv("metrics_history111.csv", index=False)
print("Les métriques ont été sauvegardées dans 'metrics_history111.csv'.")


In [None]:
# %% [code] Cellule 9 : Affichage des courbes d'évolution des métriques
epochs = metrics_df['epoch']

# (column suffix in metrics_df, label used in the legend and on the y axis)
panels = [
    ('IoU', 'IoU'),
    ('precision', 'Précision'),
    ('recall', 'Rappel'),
    ('f1', 'F1-score'),
]

plt.figure(figsize=(12, 8))

# One panel per metric in a 2x2 grid, training and validation curves overlaid
for position, (suffix, label) in enumerate(panels, start=1):
    plt.subplot(2, 2, position)
    plt.plot(epochs, metrics_df['train_' + suffix], label='Train ' + label)
    plt.plot(epochs, metrics_df['valid_' + suffix], label='Valid ' + label)
    plt.xlabel("Epoch")
    plt.ylabel(label)
    plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# %% [code] Cell 9: save the trained model
# Persist only the weights (state dict); the architecture must be rebuilt before loading
torch.save(model.state_dict(), 'road_segmentation_model.pth')
print("Modèle sauvegardé avec succès.")

# %% [code] Cellule 10 : Fonction d'inférence pour les grandes images
def predict_large_image(image_path, model, patch_size=512, device=None):
    """
    Predict a binary segmentation mask for an image of arbitrary size.

    The image is zero-padded on its bottom/right edges up to a multiple of
    ``patch_size``, cut into non-overlapping ``patch_size`` x ``patch_size``
    tiles, each tile is run through ``model`` on its own, and the thresholded
    predictions are stitched back together. The padding is cropped away
    before returning.

    Args:
        image_path (str): Path of the image to segment (read with OpenCV).
        model (torch.nn.Module): Network returning one logit per pixel of the tile.
        patch_size (int): Tile edge length in pixels.
        device: Device the tiles are sent to. ``None`` (default) uses the
            device of the model's first parameter, falling back to the CPU,
            so the input always lands where the model lives.

    Returns:
        numpy.ndarray: float32 array of shape (H, W) holding 0.0 / 1.0.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure with None instead of raising
        raise FileNotFoundError(f"Erreur : Impossible de charger l'image '{image_path}'. Vérifiez le chemin du fichier.")

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_h, original_w = image.shape[:2]

    # Amount of padding needed to reach the next multiple of patch_size (0 if already aligned)
    pad_h = (-original_h) % patch_size
    pad_w = (-original_w) % patch_size
    padded_image = cv2.copyMakeBorder(
        image,
        0, pad_h,
        0, pad_w,
        cv2.BORDER_CONSTANT,
        value=[0, 0, 0]
    )

    # Same preprocessing as training: scale to [0, 1], channels first (C, H, W)
    padded_image = padded_image.astype(np.float32) / 255.0
    padded_image = np.transpose(padded_image, (2, 0, 1))
    padded_h, padded_w = padded_image.shape[1], padded_image.shape[2]

    full_mask = np.zeros((padded_h, padded_w), dtype=np.float32)

    model.eval()
    with torch.no_grad():
        for y in range(0, padded_h, patch_size):
            for x in range(0, padded_w, patch_size):
                patch = padded_image[:, y:y+patch_size, x:x+patch_size]

                # Add the batch axis and run the tile through the network
                patch_tensor = torch.tensor(patch, dtype=torch.float32).unsqueeze(0).to(device)
                output = model(patch_tensor)

                # Logits -> probabilities -> hard 0/1 mask
                prob = torch.sigmoid(output)
                pred = (prob > 0.5).float().cpu().numpy().squeeze()

                full_mask[y:y+patch_size, x:x+patch_size] = pred

    # Drop the padding added at the bottom/right
    return full_mask[:original_h, :original_w]

# %% [code] Cell 11: example usage on a test image
# Rebuild the architecture and load the saved weights
model = smp.UnetPlusPlus(
    encoder_name="resnet50",
    encoder_weights=None,  # No pretrained encoder weights: the state dict loaded below provides them
    in_channels=3,
    classes=1
)
model.load_state_dict(torch.load('road_segmentation_model.pth', map_location=device))
model.to(device)

# Path of a test image (adjust to your folder layout)
test_image_path = "/kaggle/input/testcity/region_25_sat.png"

# Run tiled inference
predicted_mask = predict_large_image(test_image_path, model)

# Save the 0/1 mask as an 8-bit 0/255 image
cv2.imwrite("prediction_finale.png", (predicted_mask * 255).astype(np.uint8))
print("Prédiction sauvegardée sous 'prediction_finale.png'")

# Optional visualisation: input image next to the predicted mask
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(cv2.cvtColor(cv2.imread(test_image_path), cv2.COLOR_BGR2RGB))
plt.title('Image originale')
plt.axis('off')

plt.subplot(122)
plt.imshow(predicted_mask, cmap='gray')
plt.title('Masque prédit')
plt.axis('off')
plt.show()

In [None]:
import os
import glob
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import albumentations as A
import segmentation_models_pytorch as smp

# Select the compute device: the CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Utilisation de l'appareil :", device)

# =================== Classe du Dataset ===================
class Sentinel2Dataset(Dataset):
    """
    Patch-based dataset for binary road segmentation.

    Every image/mask pair found under ``<root_dir>/<split>/images`` and
    ``<root_dir>/<split>/masks`` is tiled with a sliding window of
    ``patch_size`` x ``patch_size`` pixels moved by ``stride`` pixels.
    Windows that would cross the image border are dropped. Images and masks
    are paired by position in the sorted file listings.
    """

    def __init__(self, root_dir, split='train', transform=None, patch_size=512, stride=512):
        """
        Index every patch of the requested split.

        Args:
            root_dir (str): Dataset root folder.
            split (str): Sub-folder of ``root_dir`` to read.
            transform: Optional albumentations-style callable invoked as
                ``transform(image=..., mask=...)``.
            patch_size (int): Edge length of the square patches, in pixels.
            stride (int): Step of the sliding window, in pixels.
        """
        self.transform = transform
        self.patch_size = patch_size
        self.stride = stride

        self.image_dir = os.path.join(root_dir, split, "images")
        self.mask_dir = os.path.join(root_dir, split, "masks")

        # Sorted listings: the i-th image is paired with the i-th mask
        self.image_files = sorted(glob.glob(os.path.join(self.image_dir, "*")))
        self.mask_files = sorted(glob.glob(os.path.join(self.mask_dir, "*")))

        assert len(self.image_files) == len(self.mask_files), "Mismatch images/masks!"

        # One entry per patch: (image_path, mask_path, x, y) with (x, y) the top-left corner
        self.patches = []
        for img_path, mask_path in zip(self.image_files, self.mask_files):
            img = cv2.imread(img_path)
            if img is None:
                # Unreadable / non-image file: skipped on purpose (best effort)
                continue
            h, w = img.shape[:2]
            # Bounding the ranges keeps only windows that fit entirely inside the image
            for y in range(0, h - self.patch_size + 1, self.stride):
                for x in range(0, w - self.patch_size + 1, self.stride):
                    self.patches.append((img_path, mask_path, x, y))

        print(f"Nombre de patchs pour {split}: {len(self.patches)}")

    def __len__(self):
        """Return the total number of indexed patches."""
        return len(self.patches)

    def __getitem__(self, index):
        """
        Load one patch as ``(image, mask)`` float tensors of shape (C, H, W) and (1, H, W).

        Raises:
            FileNotFoundError: If the image or the mask cannot be read.
            ValueError: If image and mask do not have the same height/width.
        """
        img_path, mask_path, x, y = self.patches[index]

        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Impossible de lire l'image : {img_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Impossible de lire le masque : {mask_path}")
        if mask.shape[:2] != image.shape[:2]:
            # A smaller mask would otherwise produce a silently truncated patch
            raise ValueError(
                f"Dimensions différentes entre l'image {img_path} {image.shape[:2]} "
                f"et le masque {mask_path} {mask.shape[:2]}"
            )
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        rows = slice(y, y + self.patch_size)
        cols = slice(x, x + self.patch_size)
        image_patch = image[rows, cols]
        mask_patch = mask[rows, cols]

        # Scale the mask from [0, 255] to [0, 1] before augmentation
        mask_patch = mask_patch.astype(np.float32) / 255.0

        # Augment image and mask together so that they stay aligned
        if self.transform:
            augmented = self.transform(image=image_patch, mask=mask_patch)
            image_patch = augmented['image']
            mask_patch = augmented['mask']

        # Scale the image to [0, 1] and move channels first: (H, W, C) -> (C, H, W)
        image_patch = image_patch.astype(np.float32) / 255.0
        image_patch = np.transpose(image_patch, (2, 0, 1))
        image_patch = torch.tensor(image_patch, dtype=torch.float)

        # Add a leading channel axis to the mask: (H, W) -> (1, H, W)
        mask_patch = torch.tensor(mask_patch, dtype=torch.float).unsqueeze(0)

        return image_patch, mask_patch

# =================== Transformations ===================
def transformer(p=0.5):
    """
    Build the albumentations augmentation pipeline.

    Args:
        p (float): Probability that the composed pipeline is applied at all.

    Returns:
        An ``A.Compose`` chaining a random resized crop, a small
        shift/scale/rotate and four flip/transposition/90-degree-rotation steps.
    """
    # Geometric jitter: occasional resized crop, then a small affine perturbation
    geometric_steps = [
        A.RandomResizedCrop(height=512, width=512, p=0.2),
        A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.05, rotate_limit=10, p=0.5),
    ]
    # Orientation changes, each applied with probability 0.5
    orientation_steps = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.Transpose(p=0.5),
        A.RandomRotate90(p=0.5),
    ]
    return A.Compose(geometric_steps + orientation_steps, p=p)

# =================== Dataset loading ===================
dataset_root = "/kaggle/input/deepglobe/deepglobe"

# Training patches are augmented; validation patches are left untouched
train_dataset = Sentinel2Dataset(root_dir=dataset_root, split='train', transform=transformer(p=0.5))
valid_dataset = Sentinel2Dataset(root_dir=dataset_root, split='valid', transform=None)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
valid_loader = DataLoader(valid_dataset, batch_size=8, shuffle=False, num_workers=4)

# =================== Load the previously trained model ===================
model = smp.UnetPlusPlus(
    encoder_name="resnet50",
    encoder_weights=None,  # No ImageNet download: the trained weights are loaded below
    in_channels=3,
    classes=1,
)

# Read the checkpoint once and accept both layouts used in this notebook:
# a bare state dict, or a dict wrapping it under 'model_state_dict'.
# (The previous version referenced `checkpoint` before any cell defined it.)
checkpoint = torch.load("/kaggle/input/modelupp/pytorch/default/1/road_segmentation_model.pth", map_location=device)
if isinstance(checkpoint, dict) and "model_state_dict" in checkpoint:
    state_dict = checkpoint["model_state_dict"]
else:
    state_dict = checkpoint
model.load_state_dict(state_dict)

# Move the model to the GPU/CPU
model.to(device)

# =================== Loss and optimizer ===================
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# =================== Fonction de calcul des métriques ===================
def compute_metrics(outputs, masks, threshold=0.5):
    """
    Count true positives, false positives and false negatives for one batch.

    ``outputs`` are raw logits; they go through a sigmoid and are binarised
    at ``threshold`` before being compared with ``masks``.

    Returns:
        tuple: (TP, FP, FN) as Python floats, summed over the whole batch.
    """
    predicted = (torch.sigmoid(outputs) > threshold).float()
    target = masks.float()

    # Complements: pixels predicted as background / labelled as background
    predicted_background = 1 - predicted
    target_background = 1 - target

    counts = (
        (predicted * target).sum(),              # true positives
        (predicted * target_background).sum(),   # false positives
        (predicted_background * target).sum(),   # false negatives
    )
    return tuple(count.item() for count in counts)

def calculate_epoch_metrics(TP, FP, FN, eps=1e-6):
    """
    Turn accumulated TP / FP / FN counts into (IoU, precision, recall, f1).

    ``eps`` is added to every denominator so that empty counts yield 0
    instead of a division by zero.
    """
    union = TP + FP + FN + eps
    predicted_positive = TP + FP + eps
    actual_positive = TP + FN + eps

    iou = TP / union
    precision = TP / predicted_positive
    recall = TP / actual_positive

    f1_numerator = 2 * precision * recall
    f1_denominator = precision + recall + eps
    return iou, precision, recall, f1_numerator / f1_denominator

# =================== Model retraining ===================
num_epochs = 6  # Number of additional epochs
metrics_history = []

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_TP, train_FP, train_FN = 0, 0, 0
    
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device).float()
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        # Weight the batch loss by the batch size so that the division by the
        # dataset size below gives a per-sample average
        train_loss += loss.item() * images.size(0)
        TP, FP, FN = compute_metrics(outputs, masks)
        train_TP += TP
        train_FP += FP
        train_FN += FN
        
    train_loss /= len(train_loader.dataset)
    train_IoU, train_precision, train_recall, train_f1 = calculate_epoch_metrics(train_TP, train_FP, train_FN)
    
    # Validation phase: eval mode, no gradient tracking
    model.eval()
    valid_loss = 0.0
    valid_TP, valid_FP, valid_FN = 0, 0, 0
    with torch.no_grad():
        for images, masks in valid_loader:
            images, masks = images.to(device), masks.to(device).float()
            outputs = model(images)
            loss = criterion(outputs, masks)
            valid_loss += loss.item() * images.size(0)
            TP, FP, FN = compute_metrics(outputs, masks)
            valid_TP += TP
            valid_FP += FP
            valid_FN += FN
            
    valid_loss /= len(valid_loader.dataset)
    valid_IoU, valid_precision, valid_recall, valid_f1 = calculate_epoch_metrics(valid_TP, valid_FP, valid_FN)
    
    print(f"Epoch [{epoch+1}/{num_epochs}] Train Loss: {train_loss:.4f} | Val Loss: {valid_loss:.4f}")
    
    # Record this epoch's metrics (kept in memory only; this cell does not write them to disk)
    metrics_history.append({
         'epoch': epoch+1,
         'train_loss': train_loss,
         'valid_loss': valid_loss,
         'train_IoU': train_IoU,
         'train_precision': train_precision,
         'train_recall': train_recall,
         'train_f1': train_f1,
         'valid_IoU': valid_IoU,
         'valid_precision': valid_precision,
         'valid_recall': valid_recall,
         'valid_f1': valid_f1,
    })

# Save the updated weights, wrapped in a dict under 'model_state_dict'
torch.save({'model_state_dict': model.state_dict()}, "road_segmentation_updated.pth")


In [None]:
checkpoint = torch.load("/kaggle/input/modelupp/pytorch/default/1/road_segmentation_model.pth", map_location=device)

# Test the type first: `"key" in checkpoint` raises TypeError on a pickled
# model object, which made the whole-model branch below unreachable.
if isinstance(checkpoint, dict):
    if "model_state_dict" in checkpoint:
        model.load_state_dict(checkpoint["model_state_dict"])
        print("✅ Modèle chargé avec 'model_state_dict'.")
    elif "state_dict" in checkpoint:
        model.load_state_dict(checkpoint["state_dict"])
        print("✅ Modèle chargé avec 'state_dict'.")
    else:
        # Bare state dict: parameter names map directly to tensors
        model.load_state_dict(checkpoint)
        print("✅ Modèle chargé directement.")
else:
    # The file holds a whole pickled model rather than weights.
    # NOTE(review): recent PyTorch releases default torch.load to
    # weights_only=True, which refuses such files — confirm the installed version.
    model = checkpoint
    print("✅ Modèle chargé complètement.")


In [None]:
import os
import glob
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import albumentations as A
import segmentation_models_pytorch as smp

# Select the compute device: the CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("✅ Utilisation de l'appareil :", device)

# =================== Classe du Dataset ===================
class Sentinel2Dataset(Dataset):
    """
    Patch-based dataset for binary road segmentation.

    Every image/mask pair found under ``<root_dir>/<split>/images`` and
    ``<root_dir>/<split>/masks`` is tiled with a sliding window of
    ``patch_size`` x ``patch_size`` pixels moved by ``stride`` pixels.
    Windows that would cross the image border are dropped. Images and masks
    are paired by position in the sorted file listings.
    """

    def __init__(self, root_dir, split='train', transform=None, patch_size=512, stride=512):
        """
        Index every patch of the requested split.

        Args:
            root_dir (str): Dataset root folder.
            split (str): Sub-folder of ``root_dir`` to read.
            transform: Optional albumentations-style callable invoked as
                ``transform(image=..., mask=...)``.
            patch_size (int): Edge length of the square patches, in pixels.
            stride (int): Step of the sliding window, in pixels.
        """
        self.transform = transform
        self.patch_size = patch_size
        self.stride = stride

        self.image_dir = os.path.join(root_dir, split, "images")
        self.mask_dir = os.path.join(root_dir, split, "masks")

        # Sorted listings: the i-th image is paired with the i-th mask
        self.image_files = sorted(glob.glob(os.path.join(self.image_dir, "*")))
        self.mask_files = sorted(glob.glob(os.path.join(self.mask_dir, "*")))

        assert len(self.image_files) == len(self.mask_files), "Mismatch images/masks!"

        # One entry per patch: (image_path, mask_path, x, y) with (x, y) the top-left corner
        self.patches = []
        for img_path, mask_path in zip(self.image_files, self.mask_files):
            img = cv2.imread(img_path)
            if img is None:
                # Unreadable / non-image file: skipped on purpose (best effort)
                continue
            h, w = img.shape[:2]
            # Bounding the ranges keeps only windows that fit entirely inside the image
            for y in range(0, h - self.patch_size + 1, self.stride):
                for x in range(0, w - self.patch_size + 1, self.stride):
                    self.patches.append((img_path, mask_path, x, y))

        print(f"📌 Nombre de patchs pour {split}: {len(self.patches)}")

    def __len__(self):
        """Return the total number of indexed patches."""
        return len(self.patches)

    def __getitem__(self, index):
        """
        Load one patch as ``(image, mask)`` float tensors of shape (C, H, W) and (1, H, W).

        Raises:
            FileNotFoundError: If the image or the mask cannot be read.
            ValueError: If image and mask do not have the same height/width.
        """
        img_path, mask_path, x, y = self.patches[index]

        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Impossible de lire l'image : {img_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Impossible de lire le masque : {mask_path}")
        if mask.shape[:2] != image.shape[:2]:
            # A smaller mask would otherwise produce a silently truncated patch
            raise ValueError(
                f"Dimensions différentes entre l'image {img_path} {image.shape[:2]} "
                f"et le masque {mask_path} {mask.shape[:2]}"
            )
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        rows = slice(y, y + self.patch_size)
        cols = slice(x, x + self.patch_size)
        image_patch = image[rows, cols]
        mask_patch = mask[rows, cols]

        # Scale the mask from [0, 255] to [0, 1] before augmentation
        mask_patch = mask_patch.astype(np.float32) / 255.0

        # Augment image and mask together so that they stay aligned
        if self.transform:
            augmented = self.transform(image=image_patch, mask=mask_patch)
            image_patch = augmented['image']
            mask_patch = augmented['mask']

        # Scale the image to [0, 1] and move channels first: (H, W, C) -> (C, H, W)
        image_patch = image_patch.astype(np.float32) / 255.0
        image_patch = np.transpose(image_patch, (2, 0, 1))
        image_patch = torch.tensor(image_patch, dtype=torch.float)

        # Add a leading channel axis to the mask: (H, W) -> (1, H, W)
        mask_patch = torch.tensor(mask_patch, dtype=torch.float).unsqueeze(0)

        return image_patch, mask_patch

# =================== Transformations ===================
def transformer(p=0.5):
    """
    Build the albumentations augmentation pipeline.

    Args:
        p (float): Probability that the composed pipeline is applied at all.

    Returns:
        An ``A.Compose`` chaining a random resized crop, a small
        shift/scale/rotate and four flip/transposition/90-degree-rotation steps.
    """
    # Geometric jitter: occasional resized crop, then a small affine perturbation
    geometric_steps = [
        A.RandomResizedCrop(height=512, width=512, p=0.2),
        A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.05, rotate_limit=10, p=0.5),
    ]
    # Orientation changes, each applied with probability 0.5
    orientation_steps = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.Transpose(p=0.5),
        A.RandomRotate90(p=0.5),
    ]
    return A.Compose(geometric_steps + orientation_steps, p=p)

# =================== Dataset loading ===================
dataset_root = "/kaggle/input/deepglobe/deepglobe"

# Training patches are augmented; validation patches are left untouched
train_dataset = Sentinel2Dataset(root_dir=dataset_root, split='train', transform=transformer(p=0.5))
valid_dataset = Sentinel2Dataset(root_dir=dataset_root, split='valid', transform=None)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
valid_loader = DataLoader(valid_dataset, batch_size=8, shuffle=False, num_workers=4)

# =================== Load the previously trained model ===================
model_path = "/kaggle/input/modelupp/pytorch/default/1/road_segmentation_model.pth"

# Read the file once; it is used as a bare state dict below
checkpoint = torch.load(model_path, map_location=device)

# Rebuild the architecture the weights were trained with
model = smp.UnetPlusPlus(
    encoder_name="resnet50",
    encoder_weights=None,
    in_channels=3,
    classes=1
)

# Reuse the checkpoint already in memory instead of reading the file a second time
model.load_state_dict(checkpoint)
model.to(device)
model.eval()

print("✅ Modèle chargé avec succès avec load_state_dict() !")


# =================== Fonction de test sur des images ===================
def test_model(dataloader, num_samples=3):
    """
    Show image / ground truth / prediction for the first sample of a few batches.

    Uses the notebook-level ``model`` and ``device``. One figure is drawn
    per batch, for at most ``num_samples`` batches of ``dataloader``.
    """
    model.eval()
    with torch.no_grad():
        for batch_index, (images, masks) in enumerate(dataloader):
            if batch_index >= num_samples:
                break

            images = images.to(device)
            masks = masks.to(device)
            # Logits -> probabilities -> hard 0/1 predictions
            probabilities = torch.sigmoid(model(images))
            preds = (probabilities > 0.5).float()

            # Only the first sample of the batch is displayed
            panels = [
                ("Image Originale", np.transpose(images[0].cpu().numpy(), (1, 2, 0)), None),
                ("Masque Vérité Terrain", masks[0].cpu().numpy()[0], 'gray'),
                ("Masque Prédit", preds[0].cpu().numpy()[0], 'gray'),
            ]

            plt.figure(figsize=(12, 4))
            for column, (title, array, colormap) in enumerate(panels, start=1):
                plt.subplot(1, 3, column)
                if colormap is None:
                    plt.imshow(array)
                else:
                    plt.imshow(array, cmap=colormap)
                plt.title(title)
                plt.axis("off")

            plt.show()

# Visual sanity check on the first few validation batches
test_model(valid_loader)

# =================== Fine-tuning (retraining) ===================
num_epochs = 5  # Number of fine-tuning epochs
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)  # Lower learning rate than the initial training

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    # Average of the per-batch losses
    print(f"📌 Epoch {epoch+1}/{num_epochs} - Loss: {total_loss / len(train_loader):.4f}")

# =================== Save the updated model ===================
# torch.save needs a file path: the previous target "/kaggle/working" is a
# directory, so the call failed after the whole training run. The weights are
# saved as a state dict, the format the loading cells of this notebook expect.
torch.save(model.state_dict(), "/kaggle/working/unetpp_model.pth")


In [None]:
# Save the model weights (state dict) to a .pth file
torch.save(model.state_dict(), "/kaggle/working/unetpp_model.pth")


In [None]:
def predict_large_image(image_path, model, patch_size=512, device=None):
    """
    Predict a binary segmentation mask for an image of arbitrary size.

    The image is zero-padded on its bottom/right edges up to a multiple of
    ``patch_size``, cut into non-overlapping ``patch_size`` x ``patch_size``
    tiles, each tile is run through ``model`` on its own, and the thresholded
    predictions are stitched back together. The padding is cropped away
    before returning.

    Args:
        image_path (str): Path of the image to segment (read with OpenCV).
        model (torch.nn.Module): Network returning one logit per pixel of the tile.
        patch_size (int): Tile edge length in pixels.
        device: Device the tiles are sent to. ``None`` (default) uses the
            device of the model's first parameter, falling back to the CPU,
            so the function also works on machines without CUDA.

    Returns:
        numpy.ndarray: float32 array of shape (H, W) holding 0.0 / 1.0.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure with None instead of raising
        raise FileNotFoundError(f"Erreur : Impossible de charger l'image '{image_path}'. Vérifiez le chemin du fichier.")

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_h, original_w = image.shape[:2]

    # Amount of padding needed to reach the next multiple of patch_size (0 if already aligned)
    pad_h = (-original_h) % patch_size
    pad_w = (-original_w) % patch_size
    padded_image = cv2.copyMakeBorder(
        image,
        0, pad_h,
        0, pad_w,
        cv2.BORDER_CONSTANT,
        value=[0, 0, 0]
    )

    # Same preprocessing as training: scale to [0, 1], channels first (C, H, W)
    padded_image = padded_image.astype(np.float32) / 255.0
    padded_image = np.transpose(padded_image, (2, 0, 1))
    padded_h, padded_w = padded_image.shape[1], padded_image.shape[2]

    full_mask = np.zeros((padded_h, padded_w), dtype=np.float32)

    model.eval()
    with torch.no_grad():
        for y in range(0, padded_h, patch_size):
            for x in range(0, padded_w, patch_size):
                patch = padded_image[:, y:y+patch_size, x:x+patch_size]

                # Add the batch axis and run the tile through the network
                patch_tensor = torch.tensor(patch, dtype=torch.float32).unsqueeze(0).to(device)
                output = model(patch_tensor)

                # Logits -> probabilities -> hard 0/1 mask
                prob = torch.sigmoid(output)
                pred = (prob > 0.5).float().cpu().numpy().squeeze()

                full_mask[y:y+patch_size, x:x+patch_size] = pred

    # Drop the padding added at the bottom/right
    return full_mask[:original_h, :original_w]

# %% [code] Cell 11: example usage on a test image
# Rebuild the architecture and load the saved weights
model = smp.UnetPlusPlus(
    encoder_name="resnet50",
    encoder_weights=None,  # No pretrained encoder weights: the state dict loaded below provides them
    in_channels=3,
    classes=1
)
# NOTE(review): relative path, while the weights were written to
# /kaggle/working/unetpp_model.pth — confirm that is the working directory.
model.load_state_dict(torch.load('unetpp_model.pth', map_location=device))
model.to(device)

# Path of a test image (adjust to your folder layout)
test_image_path = "/kaggle/input/deepglobe/deepglobe/valid/images/100712_sat.jpg"

# Run tiled inference
predicted_mask = predict_large_image(test_image_path, model)

# Save the 0/1 mask as an 8-bit 0/255 image
# NOTE(review): JPEG is lossy, so the saved mask may not stay strictly 0/255 — consider PNG.
cv2.imwrite("prediction_finale.jpg", (predicted_mask * 255).astype(np.uint8))
print("Prédiction sauvegardée sous 'prediction_finale.jpg'")

# Optional visualisation: input image next to the predicted mask
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(cv2.cvtColor(cv2.imread(test_image_path), cv2.COLOR_BGR2RGB))
plt.title('Image originale')
plt.axis('off')

plt.subplot(122)
plt.imshow(predicted_mask, cmap='gray')
plt.title('Masque prédit')
plt.axis('off')
plt.show()

In [None]:
import torch
import numpy as np
from sklearn.metrics import jaccard_score, precision_score, recall_score, f1_score

def evaluate_model(model, dataloader, device="cuda"):
    """Compute batch-averaged IoU, precision, recall and F1 for a binary segmentation model.

    Each batch is expected as ``(images, masks)`` where ``images`` has shape
    ``(batch, num_patches, channels, height, width)``. The patch axis is folded
    into the batch axis before the forward pass, and ``masks`` is reshaped to
    ``(batch * num_patches, 1, height, width)``.

    The model output is treated as logits: it goes through a sigmoid and is
    thresholded at 0.5, the same post-processing ``predict_large_image`` uses
    in this notebook. Thresholding the raw output at 0.5 instead would
    correspond to a probability cut-off of about 0.62 and make the reported
    metrics disagree with the masks produced at inference time.

    Args:
        model: Segmentation network; it is switched to eval mode.
        dataloader: Iterable yielding ``(images, masks)`` tensor pairs.
        device: Device the images are moved to for the forward pass.

    Returns:
        tuple: ``(avg_iou, avg_precision, avg_recall, avg_f1)``. Every metric
        is computed once per batch over all pixels of that batch, then
        averaged over the batches.

    Raises:
        ValueError: If ``dataloader`` yields no batch.
    """
    model.eval()
    total_iou, total_precision, total_recall, total_f1 = 0.0, 0.0, 0.0, 0.0
    num_batches = 0

    with torch.no_grad():
        for images, masks in dataloader:
            batch_size, num_patches, channels, height, width = images.shape
            images = images.reshape(batch_size * num_patches, channels, height, width).to(device)
            # Masks are only consumed on the CPU by sklearn, so they are
            # reshaped (which also validates their size) but never moved to
            # the device.
            masks = masks.reshape(batch_size * num_patches, 1, height, width)

            logits = model(images)
            # Logits -> probabilities -> binary prediction
            preds = (torch.sigmoid(logits) > 0.5).float()

            # Flatten to 1-D label vectors for sklearn
            y_pred = preds.cpu().numpy().flatten()
            y_true = masks.cpu().numpy().flatten()

            total_iou += jaccard_score(y_true, y_pred)
            total_precision += precision_score(y_true, y_pred)
            total_recall += recall_score(y_true, y_pred)
            total_f1 += f1_score(y_true, y_pred)

            num_batches += 1

    if num_batches == 0:
        # Fail with an explicit message instead of a bare ZeroDivisionError.
        raise ValueError("Erreur : le dataloader est vide, aucune métrique ne peut être calculée.")

    # Average every metric over the number of batches seen
    avg_iou = total_iou / num_batches
    avg_precision = total_precision / num_batches
    avg_recall = total_recall / num_batches
    avg_f1 = total_f1 / num_batches

    return avg_iou, avg_precision, avg_recall, avg_f1


In [None]:
# Score the trained model on the validation loader
val_scores = evaluate_model(model, valid_loader, device)
iou, precision, recall, f1 = val_scores

# Emit the four metrics as a single report
report_lines = [
    "\n🔹 **Résultats après entraînement :**",
    f"IoU (Intersection over Union)  : {iou:.4f}",
    f"Précision                      : {precision:.4f}",
    f"Rappel                         : {recall:.4f}",
    f"F1-score                       : {f1:.4f}",
]
print("\n".join(report_lines))


In [None]:
from tqdm import tqdm
import numpy as np
from sklearn.metrics import jaccard_score, precision_score, recall_score, f1_score

def evaluate_model(model, dataloader, device="cuda"):
    """Compute image-averaged IoU, precision, recall and F1 for a binary segmentation model.

    The model output is treated as logits: it goes through a sigmoid and is
    thresholded at 0.5, the same post-processing ``predict_large_image`` uses
    in this notebook. Thresholding the raw output at 0.5 instead would
    correspond to a probability cut-off of about 0.62 and make the reported
    metrics disagree with the masks produced at inference time.

    Args:
        model: Segmentation network; it is switched to eval mode.
        dataloader: Iterable yielding ``(images, masks)`` tensor pairs whose
            first axis is the batch axis.
        device: Device the images are moved to for the forward pass.

    Returns:
        tuple: ``(avg_iou, avg_precision, avg_recall, avg_f1)``. Every metric
        is computed per image over all of its pixels, then averaged over all
        images.

    Raises:
        ValueError: If ``dataloader`` yields no image.
    """
    model.eval()
    iou_scores, precision_scores, recall_scores, f1_scores = [], [], [], []

    with torch.no_grad():
        for images, masks in tqdm(dataloader, desc="Évaluation en cours"):
            images = images.to(device)

            # Logits -> probabilities -> binary prediction
            logits = model(images)
            preds = torch.sigmoid(logits) > 0.5

            # Integer label arrays for sklearn. Masks are only consumed on the
            # CPU, so they are never moved to the device.
            preds_np = preds.cpu().numpy().astype(int)
            masks_np = masks.cpu().numpy().astype(int)

            # One score per image; flatten each image once and reuse it
            for i in range(preds_np.shape[0]):
                y_true = masks_np[i].ravel()
                y_pred = preds_np[i].ravel()
                iou_scores.append(jaccard_score(y_true, y_pred))
                precision_scores.append(precision_score(y_true, y_pred))
                recall_scores.append(recall_score(y_true, y_pred))
                f1_scores.append(f1_score(y_true, y_pred))

    if not iou_scores:
        # np.mean of an empty list would silently return NaN with a warning.
        raise ValueError("Erreur : le dataloader est vide, aucune métrique ne peut être calculée.")

    # Average the per-image scores
    avg_iou = np.mean(iou_scores)
    avg_precision = np.mean(precision_scores)
    avg_recall = np.mean(recall_scores)
    avg_f1 = np.mean(f1_scores)

    return avg_iou, avg_precision, avg_recall, avg_f1


In [None]:
# Validation loader: batches of 8 in dataset order (no shuffling)
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=8,
    shuffle=False,
)



In [None]:
# Pick the GPU when one is available and make sure the model lives on it
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model = model.to(device)

# Score the model on the validation loader
val_scores = evaluate_model(model, valid_loader, device)
iou, precision, recall, f1 = val_scores

# Emit the four metrics as a single report
report_lines = [
    "\n🔹 **Résultats après entraînement :**",
    f"IoU (Intersection over Union)  : {iou:.4f}",
    f"Précision                      : {precision:.4f}",
    f"Rappel                         : {recall:.4f}",
    f"F1-score                       : {f1:.4f}",
]
print("\n".join(report_lines))


In [None]:
import matplotlib.pyplot as plt

# Display names and metric values, kept in matching order
labels = ["IoU", "Précision", "Rappel", "F1-score"]
metrics = [iou, precision, recall, f1]

# One bar per metric on a fixed 0-1 scale
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(labels, metrics, color=["blue", "green", "orange", "red"])
ax.set_ylim(0, 1)
ax.set_ylabel("Score")
ax.set_title("Métriques de Performance du Modèle U-Net++")
plt.show()


In [None]:
def predict_large_image(image_path, model, patch_size=512, device="cuda"):
    """Predict a binary mask for a large image by tiling it into square patches.

    The image is read with OpenCV, converted from BGR to RGB, zero-padded on
    the bottom and right so that both sides are multiples of ``patch_size``,
    scaled to [0, 1] and fed to ``model`` one patch at a time. Each output is
    passed through a sigmoid and thresholded at 0.5; the patch predictions are
    stitched together and the padding is cropped off.

    Note: this function is defined more than once in this notebook; the
    definition executed last is the one in effect.

    Args:
        image_path (str): Path of the image to segment.
        model: Segmentation network called on a
            ``(1, 3, patch_size, patch_size)`` float tensor. Its output is
            treated as logits and is assumed to squeeze down to a
            ``(patch_size, patch_size)`` map. The model is switched to eval mode.
        patch_size (int): Side length of the square patches.
        device: Device on which inference runs. When a CUDA device is
            requested but CUDA is not available, the device holding the
            model's parameters is used instead (CPU if the model has none),
            so the default works on CPU-only machines.

    Returns:
        numpy.ndarray: ``float32`` array with the image's original height and
        width, holding 0.0 or 1.0 per pixel.

    Raises:
        FileNotFoundError: If OpenCV cannot load ``image_path``.
    """
    # Load the image
    image = cv2.imread(image_path)

    # cv2.imread returns None instead of raising when the file cannot be read
    if image is None:
        raise FileNotFoundError(f"Erreur : Impossible de charger l'image '{image_path}'. Vérifiez le chemin du fichier.")

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_h, original_w, _ = image.shape

    # Padding needed to reach the next multiple of patch_size (0 if already aligned)
    pad_h = (patch_size - original_h % patch_size) % patch_size
    pad_w = (patch_size - original_w % patch_size) % patch_size

    padded_image = cv2.copyMakeBorder(
        image,
        0, pad_h,
        0, pad_w,
        cv2.BORDER_CONSTANT,
        value=[0, 0, 0]
    )

    # Convert to float32 in [0, 1] and move channels first: (C, H, W)
    padded_image = padded_image.astype(np.float32) / 255.0
    padded_image = np.transpose(padded_image, (2, 0, 1))
    _, padded_h, padded_w = padded_image.shape

    # Full-size mask that receives every patch prediction
    full_mask = np.zeros((padded_h, padded_w), dtype=np.float32)

    # Resolve the inference device. Callers that rely on the "cuda" default on
    # a machine without CUDA would otherwise crash when the patch is moved.
    run_device = torch.device(device)
    if run_device.type == "cuda" and not torch.cuda.is_available():
        first_param = next(model.parameters(), None)
        run_device = first_param.device if first_param is not None else torch.device("cpu")

    # Tile the padded image and predict patch by patch
    model.eval()
    with torch.no_grad():
        for y in range(0, padded_h, patch_size):
            for x in range(0, padded_w, patch_size):
                # Extract the patch
                patch = padded_image[:, y:y + patch_size, x:x + patch_size]

                # Add the batch axis and run the model
                patch_tensor = torch.tensor(patch, dtype=torch.float32).unsqueeze(0).to(run_device)
                output = model(patch_tensor)

                # Logits -> probabilities -> binary prediction
                prob = torch.sigmoid(output)
                pred = (prob > 0.5).float().cpu().numpy().squeeze()

                # Write the patch prediction into the full mask
                full_mask[y:y + patch_size, x:x + patch_size] = pred

    # Drop the padding and return the final mask
    final_mask = full_mask[:original_h, :original_w]
    return final_mask

# %% [code] Cellule 11 : Exemple d'utilisation sur une image test
# Rebuild the architecture, then load the saved weights into it
model = smp.UnetPlusPlus(
    encoder_name="resnet50",
    encoder_weights=None,  # No pretrained encoder weights: the checkpoint below is loaded instead
    in_channels=3,
    classes=1
)
model.load_state_dict(torch.load('unetpp_model.pth', map_location=device))
model.to(device)

# Path to a test image (adapt to your own directory layout)
test_image_path = "/kaggle/input/testcity/region_25_sat.png"

# Generate the prediction. The device is passed explicitly so that inference
# runs where the model was just moved, instead of relying on the function's
# own default.
predicted_mask = predict_large_image(test_image_path, model, device=device)

# Save the result. cv2.imwrite signals failure through its return value rather
# than by raising, so check it before announcing success.
if not cv2.imwrite("prediction_finale.png", (predicted_mask * 255).astype(np.uint8)):
    raise IOError("Erreur : Impossible d'écrire le fichier 'prediction_finale.png'.")
print("Prédiction sauvegardée sous 'prediction_finale.png'")

# Visualisation (optional): source image on the left, predicted mask on the right
fig, (ax_image, ax_mask) = plt.subplots(1, 2, figsize=(12, 6))
ax_image.imshow(cv2.cvtColor(cv2.imread(test_image_path), cv2.COLOR_BGR2RGB))
ax_image.set_title('Image originale')
ax_image.axis('off')

ax_mask.imshow(predicted_mask, cmap='gray')
ax_mask.set_title('Masque prédit')
ax_mask.axis('off')
plt.show()

In [None]:
def predict_large_image(image_path, model, patch_size=512, device="cuda"):
    """Predict a binary mask for a large image by tiling it into square patches.

    The image is read with OpenCV, converted from BGR to RGB, zero-padded on
    the bottom and right so that both sides are multiples of ``patch_size``,
    scaled to [0, 1] and fed to ``model`` one patch at a time. Each output is
    passed through a sigmoid and thresholded at 0.5; the patch predictions are
    stitched together and the padding is cropped off.

    Note: this function is defined more than once in this notebook; the
    definition executed last is the one in effect.

    Args:
        image_path (str): Path of the image to segment.
        model: Segmentation network called on a
            ``(1, 3, patch_size, patch_size)`` float tensor. Its output is
            treated as logits and is assumed to squeeze down to a
            ``(patch_size, patch_size)`` map. The model is switched to eval mode.
        patch_size (int): Side length of the square patches.
        device: Device on which inference runs. When a CUDA device is
            requested but CUDA is not available, the device holding the
            model's parameters is used instead (CPU if the model has none),
            so the default works on CPU-only machines.

    Returns:
        numpy.ndarray: ``float32`` array with the image's original height and
        width, holding 0.0 or 1.0 per pixel.

    Raises:
        FileNotFoundError: If OpenCV cannot load ``image_path``.
    """
    # Load the image
    image = cv2.imread(image_path)

    # cv2.imread returns None instead of raising when the file cannot be read
    if image is None:
        raise FileNotFoundError(f"Erreur : Impossible de charger l'image '{image_path}'. Vérifiez le chemin du fichier.")

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_h, original_w, _ = image.shape

    # Padding needed to reach the next multiple of patch_size (0 if already aligned)
    pad_h = (patch_size - original_h % patch_size) % patch_size
    pad_w = (patch_size - original_w % patch_size) % patch_size

    padded_image = cv2.copyMakeBorder(
        image,
        0, pad_h,
        0, pad_w,
        cv2.BORDER_CONSTANT,
        value=[0, 0, 0]
    )

    # Convert to float32 in [0, 1] and move channels first: (C, H, W)
    padded_image = padded_image.astype(np.float32) / 255.0
    padded_image = np.transpose(padded_image, (2, 0, 1))
    _, padded_h, padded_w = padded_image.shape

    # Full-size mask that receives every patch prediction
    full_mask = np.zeros((padded_h, padded_w), dtype=np.float32)

    # Resolve the inference device. Callers that rely on the "cuda" default on
    # a machine without CUDA would otherwise crash when the patch is moved.
    run_device = torch.device(device)
    if run_device.type == "cuda" and not torch.cuda.is_available():
        first_param = next(model.parameters(), None)
        run_device = first_param.device if first_param is not None else torch.device("cpu")

    # Tile the padded image and predict patch by patch
    model.eval()
    with torch.no_grad():
        for y in range(0, padded_h, patch_size):
            for x in range(0, padded_w, patch_size):
                # Extract the patch
                patch = padded_image[:, y:y + patch_size, x:x + patch_size]

                # Add the batch axis and run the model
                patch_tensor = torch.tensor(patch, dtype=torch.float32).unsqueeze(0).to(run_device)
                output = model(patch_tensor)

                # Logits -> probabilities -> binary prediction
                prob = torch.sigmoid(output)
                pred = (prob > 0.5).float().cpu().numpy().squeeze()

                # Write the patch prediction into the full mask
                full_mask[y:y + patch_size, x:x + patch_size] = pred

    # Drop the padding and return the final mask
    final_mask = full_mask[:original_h, :original_w]
    return final_mask

# %% [code] Cellule 11 : Exemple d'utilisation sur une image test
# Rebuild the architecture, then load the saved weights into it
model = smp.UnetPlusPlus(
    encoder_name="resnet50",
    encoder_weights=None,  # No pretrained encoder weights: the checkpoint below is loaded instead
    in_channels=3,
    classes=1
)
model.load_state_dict(torch.load('unetpp_model.pth', map_location=device))
model.to(device)

# Path to a test image (adapt to your own directory layout)
test_image_path = "/kaggle/input/deepglobe/deepglobe/valid/images/114405_sat.jpg"

# Generate the prediction. The device is passed explicitly so that inference
# runs where the model was just moved, instead of relying on the function's
# own default.
predicted_mask = predict_large_image(test_image_path, model, device=device)

# Save the result. cv2.imwrite signals failure through its return value rather
# than by raising, so check it before announcing success.
# NOTE(review): JPEG is lossy, so the saved file will not hold strictly 0/255
# values when read back; consider PNG if the mask is consumed downstream.
if not cv2.imwrite("prediction_finale.jpg", (predicted_mask * 255).astype(np.uint8)):
    raise IOError("Erreur : Impossible d'écrire le fichier 'prediction_finale.jpg'.")
print("Prédiction sauvegardée sous 'prediction_finale.jpg'")

# Visualisation (optional): source image on the left, predicted mask on the right
fig, (ax_image, ax_mask) = plt.subplots(1, 2, figsize=(12, 6))
ax_image.imshow(cv2.cvtColor(cv2.imread(test_image_path), cv2.COLOR_BGR2RGB))
ax_image.set_title('Image originale')
ax_image.axis('off')

ax_mask.imshow(predicted_mask, cmap='gray')
ax_mask.set_title('Masque prédit')
ax_mask.axis('off')
plt.show()