In [None]:
import json
import os
import shutil

import kagglehub
import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import torch
import torchvision.transforms as transforms
import torchxrayvision as xrv
from dotenv import load_dotenv
from mlflow.data.pandas_dataset import PandasDataset
from mlflow.models.signature import infer_signature
from mlflow.tracking import MlflowClient
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical

from functions.image.img_analysis import get_images_info
from functions.image.img_treatment import prepare_data
from functions.visualizations.img_show import show_images
from functions.visualizations.show_predictions import show_predictions
from functions.data import get_train_test
import functions.models as custom_models
from functions.mlflow import get_run_name
from settings import params, model_name, folders, added_layers, experiment_name, testing_cycle


# Load variables from a .env file into the environment before MLflow is used
# (per the original note, this is where the MLflow credentials come from).
load_dotenv()

In [None]:
# Fetch the dataset once: an existing "data" directory is taken to mean the
# download has already been done.
if not os.path.exists("data"):
    path = kagglehub.dataset_download("paultimothymooney/chest-xray-pneumonia")

    # Relocate the downloaded directory under ./data
    target_dir = "data"
    os.makedirs(target_dir, exist_ok=True)
    shutil.move(path, target_dir)

    # Delete the listed sub-folders when they are present after the move
    removable_folder_paths = (
        "data/chest_xray/",
        "data/2/chest_xray/chest_xray",
        "data/2/chest_xray/__MACOSX",
    )
    # filter() is lazy, so each path is checked right before it is removed
    for leftover_dir in filter(os.path.isdir, removable_folder_paths):
        shutil.rmtree(leftover_dir)

In [None]:
# Gather per-image information for the training split and keep the rows
# that are not flagged as errors.
df_train_images = get_images_info(folders["train"]["input"])
df_train_valid = df_train_images[df_train_images["error"] == False]

# Rows holding the largest and smallest width, height and aspect ratio
extreme_rows = [
    df_train_valid.loc[getattr(df_train_valid[column], picker)()]
    for column in ("width", "height", "aspect_ratio")
    for picker in ("idxmax", "idxmin")
]
extremes = pd.concat(extreme_rows, axis=1).T.drop_duplicates()

# Preview
print(df_train_images.head())
# Summary statistics
print(df_train_images.describe())
# Extreme images as a table, widest / tallest first
print("\nImages aux valeurs extrêmes :")
print(extremes.sort_values(by=["width", "height"], ascending=[False, False]))

# Number of entries per training category (the original note points out
# that the dataset is imbalanced)
split_path = "data/2/chest_xray/train"
for category in ["NORMAL", "PNEUMONIA"]:
    category_path = f"{split_path}/{category}"
    count = len(os.listdir(category_path))
    print(f"{category}: {count} images")

# Display the extreme images
show_images(extremes.reset_index(drop=True), folder_path=folders["train"]["input"])

In [None]:
# Image transformation.
# The processed images are rebuilt only when the settings they depend on
# ("rgb", "img_size") differ from the ones recorded by the previous run.
config_path = os.path.join("data/processed", "data_config.json")

current_config = {
    "rgb": params["rgb"],
    "img_size": params["img_size"]
}

# Settings recorded by the previous run; None when there is no usable record
previous_config = None
if os.path.exists(config_path):
    try:
        with open(config_path, "r") as f:
            previous_config = json.load(f)
    except ValueError:
        # Unreadable / truncated JSON: treat it as "no record" and rebuild
        previous_config = None

if previous_config != current_config:
    for folder in folders:
        prepare_data(folders[folder]["input"], folders[folder]["output"])

    # Record the settings only after the data has been rebuilt, and make sure
    # the directory holding the record exists before writing to it.
    os.makedirs(os.path.dirname(config_path), exist_ok=True)
    with open(config_path, "w") as f:
        json.dump(current_config, f)

In [None]:
# Load the working arrays for each split from the processed folders
X_train, y_train = get_train_test(folders["train"]["output"])
X_test, y_test = get_train_test(folders["test"]["output"])
X_val, y_val = get_train_test(folders["val"]["output"])

# Class weights: "balanced" weights computed from the training labels when
# rebalancing is enabled, otherwise the same weight for both classes.
if params["equilibrate"]:
    class_weights = compute_class_weight(
        class_weight="balanced",
        classes=np.unique(y_train),
        y=y_train,
    )
    class_weight_dict = {position: weight for position, weight in enumerate(class_weights)}
else:
    class_weight_dict = {0: 1.0, 1: 1.0}

In [None]:
# Visual sanity check: the first five training images of each label
indices_label_0, indices_label_1 = (
    [position for position, label in enumerate(y_train) if label == wanted][:5]
    for wanted in (0, 1)
)

# Label-0 samples first, then label-1 samples
selected_indices = [*indices_label_0, *indices_label_1]

