<a href="https://colab.research.google.com/github/Lyes-Im/Imine_Lyes/blob/main/FISSURE_PROJECT(second_part)_appliquer_sur_Snowflake_IMINE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Projet fait par :
IMINE Belaid Lyes

Projet : Détection des fissure.

## **2 ème partie du projet :**
Voici les étapes en bref :

**1.Préparation et gestion des données dans Snowflake**

Afin de gérer efficacement un volume conséquent de données (11298 images de fissures), la plateforme Snowflake a été utilisée comme environnement centralisé de stockage et de calcul.

Optimisation des fichiers :

Les images sont stockées sous forme compressée (.gz), ce qui permet de :

- réduire l’espace de stockage,

- accélérer les transferts,

- améliorer les performances de lecture pendant l’entraînement.

**2.Choix de l’architecture : le modèle U-Net**

Le problème traité dans ce projet une segmentation sémantique, où chaque pixel de l’image doit être classé comme fissure ou béton sain

Le modèle U-Net est spécialisé pour ce genre de tache :

- Il permet une localisation précise des fissures pixel par pixel.

- Il est particulièrement performant pour détecter des structures fines et allongées, comme les fissures.

- Son architecture à connexions de saut (skip connections) permet de préserver les détails spatiaux essentiels lors de la reconstruction du masque.

**3.Entraînement et optimisation du modèle**
L’apprentissage du modèle a été réalisé directement dans Snowflake à l’aide de PyTorch.

- Fonction de perte (Loss) :

  - Utilisation de fonctions adaptées à la segmentation binaire (ex. Binary Cross Entropy).

  - Ce choix est important car les fissures représentent une très faible proportion des pixels, entraînant un fort déséquilibre entre les classes.

- Suivi de l’apprentissage :

    - Surveillance de la perte (loss) et de la métrique IoU (Intersection over Union).

  - Mise en place d’un mécanisme d’early stopping afin d’éviter le surapprentissage.

  - Enregistrer le meuilleur model trouvé avec un loss bas et IoU haut

Ces stratégies permettent de garantir que le modèle apprend réellement la géométrie des fissures

**4.Phase de test et diagnostic du modèle**

Le modèle a ensuite été évalué sur un jeu de test  de 1 130 images.

- Défi des données réelles :

  - Une analyse approfondie du jeu de test a révélé qu’il est majoritairement composé d’images de béton sain (Support Fissure = 0).

- Impact sur les métriques :

  - Cette distribution explique des métriques globales faibles ou nulles, malgré est un  modèle fonctionnel vu que je l'ai testé sur les images d'entrainement.


Cette seconde partie du projet met en évidence :

- une intégration complète du deep learning dans Snowflake.
- la maitrise de l'outil snowflake.



**Etape 01: Accès aux données avec Snowpark**

Dans cette étape, nous utilisons Snowpark afin d’interagir directement avec Snowflake depuis le notebook.

Nous définissons ici la base de données et le schéma de travail contenant la vue
`V_TRAINING_DATA`, qui référence les images (URL), masques (URL) et image_groupe stockés dans un stage Snowflake.


In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# # Importation de Snowpark
from snowflake.snowpark.context import get_active_session
# Récupération de la session Snowflake active
session = get_active_session()
# Définition du contexte de travail (Database, schema)
session.use_database("PROJET_FISSURES")
session.use_schema("STORAGE_SCHEMA")

# Chargement de la vue V_TRAINING_DATA dans un DataFrame Pandas
try:
    df_training = session.table("V_TRAINING_DATA").to_pandas()
    print(f" {len(df_training)} lignes récupérées.")
except Exception as e:
    print(f" Erreur : {e}")

# Vérification du contenu
df_training.head()




**Etape 02: Construction des paires image–masque internes au stage Snowflake**

Les images et masques étant stockés sous forme de fichiers compressés (`.gz`)
dans un stage Snowflake, nous devons construire des chemins internes compatibles
avec l’environnement Snowflake ML.

▶ Pour chaque groupe (TRAIN, VAL, TEST), nous générons une liste de paires
(image, masque) pointant directement vers les fichiers stockés dans le stage.

