![banniere_one](img/banniere.png)

# Classification Binaire de Pneumonie à partir de Radios Thoraciques

La **pneumonie** est une infection des poumons causée par des bactéries, des virus ou des champignons. Elle entraîne une inflammation des sacs aériens (alvéoles), qui peuvent se remplir de liquide ou de pus, provoquant des symptômes comme la toux, de la fièvre et des difficultés respiratoires.

Dans ce projet, nous entraînons un modèle de deep learning (MobileNetV3LArge) pour classifier automatiquement les radiographies thoraciques entre *normal* et *pneumonia*.


Ce notebook présente un projet de classification binaire de pneumonie à partir de radios thoraciques. Nous allons suivre les étapes suivantes :

1. Préparation et exploration du dataset
2. Mise en place du modèle pré-entraîné et adaptation
3. Entraînement du modèle
4. Évaluation du modèle
5. Suivi des expériences via MLflow
6. Documentation et dépôt sur GitHub

## 1 . Préparation et exploration du dataset

### Télécharger le dataset

Nous allons utiliser le dataset Chest X-Ray Pneumonia. Assurez-vous que les images sont organisées dans des sous-dossiers `NORMAL` et `PNEUMONIA` dans le répertoire `data/train`, `data/test`, et `data/val`.

 

### Préparation des dépendances python et des configurations nécessaires

Ici nous allons rélaiser les différents import nécessaires pour le projet. il faudra avoir installé les dépendances avec le ```pip install -r  requirements.txt``` sinon rien ne fonctionnera. 

In [None]:
# Import des dépendances python nécessaire 

import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import cv2
import seaborn as sns
import random
import subprocess
import requests
import time

from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, average_precision_score,
                             log_loss, confusion_matrix, classification_report)
from sklearn.utils import class_weight
from sklearn.preprocessing import StandardScaler

import mlflow
from mlflow.models.signature import infer_signature

from tensorflow.keras.applications import MobileNetV3Large
from tensorflow.keras.applications.mobilenet_v3 import preprocess_input

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam

np.set_printoptions(linewidth=np.inf)
np.set_printoptions(edgeitems=30) 

Cette fonctionnalité permet de lancer mlflow ui sans avoir besoin de le lancer dans votre terminal il faut juste que vous soyez dans votre environnement python(virtuel ou non ) possédant votre mlflow 

In [None]:
# Lancement et vérification MlFlow 

# Vérifier si le serveur MLflow est en cours d'exécution
try:
    response = requests.get("http://localhost:5000")
    response.raise_for_status()  # Cela lèvera une exception si le serveur n'est pas accessible
    print("Le serveur MLflow UI est déjà en cours d'exécution.")