# One row of up to ten thumbnails
plt.figure(figsize=(15, 3))
for slot, sample_index in enumerate(selected_indices, start=1):
    plt.subplot(1, 10, slot)
    plt.imshow(X_train[sample_index], cmap="gray")
    plt.title(f"Label: {y_train[sample_index]}", fontsize=8)
    plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
# Preprocessing
# Cast every split to float32 and divide the pixel values by 255 (the original
# note states this brings them into [0, 1]).
# X_val gets the same treatment as X_train / X_test: it is later passed to a
# model that was trained on the scaled inputs.
# NOTE: this cell rebinds its own inputs, so run it once per kernel session.
X_train = X_train.astype("float32") / 255.0
X_test = X_test.astype("float32") / 255.0
X_val = X_val.astype("float32") / 255.0

# One-hot encode the labels into the two-column shape expected by the model
y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)
y_val = to_categorical(y_val, num_classes=2)

if params["data_augmentation"]:
    # Random rotations, flips, zooms and shifts applied to training batches
    datagen = ImageDataGenerator(
        rotation_range=10,
        horizontal_flip=True,
        zoom_range=0.1,
        width_shift_range=0.1,
        height_shift_range=0.1,
    )

    datagen.fit(X_train)

# No flattening at this point: the convolution layers need image-shaped inputs

In [None]:
# Model creation: build the model through the project's factory function,
# then display its summary.
model = custom_models.initialize_model()
model.summary()

In [None]:
# Model training.
# Arguments shared by both training modes.
fit_options = {
    "epochs": params["epochs"],
    "validation_data": (X_test, y_test),
    "class_weight": class_weight_dict,
}

if params["data_augmentation"]:
    # The augmenting iterator does the batching itself
    training_inputs = (
        datagen.flow(X_train, y_train, batch_size=params["batch_size"]),
    )
else:
    # Plain arrays: the batch size is handed to fit()
    training_inputs = (X_train, y_train)
    fit_options["batch_size"] = params["batch_size"]

model_info = model.fit(*training_inputs, **fit_options)

In [None]:
# Confusion matrix on the test split.
# Turn one-hot targets and predicted scores into class indices.
y_test_classes = np.argmax(y_test, axis=1)
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

conf_matrix = confusion_matrix(y_test_classes, y_pred_classes)

labels = ["Normal", "Pneumonia"]
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=labels)

# Draw on an explicit axes, save the figure for the MLflow logging step, show it
fig, ax = plt.subplots(figsize=(6, 4))
disp.plot(ax=ax, cmap="GnBu")
ax.set_title("Confusion matrix")
img_path = "confusion_matrix.png"
fig.savefig(img_path)
plt.show()
plt.close()

In [None]:
# Log the evaluation results, parameters, artifacts and model into MLflow
results = model.evaluate(X_test, y_test, return_dict=True)
mlflow.set_tracking_uri("http://mlflow.cpetit.fr/")
experiment = mlflow.set_experiment(experiment_name=experiment_name)
run_name = get_run_name(experiment, params["model"])

# Save the weights to a local file so they can be attached to the run
weights_path = "model.weights.h5"
model.save_weights(weights_path)

with mlflow.start_run(run_name=run_name) as run:
    run_id = run.info.run_id

    # Dataset provenance
    mlflow.log_param("dataset_url", "https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia")
    mlflow.set_tag("dataset", "chest-xray-pneumonia")
    mlflow.set_tag("testing_cycle", testing_cycle)

    # Run parameters, plus one parameter per added layer
    mlflow.log_params(params)
    for layer_position, layer_spec in enumerate(added_layers):
        mlflow.log_param(f"added_layer_{layer_position}", layer_spec)

    # Training curves: every metric of the fit history, one step per epoch
    history = model_info.history
    for epoch in range(len(history["loss"])):
        for metric_name, series in history.items():
            mlflow.log_metric(f"training {metric_name}", series[epoch], step=epoch)

    # Final evaluation metrics
    mlflow.log_metrics(results)

    # Confusion matrix image and the saved weights file
    mlflow.log_artifact(img_path, artifact_path="confusion_matrix")
    mlflow.log_artifact(weights_path)

    # The model itself, with a signature inferred from the test inputs and
    # the predictions made on them
    test_predictions = model.predict(X_test)
    model_signature = infer_signature(X_test, test_predictions)
    mlflow.tensorflow.log_model(
        model=model,
        artifact_path=model_name,
        input_example=X_test[:5],
        signature=model_signature,
    )

    # Register the logged model under the configured name
    model_uri = f"runs:/{run_id}/{model_name}"
    mlflow.register_model(model_uri=model_uri, name=model_name)

In [None]:
# Load a registered model and preview its predictions on the validation split.
# The answers are stored under their own names so that `model_name` (imported
# from settings), `model_uri` and `y_pred` from the earlier cells are not
# overwritten; re-running those cells afterwards keeps using the original values.
registry_model_name = input("Enter model name").strip()
registry_model_version = int(input("Enter model version"))
alias = "challenger"