In [None]:
def get_internal_pairs(df, group_name):

   """
    Construit les paires image–masque internes au stage Snowflake
    pour un groupe donné (TRAIN, VAL ou TEST).

    """

    pairs = []
    # On filtre par groupe (TRAIN, VAL, TEST)
    subset = df[df['IMAGE_GROUP'] == group_name]

    for _, row in subset.iterrows():
        # On construit le chemin interne au Stage

        img_path = f"@PROJET_FISSURES.STORAGE_SCHEMA.MY_FISSURE_STAGE/images/{row['IMAGE_NAME']}.gz"
        mask_path = f"@PROJET_FISSURES.STORAGE_SCHEMA.MY_FISSURE_STAGE/masks/{row['MASK_NAME']}.gz"

        pairs.append((img_path, mask_path)) #Ajouter les pairs

    return pairs



In [None]:
#test
print(test_pairs)

### **Etape 03: Séparation des données par sous-ensemble**

À partir de la vue Snowflake, les données sont déjà réparties en trois groupes :
- TRAIN : données d’entraînement
- VAL : données de validation
- TEST : données de test

Nous générons ici les listes finales de paires image–masque pour chaque sous-ensemble.

   - (image,mask) :spécial training

- (image,mask) :spécial validation

- (image,mask) :spécial test


In [None]:
# On génère nos listes de paires en filtrant par TRAIN | VAL | TEST
train_pairs = get_internal_pairs(df_training, 'TRAIN')
val_pairs   = get_internal_pairs(df_training, 'VAL')
test_pairs  = get_internal_pairs(df_training, 'TEST')

print(f" Paires constituées! Exemple de chemin : {train_pairs[0][0]}")

#Afficher la taille de train, test, validation -> pour objectif de confirmation de split
print(f"Train : {len(train_pairs)}")
print(f"Val   : {len(val_pairs)}")
print(f"Test  : {len(test_pairs)}")

### **Etape 04: Lecture directe des images depuis le stage Snowflake et test de lecture image–masque**

Snowflake ML permet de lire directement des fichiers stockés dans un stage,
sans téléchargement local ni accès Internet (stockage sur Amazon S3).

▶ Dans cette étape, nous testons la lecture des fichiers `.gz` contenant
les images et les masques afin de vérifier que les données sont exploitables (sont bien chargée)
pour l’entraînement du modèle.

▶ Un test visuel est effectué sur une paire image–masque issue du jeu
d’entraînement afin de confirmer la validité du chargement des données
depuis le stage Snowflake.

In [None]:
from snowflake.snowpark.files import SnowflakeFile
import io
import gzip
from PIL import Image
import matplotlib.pyplot as plt

def load_snowflake_gz_internal(path, is_mask=False):
    try:
        # SnowflakeFile.open ne nécessite pas d'internet
        with SnowflakeFile.open(path, 'rb') as f:
            # On décompresse le flux gzip
            with gzip.GzipFile(fileobj=f) as gz:
                img = Image.open(gz)
                # Conversion selon le type :
                # - masque en niveaux de gris
                # - image couleur (RGB)
                return img.convert("L") if is_mask else img.convert("RGB")
    except Exception as e:
        print(f"Erreur sur {path} : {e}")
        return None

# --- Test pour afficher la paire (image - mask) ---
print("Test de lecture en cours...")
img_test = load_snowflake_gz_internal(train_pairs[0][0], is_mask=False)
mask_test = load_snowflake_gz_internal(train_pairs[0][1], is_mask=True)
#Affichage la paire (image - mask)
if img_test and mask_test:
    print("  Lire les images directement depuis le Stage.")
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1); plt.imshow(img_test); plt.title("Image")
    plt.subplot(1, 2, 2); plt.imshow(mask_test, cmap='gray'); plt.title("Masque")
    plt.show()
else:
    print(" Erreur")

### **Etape 05:  Création d’un Dataset PyTorch personnalisé pour Snowflake**
Dans cette étape, nous définissons une classe `Dataset` PyTorch permettant
de charger dynamiquement les images et masques directement depuis
le stage Snowflake.

Chaque élément du dataset correspond à une paire (image, masque),
chargée et transformée avant d’être envoyée au modèle.

In [None]:
from torch.utils.data import Dataset
from torchvision import transforms

class SnowflakeFissureDataset(Dataset):
    def __init__(self, pairs, transform=None):
        self.pairs = pairs
        self.transform = transform

    def __len__(self):
      # Nombre total d'échantillons
        return len(self.pairs)

    def __getitem__(self, idx):
        # Récupération des chemins internes (@...)
        img_path, mask_path = self.pairs[idx]

        # Chargement depuis le stage Snowflake
        image = load_snowflake_gz_internal(img_path, is_mask=False)
        mask = load_snowflake_gz_internal(mask_path, is_mask=True)

        # Application des transformations (Redimensionnement, Tenseurs)
        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        return image, mask