except requests.exceptions.RequestException:
    print("Démarrage du serveur MLflow UI...")
    # Lancer le serveur MLflow UI dans un sous-processus
    subprocess.Popen(["mlflow", "ui"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Attendre un peu pour s'assurer que le serveur a le temps de démarrer
    time.sleep(7)

# Vérifier si une exécution est déjà en cours
active_run = mlflow.active_run()
if active_run is not None:
    print(f"Une exécution est déjà en cours : {active_run.info.run_id}. Fin de l'exécution.")
    mlflow.end_run()  # Terminer l'exécution active

# Remplacez par l'ID de l'expérience que vous souhaitez supprimer


# MLflow tracking
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("Pneumonia_Classification_MobileNet_V3_Large")
mlflow.start_run()

## Exploration des données 

Ici nous allons simplement affciher une radio pour vérifier qu'on arrive à la lire. 

In [None]:
# On charge une image depuis un chemin local, l’image est chargé en 1 canal (valeur de pixel de 0 à 255)
img = cv2.imread("data/train/NORMAL/IM-0115-0001.jpeg", cv2.IMREAD_COLOR)

# Observer l'image sous forme de matrice
#print(img)

# Visualisation de l'image
plt.imshow(img)
plt.axis("off")
plt.show()

# Inspection des métadonnées de l'image
print(f"Shape (dimensions)    : {img.shape}")
print(f"Type des valeurs      : {img.dtype}")


dans l'exemple ci-dessus nous avons effectivement une image affichée donc nous pouvons continuer. 

### Prétraitement des images
Ici nous allons trasnformer nos images pour qu'elles soit conformer à la taille recommandé voulue par notre modèle et en format couleur.
nous allons ensuite les normaliser c'est à dire les convertir dans un format lisible pour notre modèle(ici: MobileNetV3Large)

In [None]:
def load_data(data_dir, img_read_type: int, target_size=(224, 224)):
    """
    Charge les images et les étiquettes depuis un répertoire structuré en sous-dossiers 'NORMAL' et 'PNEUMONIA'.

    Parameters
    ----------
    data_dir : str
        Chemin du dossier contenant deux sous-dossiers : 'NORMAL' et 'PNEUMONIA', chacun contenant les images correspondantes.
    img_read_type : int
        Mode de lecture des images pour OpenCV (par exemple cv2.IMREAD_GRAYSCALE ou cv2.IMREAD_COLOR).
    target_size : tuple (int, int), default=(224, 224)
        Dimensions finales souhaitées pour les images après redimensionnement.

    Returns
    -------
    X : 
        Tableau des images chargées et redimensionnées, converties en float32.
        La forme dépend du mode de lecture (grayscale ou couleur).
    
    y : 
        Tableau des labels (0 pour 'NORMAL', 1 pour 'PNEUMONIA').
    """

    X = []
    y = []
    label_map = {'NORMAL': 0, 'PNEUMONIA': 1}

    for label_name in ['NORMAL', 'PNEUMONIA']:
        class_dir = os.path.join(data_dir, label_name)
        for filename in os.listdir(class_dir):
            if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                filepath = os.path.join(class_dir, filename)
                img = cv2.imread(filepath, img_read_type)
                if img is None:
                    continue  # image illisible, on passe
                img = cv2.resize(img, target_size)
                X.append(img)
                y.append(label_map[label_name])

    X = np.array(X, dtype=np.float32)
    y = np.array(y, dtype=np.float32)
    return X, y

X_train, y_train = load_data("data/train", cv2.IMREAD_COLOR)
X_test, y_test = load_data("data/test", cv2.IMREAD_COLOR)
X_valid, y_valid= load_data("data/val", cv2.IMREAD_COLOR)

Nos images sont maintenant maintenant conforme et nous allons les normaliser avec ImageDataGenerator

In [None]:
# Créer un générateur d'images avec normalisation
datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Créer des générateurs à partir des tableaux NumPy
train_generator = datagen.flow(X_train, y_train, batch_size=32)
valid_generator = datagen.flow(X_valid, y_valid, batch_size=32)
test_generator = datagen.flow(X_test, y_test, batch_size=32,shuffle=False)

In [None]:
def compute_class_weights(generator, num_batches=10):
    """
    Calcule les poids de classe pour un ensemble d'étiquettes à partir d'un générateur.

    Parameters
    ----------
    generator : ImageDataGenerator
        Générateur d'images qui produit des lots d'images et d'étiquettes.
    num_batches : int
        Nombre de lots à utiliser pour le calcul des poids de classe.

    Returns
    -------
    dict
        Dictionnaire des poids de classe.
    """
    y = []
    # Limiter le nombre de lots à itérer pour éviter une boucle infinie
    for _ in range(num_batches):
        try:
            _, labels = next(generator)  # Obtenir le prochain lot
            y.extend(labels)
        except StopIteration:
            break  # Sortir si le générateur est épuisé

    y = np.array(y)

    # Calculer les poids de classe
    class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y), y=y)
    
    # Créer un dictionnaire de poids
    class_weight_dict = {i: weight for i, weight in enumerate(class_weights)}
    
    return class_weight_dict

# Calculer les class_weights à partir du train_generator
class_weights = compute_class_weights(train_generator)


une fois les images préparé on vérfie que nous pouvons lire les images, on va prendre un échantillon de 10 images 

In [None]:
# Test images aléatoires

# Obtenir un lot d'images et d'étiquettes depuis test_generator
images, labels = next(iter(train_generator))

# Mapping des classes
label_name = {0: "Normal", 1: "Pneumonia"}

# Choisir 10 indices aléatoires dans le batch
num_images_to_display = 10
indices = random.sample(range(len(images)), num_images_to_display)

# Créer une figure pour afficher les images
plt.figure(figsize=(15, 10))

# Affichage des images
for i, idx in enumerate(indices):
    plt.subplot(2, 5, i + 1)  # 2 lignes, 5 colonnes
    plt.imshow((images[idx] / 255.0).astype(np.float32))  # /255 pour rendre l’image lisible
    plt.title(label_name[labels[idx]])  # Afficher le nom de la classe
    plt.axis("off")

plt.tight_layout()  # Ajuster l'espacement entre les sous-graphes
plt.show()


les images sont corects, on continue 

# Création modele 

Nous allons créer notre modèle de données pour faire des predictions nous allons faire deux entrainement 