registry_model_uri = f"models:/{registry_model_name}/{registry_model_version}"

loaded_model = mlflow.tensorflow.load_model(registry_model_uri)

y_val_pred = loaded_model.predict(X_val)
show_predictions(X_val, y_val_pred, y_true=y_val, class_names=["normal", "malade"], n_images=10)

In [None]:
# Plug-and-play model: a pretrained DenseNet from torchxrayvision.
# NOTE: this rebinds `model`, the name the earlier cells use for the model
# returned by custom_models.initialize_model().
model = xrv.models.DenseNet(weights="densenet121-res224-all")
model.eval()  # evaluation mode (no training)

# NOTE(review): `transform` is not referenced by any code visible in this
# notebook; confirm whether it is still needed.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.481], std=[0.226])  # original note: torchxrayvision mean/std for chest X-rays (TODO confirm)
])

def preprocess_batch(X_np, max_value=None):
    """Convert a batch of channel-last images into a torchxrayvision-style tensor.

    The pixel values are mapped linearly from ``[0, max_value]`` to
    ``[-1024, 1024]`` and the axes are reordered from (N, H, W, C) to
    (N, C, H, W). The input array is never modified.

    Args:
        X_np: numpy array of shape (N, H, W), (N, H, W, 1) or (N, H, W, 3),
            uint8 or float. Three-channel images are averaged to one channel.
        max_value: largest possible pixel value of ``X_np`` (255 for raw 8-bit
            images, 1.0 for images already divided by 255). When ``None`` it is
            inferred: 255 if any value exceeds 1, otherwise 1.0.

    Returns:
        A float32 ``torch.Tensor`` of shape (N, 1, H, W).
    """
    # 1) Work on float32 values (no in-place operation is applied afterwards,
    #    so the caller's array stays untouched)
    X_np = np.asarray(X_np, dtype="float32")

    # 2) Make sure there is exactly one channel, on the last axis
    if X_np.ndim == 3:
        X_np = X_np[..., np.newaxis]
    if X_np.shape[-1] == 3:
        X_np = X_np.mean(axis=-1, keepdims=True)

    # 3) Infer the input range when the caller did not state it, so that
    #    already-scaled images are not divided by 255 a second time
    if max_value is None:
        max_value = 255.0 if X_np.size and X_np.max() > 1.0 else 1.0

    # 4) Map [0, max_value] to [-1024, 1024]
    X_np = (2.0 * (X_np / max_value) - 1.0) * 1024.0

    # 5) Convert to a torch tensor and move the channel axis: (N, C, H, W)
    X_tensor = torch.from_numpy(X_np.astype("float32", copy=False)).permute(0, 3, 1, 2)

    return X_tensor

# Run the pretrained network on the test images
X_test_tensor = preprocess_batch(X_test)

model.eval()
with torch.no_grad():
    outputs = model(X_test_tensor)  # one column per entry of model.pathologies

# Select the pneumonia column by name rather than by a hard-coded position,
# so the scores cannot silently come from another pathology.
pneumonia_index = list(model.pathologies).index("Pneumonia")
pneumonia_scores = outputs[:, pneumonia_index].cpu().numpy()
print(pneumonia_scores[:10])

In [None]:
# Id of the MLflow experiment whose runs are compared
experiment_id = "1"  # replace with your own experiment id

# Metric series to plot, tried in order for each run: the per-epoch series
# written by the logging cell as f"training {metric_name}" (assuming the fit
# history has an "accuracy" entry), then the plain "accuracy" metric.
metric_keys = ("training accuracy", "accuracy")

client = MlflowClient()

# Fetch all runs of the experiment
runs = client.search_runs(experiment_ids=[experiment_id])

fig, ax = plt.subplots()
for run in runs:
    # run.data.metrics only holds the latest value of each metric and steps
    # are not metrics, so the step/value pairs are read from the history.
    history = []
    for metric_key in metric_keys:
        history = client.get_metric_history(run.info.run_id, metric_key)
        if history:
            break
    if not history:
        continue

    history = sorted(history, key=lambda point: (point.step, point.timestamp))
    ax.plot(
        [point.step for point in history],
        [point.value for point in history],
        marker="o",
        label=run.info.run_name,
    )

ax.set_xlabel("Step")
ax.set_ylabel("Accuracy")
ax.set_title("Accuracy par step pour chaque run")
# Only build a legend when at least one run was drawn
if ax.get_legend_handles_labels()[0]:
    ax.legend()
plt.show()

In [None]:
# mlflow.list_experiments() was removed in MLflow 2.0; search_experiments()
# is its replacement and returns the experiments as a list.
experiments = mlflow.search_experiments()