### **Etape 05: Transformations des données**

▶ Les images originales ayant une résolution élevée (448 x 448), elles sont redimensionnées
en 256×256 pixels afin de réduire le temps de calcul tout en conservant
une bonne précision de segmentation.

▶ Les images sont ensuite converties en tenseurs PyTorch.

▶ Création des datasets d’entraînement, de validation et de test
à partir des paires image–masque précédemment construites.


In [None]:
# --- Définition des transformations ---
# On redimensionne à 256x256 car c'est un bon compromis vitesse/précision
data_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(), # Conversion en tenseur pytorch
])

# --- Instanciation des Datasets ---

train_dataset = SnowflakeFissureDataset(train_pairs, transform=data_transforms) #Dataset training
val_dataset   = SnowflakeFissureDataset(val_pairs,   transform=data_transforms) #Dataset validation
test_dataset  = SnowflakeFissureDataset(test_pairs,  transform=data_transforms) #Dataset test


print(f"   - Train : {len(train_dataset)} images")
print(f"   - Val   : {len(val_dataset)} images")
print(f"   - Test  : {len(test_dataset)} images")

### **Etape 06: Configuration du GPU et des DataLoaders**

Cette étape permet de :
- détecter automatiquement la présence d’un GPU Snowflake (NVIDIA A10G)
- définir les DataLoaders PyTorch pour l’entraînement, la validation et le test

Les DataLoaders assurent le chargement par lots (batchs) des données.


In [None]:
import torch
from torch.utils.data import DataLoader

# 1. Vérification du GPU Snowflake
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device détecté : {device}")
if device.type == 'cuda':
    print(f" Modèle de GPU : {torch.cuda.get_device_name(0)}")

# 2. Configuration des DataLoaders
# On utilise un batch_size de 16.
BATCH_SIZE = 16

#DataLoaders

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_dataset,  batch_size=BATCH_SIZE, shuffle=False)

# 3. Test ultime du loader
# On essaie de récupérer un paquet d'images pour voir si tout s'enchaîne bien
images, masks = next(iter(train_loader))

print(f"Prêt à envoyer {len(train_loader)} paquets de 16 images au modèle.")

### **Etape 07: Architecture du modèle ' *ResNet-UNet* '**

Le modèle utilisé est une architecture hybride combinant :
- un encodeur ResNet34
- un décodeur de type U-Net

Cette architecture est particulièrement adaptée aux tâches
de segmentation d’images grâce à ses connexions de saut
(skip connections).


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
         """
    Bloc standard U-Net : deux convolutions successives
    """

    def forward(self, x):
        return self.double_conv(x)


class ResNetUNet(nn.Module):
    def __init__(self, n_classes=1):
        super().__init__()
        # Encodeur ResNet34 (sans poids pré-entraînés pour éviter l'erreur de connexion Internet)

        resnet = models.resnet34(weights=None)

        self.encoder0 = nn.Sequential(resnet.conv1, resnet.bn1, resnet.relu)
        self.pool0 = resnet.maxpool
        self.encoder1 = resnet.layer1
        self.encoder2 = resnet.layer2
        self.encoder3 = resnet.layer3
        self.encoder4 = resnet.layer4

        self.up4 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec4 = DoubleConv(512, 256)
        self.up3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec3 = DoubleConv(256, 128)
        self.up2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec2 = DoubleConv(128, 64)
        self.up1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2)
        self.dec1 = DoubleConv(128, 64)

        self.final = nn.Conv2d(64, n_classes, kernel_size=1)

    def forward(self, x):
        e0 = self.encoder0(x)
        e1 = self.encoder1(self.pool0(e0))
        e2 = self.encoder2(e1)
        e3 = self.encoder3(e2)
        e4 = self.encoder4(e3)

        d4 = self.up4(e4)
        d4 = self.dec4(torch.cat([d4, e3], dim=1))
        d3 = self.up3(d4)
        d3 = self.dec3(torch.cat([d3, e2], dim=1))
        d2 = self.up2(d3)
        d2 = self.dec2(torch.cat([d2, e1], dim=1))
        d1 = self.up1(d2)
        d1 = self.dec1(torch.cat([d1, e0], dim=1))

        out = self.final(d1)
        # Redimensionnement final
        return F.interpolate(out, size=x.shape[2:], mode='bilinear', align_corners=False)

### **Etape 08: Initialisation du model et entraînement du modèle**