In [None]:
base_model = MobileNetV3Large(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
features = base_model.output
features = GlobalAveragePooling2D()(features)
features = Dropout(0.3)(features)
output = Dense(1, activation='sigmoid')(features)

model = Model(inputs=base_model.input, outputs=output)

# Geler les couches du modèle de base
for layer in base_model.layers:
    layer.trainable = False

In [None]:
# Compiler le modèle
model.compile(optimizer=Adam(learning_rate=1e-3), loss='binary_crossentropy', metrics=['accuracy'])

# Early stopping pour éviter l'overfitting
early_stop = EarlyStopping(patience=4, restore_best_weights=True)

# Enregistrer les paramètres
mlflow.log_param("model", "MobileNetV3Large")
mlflow.log_param("phase", "freeze")
mlflow.log_param("epochs", 8)
mlflow.log_param("batch_size", 32)
mlflow.log_param("optimizer", "adam")
mlflow.log_param("loss", "categorical_crossentropy")
mlflow.log_param("learning_rate", 0.001) 
mlflow.log_param("trainable_layers", sum([layer.trainable for layer in model.layers]))

history = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // 32,
    validation_data=valid_generator,
    validation_steps=len(X_valid) // 32,
    epochs=8,
    class_weight=class_weights,
    callbacks=[early_stop]
)