L’entraînement repose sur :
- une fonction de perte BCEWithLogitsLoss
- une métrique IoU (Intersection over Union)
- un optimiseur Adam
- une stratégie d’Early Stopping pour éviter le surapprentissage

Une barre de progression est utilisée pour suivre l’évolution
de l’entraînement en temps réel.

▶ Après un certain nombre d'epoch, si on ne voit pas qu'il y a une augmentation d'IoU, on s'arrete et on enregistre le meilleur model pour l'utiliser dans l'étape de test.

In [None]:
import torch
from tqdm.auto import tqdm
import os

# --- Configuration ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 11  # géré par Early Stopping
learning_rate = 1e-4
best_val_iou = 0.0
patience = 7  # Nombre d'époques à attendre sans amélioration
counter = 0

model = ResNetUNet(n_classes=1).to(device)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

def iou_score(outputs, masks, threshold=0.5):
    outputs = torch.sigmoid(outputs)
    outputs = (outputs > threshold).float()
    intersection = (outputs * masks).sum()
    union = outputs.sum() + masks.sum() - intersection
    return (intersection + 1e-6) / (union + 1e-6)

train_losses, val_losses = [], []
train_ious, val_ious = [], []

In [None]:

# --- Boucle d'entraînement ---
for epoch in range(num_epochs):
    # --------- Entraînement ----------
    model.train()
    epoch_loss, epoch_iou = 0.0, 0.0

    train_pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]", leave=False)
    for images, masks in train_pbar:
        images, masks = images.to(device), masks.to(device).float()

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        current_iou = iou_score(outputs, masks).item()
        epoch_iou += current_iou
        train_pbar.set_postfix({'loss': f"{loss.item():.4f}", 'iou': f"{current_iou:.4f}"})

    train_losses.append(epoch_loss / len(train_loader))
    train_ious.append(epoch_iou / len(train_loader))

    # --------- Validation ----------
    model.eval()
    val_loss, val_iou = 0.0, 0.0
    torch.cuda.empty_cache() # Libère la mémoire pour la validation

    with torch.no_grad():
        val_pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]", leave=False)
        for images, masks in val_pbar:
            images, masks = images.to(device), masks.to(device).float()
            outputs = model(images)
            loss = criterion(outputs, masks)

            val_loss += loss.item()
            current_val_iou = iou_score(outputs, masks).item()
            val_iou += current_val_iou
            val_pbar.set_postfix({'val_loss': f"{loss.item():.4f}", 'val_iou': f"{current_val_iou:.4f}"})

    val_losses.append(val_loss / len(val_loader))
    val_ious.append(val_iou / len(val_loader))

    # Affichage des résultats de l'époque
    print(f"Epoch {epoch+1:02d}: Train Loss: {train_losses[-1]:.4f} | IoU: {train_ious[-1]:.4f} || Val Loss: {val_losses[-1]:.4f} | Val IoU: {val_ious[-1]:.4f}")

    # --------- Sauvegarde du meilleur modèle et Early Stopping ----------
    if val_ious[-1] > best_val_iou:
        best_val_iou = val_ious[-1]
        torch.save(model.state_dict(), 'best_model.pth')
        print(f" Nouveau meilleur modèle sauvegardé ! (IoU: {best_val_iou:.4f})")
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print(f"Early stopping à l'époque {epoch+1}. Le modèle ne progresse plus.")
            break

Le modèle améliore progressivement ses performances en IoU sur l’ensemble d’entraînement.
La performance sur le jeu de validation atteint un maximum à l’epoch 7 (IoU = 0.37), avant de décroître, indiquant un début d’overfitting.
Le modèle retenu est donc celui de l’epoch 7.

In [None]:
# On récupère le meilleur model qui est stocké sur le STAGE.
session.file.get("@MY_FISSURE_STAGE/models/best_model.pth", os.getcwd())
print("Fichier best_model.pth qui contient le model est récupéré du stage !")

J'avais une erreur sur le fichier 'best_model.pth.gz' qui est compressé, donc lors d'initaliser l'architecture et charger les poids, j'ai eu une erreur de type ( No such file or directory: 'best_model.pth')

Décompresser le fichier :

In [None]:
import gzip
import shutil

# Décompression du fichier best_model.pth pour l'utiliser
with gzip.open('best_model.pth.gz', 'rb') as f_in:
    with open('best_model.pth', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

print("Fichier décompressé !")

In [None]:
# Charger dans PyTorch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 1. Créer une instance vierge du modèle
model_test = ResNetUNet(n_classes=1).to(device)


 # 2. Initialiser l'architecture et charger les poids
model_test.load_state_dict(torch.load("best_model.pth", map_location=device))
model_test.eval()
print("Modèle chargé depuis le Stage Snowflake et pret ")

## **Faire le test pour évaluer notre MODEL :**

### **Visualisation qualitative des résultats**

Analyse visuelle
des prédictions du modèle afin d’évaluer sa capacité à détecter correctement
les fissures sur des images jamais vues.

Pour cela, nous affichons :
- l’image originale
- le masque réel
- le masque prédit par le modèle

Chaque prédiction est accompagnée de son score **IoU** individuel.



In [None]:
import matplotlib.pyplot as plt
import numpy as np

def visualize_results(model, loader, num_samples=4):
    model.eval()
    images, masks = next(iter(loader))

    # Inférence
    with torch.no_grad():
        outputs = model(images.to(device))
        preds = torch.sigmoid(outputs) > 0.5 # Seuil de détection

    # Préparation de l'affichage
    plt.figure(figsize=(15, 4 * num_samples))

    for i in range(num_samples):
        # Image originale (On dé-normalise si nécessaire ou on utilise permute)
        plt.subplot(num_samples, 3, i*3 + 1)
        img = images[i].permute(1, 2, 0).cpu().numpy()
        plt.imshow(img)
        plt.title(f"Image test {i+1}")
        plt.axis('off')

        # Masque Réel (Ground Truth)
        plt.subplot(num_samples, 3, i*3 + 2)
        plt.imshow(masks[i].squeeze(), cmap='gray')
        plt.title("Fissures réelles ")
        plt.axis('off')

        # Prédiction de l'IA
        plt.subplot(num_samples, 3, i*3 + 3)
        plt.imshow(preds[i].cpu().squeeze(), cmap='jet')
        # Calcul de l'IoU individuel pour cette image
        iou = iou_score(outputs[i:i+1], masks[i:i+1].to(device))
        plt.title(f"Détection IA (IoU: {iou:.4f})")
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Exécuter la visualisation
visualize_results(model_test, test_loader)

### Évaluation quantitative finale

Afin de mesurer précisément la performance du modèle, une évaluation
pixel par pixel est réalisée sur l’ensemble du jeu de test.

Les métriques calculées sont :
- Précision
- Rappel
- F1-score
- Matrice de confusion globale

Cette analyse permet de quantifier la capacité du modèle à distinguer
les zones fissurées du béton sain.

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

def compute_final_metrics(model, loader):
    model.eval()
    y_true = []
    y_pred = []

    print("Analyse des 1130 images en cours...")

    with torch.no_grad():
        for images, masks in loader:
            outputs = model(images.to(device))
            # On transforme les probabilités en 0 ou 1 (seuil 0.5)
            preds = (torch.sigmoid(outputs) > 0.5).float()

            # On aplatit tout pour comparer pixel par pixel
            y_true.extend(masks.view(-1).cpu().numpy())
            y_pred.extend(preds.view(-1).cpu().numpy())

    # 1. Calcul de la Matrice de Confusion
    cm = confusion_matrix(y_true, y_pred)

    # 2. Affichage des statistiques principales
    print("\n--- RAPPORT DE PERFORMANCE PIXEL ---")
    print(classification_report(y_true, y_pred, target_names=['Béton Sain', 'Fissure']))

    # 3. Visualisation de la Matrice de Confusion
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Sain (Prédit)', 'Fissure (Prédit)'],
                yticklabels=['Sain (Réel)', 'Fissure (Réel)'])
    plt.title("Matrice de Confusion (Total des Pixels)")
    plt.show()

# Lancer le calcul final
compute_final_metrics(model_test, test_loader)

Le jeu de données de test fourni sur Snowflake (1 130 images) était déséquilibré et ne contenait que des images de béton sain (Support Fissure = 0). Les métriques de test montrent une spécificité élevée (détection du béton sain), mais pour évaluer réellement la détection des fissures, j'ai dû effectuer des tests manuels sur le jeu d'entraînement où le modèle a montré d'excellents résultats visuels.

### **Vérification sur le jeu d’entraînement**

Une dernière visualisation est réalisée sur le jeu d’entraînement afin
de vérifier que le modèle a bien appris à détecter les fissures connues.

Cette étape permet de :
- confirmer la capacité du modèle à apprendre.
- identifier un éventuel sur-apprentissage

In [None]:
# On utilise le TRAIN_LOADER car on sait qu'il contient des fissures
print("Test sur le jeu d'entraînement pour vérifier notre model")
visualize_results(model_test, train_loader)