# Évaluation modele avant fine_tuning 
loss, acc = model.evaluate(test_generator, steps=len(X_test) // 32)
mlflow.log_metric("test_loss", loss)
mlflow.log_metric("test_accuracy", acc)
print(f"Test accuracy: {acc*100:.2f}%")
print(f"Pertes : {loss}")


On vérifie avec ce graphique 

In [None]:
plt.plot(history.history['accuracy'], label='Train acc')
plt.plot(history.history['val_accuracy'], label='Val acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("MobileNetV3LargeFreeze - Accuracy")
plt.legend()
plt.grid(True)
plt.savefig("graph/accuracy.png")  # Sauvegarder la figure
mlflow.log_artifact("graph/accuracy.png")  # Enregistrer l'artefact dans MLflow
plt.show()

Ici nous allons "degeler" notre modele, ce principe permet à notre modele d'être plus performant et précis, nous allons dans ce cas-ci "dégeler" les 30 dernières couches 

In [None]:
# Défreezer les 20 dernières couches du modèle 
for layer in base_model.layers[-20:]:
    layer.trainable = True

# Recompiler avec un learning rate plus petit
model.compile(optimizer=Adam(learning_rate=1e-5),
              loss='binary_crossentropy',
              metrics=['accuracy'])

In [None]:
# Nouveau early stopping
early_stop_fine = EarlyStopping(patience=3, restore_best_weights=True)

# Enregistrer les paramètres
mlflow.log_param("phase_2", "unfreeze")
mlflow.log_param("unfreeze_epochs", 8)
mlflow.log_param("unfreeze_batch_size", 32)
mlflow.log_param("unfreeze_optimizer", "adam")
mlflow.log_param("unfreeze_loss", "categorical_crossentropy")
mlflow.log_param("unfreeze_learning_rate", 0.00001)  # Si vous utilisez un learning rate spécifique
mlflow.log_param("unfreeze_trainable_layers", sum([layer.trainable for layer in model.layers]))

history_finetune = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // 32,
    validation_data=valid_generator,
    validation_steps=len(X_valid) // 32,
    epochs=8,
    class_weight=class_weights,
    callbacks=[early_stop_fine]
)

# Évaluation
fine_tuned_loss, fine_tuned_acc = model.evaluate(test_generator, steps=len(X_test) // 32)
mlflow.log_metric("fine_tuned_accuracy", fine_tuned_acc)
mlflow.log_metric("fine_tuned_loss", fine_tuned_loss)
print(f"Test accuracy: {fine_tuned_acc*100:.2f}%")
print(f"Pertes : {loss}")

In [None]:
# Test images aléatoires

# Obtenir un lot d'images et d'étiquettes depuis test_generator
images, labels = next(iter(test_generator))

# Mapping des classes
label_name = {0: "Normal", 1: "Pneumonia"}

# Choisir 10 indices aléatoires dans le batch
num_images_to_display = 10
indices = random.sample(range(len(images)), num_images_to_display)

# Créer une figure pour afficher les images
plt.figure(figsize=(15, 10))

# Affichage des images
for i, idx in enumerate(indices):
    plt.subplot(2, 5, i + 1)  # 2 lignes, 5 colonnes
    plt.imshow((images[idx] / 255.0).astype(np.float32))  # /255 pour rendre l’image lisible
    plt.title(label_name[labels[idx]])  # Afficher le nom de la classe
    plt.axis("off")

plt.tight_layout()  # Ajuster l'espacement entre les sous-graphes
plt.show()


Vérification avec le fine-tune nous observons que 

In [None]:
plt.plot(history_finetune.history['accuracy'], label='Train acc')
plt.plot(history_finetune.history['val_accuracy'], label='Val acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("MobileNetV3LargeUnFreeze - Accuracy")
plt.legend()
plt.grid(True)
plt.savefig("graph/finetune_accuracy.png")  # Sauvegarder la figure
mlflow.log_artifact("graph/finetune_accuracy.png")  # Enregistrer l'artefact dans MLflow
plt.show()

# Graphique et Enregistrement MlFlow 

Ici nous allons faire des tests avec différents graphiques pour tester notre modele et vérifier sa précision, le paramétre le plus important ici est le recall, plus un recall est proche de 1 plus le modéle est précis, nous vérifions également d'autres graphiques 


In [None]:
# Prédictions probabilistes (sorties sigmoid)
y_true = test_generator.y
y_probs = model.predict(test_generator, steps=len(test_generator))
y_pred = (y_probs > 0.5).astype("int32") 

accuracy_score_metric=accuracy_score(y_true, y_pred)
precision_score_metric=precision_score(y_true, y_pred)
recall_score_metric=recall_score(y_true, y_pred)
f1_score_metric=f1_score(y_true, y_pred)
roc_auc_score_metric=roc_auc_score(y_true, y_probs)
average_precision_score_metric= average_precision_score(y_true, y_probs)
log_loss_metric=log_loss(y_true, y_probs)

# Calcul des métriques 
mlflow.log_metric("accuracy", round(accuracy_score_metric,3))
mlflow.log_metric("precision", round(precision_score_metric,3))
mlflow.log_metric("recall", round(recall_score_metric,3))
mlflow.log_metric("f1_score", round(f1_score_metric,3))
mlflow.log_metric("roc_auc", round(roc_auc_score_metric,3))
mlflow.log_metric("average_precision", round(average_precision_score_metric,3))
mlflow.log_metric("log_loss", round(log_loss_metric,3))

# Obtenir les classes réelles à partir du générateur
y_test = np.concatenate([test_generator[i][1] for i in range(len(test_generator))])

# Calculer la matrice de confusion
conf_matrix = confusion_matrix(y_test, y_pred)
print("Matrice de confusion :")
print(conf_matrix)

# Calculer le rapport de classification
class_report = classification_report(y_test, y_pred, target_names=['NORMAL', 'PNEUMONIA'])
print("Rapport de classification :")
print(class_report)

# Visualiser la matrice de confusion
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['NORMAL', 'PNEUMONIA'], yticklabels=['NORMAL', 'PNEUMONIA'])
plt.ylabel('Vérité terrain')
plt.xlabel('Prédictions')
plt.title('Matrice de confusion')
plt.savefig("graph/confuse_matrix.png")
mlflow.log_artifact("graph/confuse_matrix.png")
plt.show()

In [None]:
# ROC curve
fpr, tpr, _ = roc_curve(y_test, y_probs)
roc_auc = auc(fpr, tpr)


plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f"ROC curve (AUC = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel("Taux de faux positifs")
plt.ylabel("Taux de vrais positifs")
plt.title("Courbe ROC")
plt.legend(loc="lower right")
plt.grid(True)
plt.savefig("graph/roc_curve.png")
mlflow.log_artifact("graph/roc_curve.png")

# Precision-Recall curve
precision, recall, _ = precision_recall_curve(y_test, y_probs)
avg_precision = average_precision_score(y_test, y_probs)

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f"Average Precision = {avg_precision:.2f}")
plt.xlabel("Rappel")
plt.ylabel("Précision")
plt.title("Courbe Précision-Rappel")
plt.legend(loc="lower left")
plt.grid(True)
plt.savefig("graph/precision_recall_curve.png")
mlflow.log_artifact("graph/precision_recall_curve.png")

# Rapport de classification
#report = classification_report(y_test, y_pred, target_names=['Normal', 'Pneumonia'], output_dict=False)
#print(report)


In [None]:
# Obtenir un lot d'images et d'étiquettes depuis test_generator
images, labels = next(iter(test_generator))

# Mapping des classes
label_name = {0: "Normal", 1: "Pneumonia"}

# Choisir un indice aléatoire dans le batch
i = random.randint(0, len(images) - 1)

# Prédiction
proba = model.predict(np.expand_dims(images[i], axis=0))[0][0]
predicted_class = int(proba > 0.5)

# Affichage de l’image
plt.imshow((images[i] / 255.0).astype(np.float32))  # /255 pour rendre l’image lisible
plt.title(f"Prédiction : {label_name[predicted_class]} ({proba:.2f})")
plt.axis("off")
plt.show()


# Test Mlflow

In [None]:
# On récupère un mini-batch depuis le générateur
X_batch, _ = train_generator[0]  # train_generator[0][0] = images, [0][1] = labels
#example_input = X_batch[0:1]     # On prend une seule image, format (1, H, W, 3)
example_input = np.random.rand(1, 224, 224, 3).astype(np.float32)
# Inférer automatiquement la signature d'entrée/sortie
signature = infer_signature(example_input, model.predict(example_input))

mlflow.keras.log_model(
    model,
    artifact_path="model",
    signature=signature
)

# Terminer l'exécution
mlflow.end_run()