# Skin Cancer Detection using MLCV

### Installer des bibliothèques et faire les imports nécessaires

In [None]:
%pip install tensorflow
%pip install matplotlib
%pip install numpy
%pip install pandas
%pip install pydot
%pip install pydot graphviz
%pip install scikit-learn

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import os
from tensorflow.keras import Input
from tensorflow.keras.applications import EfficientNetB0 # type: ignore #ingore the warning
from tensorflow.keras.layers import Concatenate, Input, Dense, Dropout, Flatten, GlobalAveragePooling2D # type: ignore #ingore the warning
from tensorflow.keras.models import Model, Sequential # type: ignore #ingore the warning
from tensorflow.keras.optimizers import Adam # type: ignore #ingore the warning
from tensorflow.keras.utils import plot_model # type: ignore #ingore the warning
from tensorflow.keras.preprocessing.image import load_img, img_to_array # type: ignore #ingore the warning
from tensorflow.keras.applications.efficientnet import preprocess_input # type: ignore #ingore the warning
from tensorflow.keras.callbacks import TensorBoard # type: ignore #ingore the warning
from tensorflow.keras.metrics import Precision, Recall # type: ignore #ingore the warning
from sklearn.model_selection import train_test_split



### Définition de la fonction de traçage de courbes

In [None]:
def display_training_curves(training, validation, title, subplot):
    """
    Titre: Afficher les courbes d'entraînement

    Description:
    Cette fonction trace les courbes d'entraînement et de validation pour une métrique donnée (par exemple, précision, perte) au fil des époques.
    Elle configure les sous-graphiques, personnalise l'apparence et ajoute des légendes et des étiquettes pour une meilleure visualisation.

    Arguments:
        training (liste ou tableau): Les points de données d'entraînement à tracer.
        validation (liste ou tableau): Les points de données de validation à tracer.
        title (str): Le titre du graphique, généralement le nom de la métrique tracée.
        subplot (int): L'index du sous-graphe à utiliser pour le graphique actuel (1 ou 2).

    Retour:
        Aucun
    """
    if subplot == 1:  # set up the subplots on the first call
        plt.subplots(figsize=(10, 10), facecolor='#F0F0F0')
        plt.tight_layout()
    plt.subplot(2, 1, subplot)  # S'assurer d'avoir un bon placement dans la grille (2x1)
    plt.gca().set_facecolor('#F8F8F8')  # change the background color
    plt.plot(training)
    plt.plot(validation)
    plt.title('model ' + title)
    plt.ylabel(title)
    plt.xlabel('epoch')
    plt.legend(['train', 'valid.'])

### Détecter le hardware à disposition (TPU, GPU ou CPU dans cet ordre de priorité)
La variable `strategy` détermine le matériel disponible (TPU, GPU ou CPU) et le nombre de répliques synchronisées (`strategy.num_replicas_in_sync`). Cette information sera utilisée pour adapter dynamiquement le `BATCH_SIZE` global, en le multipliant par le nombre de répliques, afin d'optimiser l'entraînement selon les ressources disponibles.

In [None]:
# Detect hardware
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver() # TPU detection
except ValueError:
  tpu = None
#If TPU not found try with GPUs
  gpus = tf.config.experimental.list_logical_devices("GPU")
    
# Select appropriate distribution strategy for hardware
if tpu:
  tf.config.experimental_connect_to_cluster(tpu)
  tf.tpu.experimental.initialize_tpu_system(tpu)
  strategy = tf.distribute.experimental.TPUStrategy(tpu)
  print('Running on TPU ', tpu.master())  
elif len(gpus) > 0:
  strategy = tf.distribute.MirroredStrategy(gpus) # this works for 1 to multiple GPUs
  print('Running on ', len(gpus), ' GPU(s) ')
else:
  strategy = tf.distribute.get_strategy()
  print('Running on CPU')

# How many accelerators do we have ?
print("Number of accelerators: ", strategy.num_replicas_in_sync)

### Définition des paramètres globaux pour l'entrainement du modèle

In [None]:
# Chemins vers les fichiers TFRecord (désactivé dans cet exemple)
# TRAINING_FILENAMES = tf.io.gfile.glob(GCS_DS_PATH + '/tfrecords/train*')
# TEST_FILENAMES = tf.io.gfile.glob(GCS_DS_PATH + '/tfrecords/test*')

# Taille des lots adaptée au nombre de répliques (dispositifs GPU/TPU) disponibles

##################################
# rajout conidtion si GPU ou TPU #
##################################
BATCH_SIZE = 32 * strategy.num_replicas_in_sync  # Par exemple, 10 images par réplique

# Taille des images à utiliser (100x100 pixels)
IMAGE_SIZE = [100, 100]  # Taille utilisée pour le redimensionnement des images
imSize = 100             # Taille utilisée pour redimensionner les images dans les pipelines

# Optimisation automatique pour le préchargement des données
AUTO = tf.data.experimental.AUTOTUNE

# Nombre d'époques (itérations sur l'ensemble d'entraînement)
EPOCHS = 10  # Peut être ajusté en fonction des performances et de la convergence

# Définir l'entrée du modèle (couche d'entrée pour TensorFlow/Keras)
input_layer = Input(shape=(imSize, imSize, 3))  # Entrée avec une image RGB (3 canaux)

## Traitement des métadonnées, classifications en batch (train, validate, test) et création des dataset TensorFlow

### Chargement et prétraitement des métadonnées

In [None]:
# Chargement du fichier CSV contenant les métadonnées
metadata = pd.read_csv("./Data/train.csv")
if metadata.isnull().any().any():
    print("Des valeurs manquantes ont été détectées.")
    
# Ajouter les chemins complets des images
metadata['image_path'] = metadata['image_name'].apply(lambda x: f"./Data/train/{x}.jpg")

metadata['label'] = metadata['benign_malignant'].map({'benign': 0, 'malignant': 1})


# Supprimer les lignes avec des valeurs manquantes
metadata = metadata.dropna(subset=['label'])

print(f"Nombre total d'images disponibles : {len(metadata)}")
print(metadata['benign_malignant'].isnull().sum())
print(metadata['benign_malignant'].unique())
print(f"Total rows in original CSV: {metadata.shape[0]}")

In [None]:
# Affiche le nombre  d'images par classe
print(metadata['label'].value_counts())

# Prendre un échantillon aléatoire de 10 000 images
metadata = metadata.sample(n=1000, random_state=42)
print(metadata['label'].value_counts())
# Afficher la taille du nouvel échantillon
print(f"Nombre d'images après échantillonnage : {len(metadata)}")

### Division des données en ensembles d’entraînement, validation et test

À cette étape, les données sont divisées en trois ensembles distincts : **entraînement**, **validation**, et **test**, afin de préparer le modèle pour son entraînement et son évaluation. Tout d'abord, 70 % des données sont affectées à l'ensemble d'entraînement, qui sera utilisé pour ajuster les paramètres du modèle. Les 30 % restants sont répartis entre les ensembles de validation (20 % des données totales) et de test (10 % des données totales). La stratification est appliquée sur la variable `label` pour garantir que les proportions des classes (par exemple, "Malignant" et "Benign") sont similaires dans les trois ensembles. Cette méthode assure une représentation équilibrée des classes dans chaque ensemble, ce qui est essentiel pour éviter les biais dans l'évaluation des performances du modèle.

La stratification consiste à répartir les données de manière à ce que la proportion de chaque classe soit conservée dans les sous-ensembles créés (entraînement, validation, test). Cela garantit que les ensembles reflètent fidèlement la distribution initiale des classes, évitant les déséquilibres qui pourraient biaiser l'entraînement ou l'évaluation du modèle.

In [None]:
# Division des ensembles en entraînement (70%), validation (20%), et test (10%)
train_metadata, temp_metadata = train_test_split(
    metadata, 
    test_size=0.3,  # 30 % seront partagés entre validation et test
    stratify=metadata['label'],  # Assurer un équilibre des classes
    random_state=42
)

val_metadata, test_metadata = train_test_split(
    temp_metadata, 
    test_size=0.33,  # 1/3 des 30 % pour test, soit ~10 % au total
    stratify=temp_metadata['label'], 
    random_state=42
)

print(f"Entraînement : {train_metadata.shape[0]} exemples")
print(f"Validation : {val_metadata.shape[0]} exemples")
print(f"Test : {test_metadata.shape[0]} exemples")

####################################################################################
# ??? Ajouter la fonction de création des datasets pour TensorFlow juste après ??? #
####################################################################################

Vérifications sur les datasets splités.

In [None]:
print("Class distribution in training set:")
print(train_metadata['label'].value_counts(normalize=True))
print("Class distribution in validation set:")
print(val_metadata['label'].value_counts(normalize=True))
print("Class distribution in test set:")
print(test_metadata['label'].value_counts(normalize=True))

total_samples = train_metadata.shape[0] + val_metadata.shape[0] + test_metadata.shape[0]
print(f"Total samples: {total_samples} (original: {metadata.shape[0]})")

# Check pour des overlaps entre les sets de données (data leakage), retourne une erreur si overlap
assert len(set(train_metadata.index) & set(val_metadata.index)) == 0, "Overlap between train and validation sets!"
assert len(set(val_metadata.index) & set(test_metadata.index)) == 0, "Overlap between validation and test sets!"

### Définition des fonctions pour charger et prétraiter les données
À cette étape, le code prépare un pipeline de traitement des images pour l'entraînement du modèle. Les images sont chargées à partir de leurs chemins, redimensionnées à une taille standard de 100x100 pixels, normalisées (valeurs entre 0 et 1), et associées à leurs étiquettes (labels). Ces transformations sont encapsulées dans une fonction `load_image_and_label`, appliquée via `tf.data.Dataset` pour créer un ensemble de données TensorFlow optimisé. Enfin, les données sont divisées en lots et préchargées pour accélérer l'entraînement.

In [None]:
# Fonction pour charger et prétraiter une image
def load_image_and_label(image_path, label):
    """
    Charge une image depuis son chemin, applique des prétraitements
    (normalisation, redimensionnement), et retourne l'image et son étiquette.
    """
    # Charger l'image depuis son chemin
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)  # Décode une image JPEG en RGB
    
    # Normaliser les valeurs des pixels (entre 0 et 1)
    image = tf.cast(image, tf.float32) / 255.0
    
    # Redimensionner l'image à une taille standard
    image = tf.image.resize(image, [imSize, imSize])
    
    return image, label

# Fonction pour convertir un DataFrame Pandas en dataset TensorFlow
def create_tf_dataset(metadata_df, batch_size):
    """
    Convertit un DataFrame contenant les chemins des images et les labels 
    en un dataset TensorFlow optimisé pour l'entraînement.
    """
    # Extraction des colonnes nécessaires depuis le DataFrame
    image_paths = metadata_df['image_path'].values
    labels = metadata_df['label'].values
    
    # Créer un dataset TensorFlow à partir des chemins et des étiquettes
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    
    # Appliquer la fonction de chargement et de prétraitement à chaque image du dataset
    dataset = dataset.map(load_image_and_label, num_parallel_calls=AUTO)
    
    dataset = dataset.repeat()

    dataset = dataset.shuffle(1500)  # Mélange avec un tampon de 1500 éléments

    # Diviser en lots et activer le préchargement pour optimiser les performances
    dataset = dataset.batch(batch_size)
    dataset= dataset.prefetch(AUTO)

    
    return dataset

#### Test d'affichage de quelques images du dataset (depuis le pipeline TensorFlow)
Cette étape permet de vérifier si les images ont été correctement redimensionnées en prenant aléatoirement 5 images.

In [None]:
# # Visualiser 5 images aléatoires depuis le pipeline TensorFlow
# for image, label in create_tf_dataset(metadata.sample(frac=1).reset_index(drop=True), batch_size=1).take(5):
#     # Convertir le tenseur en tableau NumPy pour l'affichage
#     img_resized = image[0].numpy()
#     plt.imshow(img_resized)
#     # Afficher le label avec le mapping 0 -> Benign, 1 -> Malignant
#     plt.title(f"{'Benign' if label.numpy() == 0 else 'Malignant'}")
#     plt.axis('off')  # Supprimer les axes pour une meilleure visualisation
#     plt.show()

### Création des datasets TensorFlow

Cette étape transforme les sous-ensembles (entraînement, validation et test) en pipelines optimisés pour TensorFlow à l'aide de `create_tf_dataset`. Chaque dataset contient des images prétraitées (chargées, redimensionnées, et normalisées) associées à leurs labels, regroupées en lots de taille spécifiée (`BATCH_SIZE`). Ces datasets sont utilisés directement par le modèle lors de l'entraînement ou de l'évaluation pour assurer une gestion efficace des données.

In [None]:
# BATCH_SIZE = max(1, len(train_metadata) // 10)  # Adjust batch size

# Créer les datasets
training_dataset = create_tf_dataset(train_metadata, batch_size=BATCH_SIZE)
validation_dataset = create_tf_dataset(val_metadata, batch_size=BATCH_SIZE)
test_dataset = create_tf_dataset(test_metadata, batch_size=BATCH_SIZE)

print(f"Training dataset : {len(training_dataset)} batches")
print(f"Validation dataset : {len(validation_dataset)} batches")
print(f"Test dataset : {len(test_dataset)} batches")

### Calcul des paramètres pour l’entraînement

Les paramètres `STEPS_PER_EPOCH` et `VALIDATION_STEPS` déterminent le nombre de lots nécessaires pour parcourir une fois l'ensemble des données d'entraînement ou de validation. Ils sont calculés en divisant la taille totale des données par `BATCH_SIZE` pour garantir que le modèle traite toutes les données à chaque époque, tout en optimisant l'utilisation des ressources.

In [None]:
# à supprimer
print(f"Training samples: {len(train_metadata)}")
print(f"Validation samples: {len(val_metadata)}")
print(f"Batch size: {BATCH_SIZE}")

# print(f"Total training batches: {len(list(training_dataset))}")
# print(f"Total validation batches: {len(list(validation_dataset))}")

In [None]:
# Calcul des paramètres pour l'entraînement
# STEPS_PER_EPOCH = len(training_dataset) // BATCH_SIZE
# VALIDATION_STEPS = len(validation_dataset) // BATCH_SIZE 
steps_per_epochs = len(train_metadata) // BATCH_SIZE
validation_steps = len(val_metadata) // BATCH_SIZE

print(f"Nombre d'étapes pour l'entraînement : {steps_per_epochs}")
print(f"Nombre d'étapes pour la validation : {validation_steps}")

### Charger le modèle (sans couche de classification finale)

In [None]:
# Charger le modèle EfficientNetB0 préentraîné sans la couche de classification finale
with strategy.scope():
    base_model = tf.keras.Sequential([
        EfficientNetB0(
            input_shape=(imSize, imSize, 3),
            weights='imagenet',
            include_top=False
        )
    ])

# Geler les couches du modèle de base pour conserver les poids préentraînés
base_model.trainable = False

In [None]:
# Passer l'entrée à travers le modèle de base
x = base_model(input_layer, training=False)  # `input_layer` 
x = Flatten()(x)  # Aplatir les caractéristiques extraites

# Ajout de couches fully connected pour la classification
x = Dense(128, activation="relu")(x)  # Couche dense
x = Dropout(0.5)(x)  # Dropout pour régularisation
output = Dense(1, activation="sigmoid")(x)  # Sigmoid pour une classification binaire

# Définir le modèle final avec une seule entrée (image)
model = Model(inputs=input_layer, outputs=output)

### Compiler le modèle

In [None]:
# Calcul des poids des classes
# class_weights = {0: 1.0, 1: 9.0}  # Augmenter le poids de la classe "malin"
# Ajustez le poids en fonction de l'importance de chaque classe dans votre dataset

# Créez une fois les objets Precision et Recall
# precision = Precision()
# recall = Recall()

# def f1_score(y_true, y_pred):
#     """
#     Calcul du F1 score basé sur la précision et le rappel.
#     """
#     p = precision(y_true, y_pred)
#     r = recall(y_true, y_pred)
#     return 2 * ((p * r) / (p + r + tf.keras.backend.epsilon()))

model.compile(
    optimizer='adam',  # Optimiseur Adam
    loss="binary_crossentropy",          # Fonction de perte pour la classification binaire
    metrics=["accuracy"]  # Suivi de la précision
)

# Résumé du modèle
model.summary()

In [None]:
# Visualisation de l'architecture du modèle
# plot_model(model, show_shapes=True, to_file="efficientnetb0_model.png")

In [None]:
def learning_rate_function(epoch):
    LR_START = 0.001 # Taux d'apprentissage initial
    LR_MAX = 0.00005 * strategy.num_replicas_in_sync # Taux d'apprentissage maximal
    LR_MIN = 0.00001 # Taux d'apprentissage minimal
    LR_RAMPUP_EPOCHS = 5 # nombre d'époques pendant lesquelles le taux d'apprentissage augmente linéairement.
    LR_SUSTAIN_EPOCHS = 0 # nombre d'époques où le taux reste maximal
    LR_EXP_DECAY = .8 # taux de décroissance exponentielle du taux d'apprentissage après les périodes de "ramp-up" et de "soutien"
    

    # Augmentation (pour les premières LR_RAMPUP_EPOCHS époques) : le taux d'apprentissage commence à LR_START et monte linéairement jusqu'à LR_MAX
    if epoch < LR_RAMPUP_EPOCHS: 
        lr = (LR_MAX - LR_START) / LR_RAMPUP_EPOCHS * epoch + LR_START

    # Soutien (pour les LR_SUSTAIN_EPOCHS suivantes) : le taux d'apprentissage reste constant à LR_MAX
    elif epoch < LR_RAMPUP_EPOCHS + LR_SUSTAIN_EPOCHS:
        lr = LR_MAX
    # Décroissance (pour les époques restantes) : le taux d'apprentissage diminue exponentiellement
    else:
        lr = (LR_MAX - LR_MIN) * LR_EXP_DECAY**(epoch - LR_RAMPUP_EPOCHS - LR_SUSTAIN_EPOCHS) + LR_MIN
    return lr

lr_schedule = tf.keras.callbacks.LearningRateScheduler(learning_rate_function, verbose=1)

In [None]:
log_dir = os.path.join("logs", "fit", "model_name")
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

history = model.fit(training_dataset, steps_per_epoch=steps_per_epochs, epochs=EPOCHS,
                    validation_data=validation_dataset, validation_steps=validation_steps,callbacks=[lr_schedule,tensorboard_callback],)

# tensorboard --logdir=logs/fit pour lancer TensorBoard

### Affichage de l'évolution de "accuracy" et "loss" en fonction des époques

In [None]:
# Plot accuracy
plt.figure(figsize=(8, 4))
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()

# Plot loss
plt.figure(figsize=(8, 4))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.grid(True)
plt.show()

In [None]:
# Fonction pour charger les matrices Y, Cb, Cr à partir du fichier
def load_ycbcr_from_file(file_path):
    """
    Charger les matrices Y, Cb, Cr à partir d'un fichier texte.
    Les fichiers doivent contenir trois sections avec des matrices pour chaque composante.
    """
    with open(file_path, 'r') as f:
        lines = f.readlines()

    # Identifier les sections et extraire les matrices
    y_start = lines.index("Matrice de luminance (Y):\n") + 1
    cb_start = lines.index("Matrice de chrominance (Cb):\n") + 1
    cr_start = lines.index("Matrice de chrominance (Cr):\n") + 1

    y = np.loadtxt(lines[y_start:cb_start - 1], dtype=np.float32)
    cb = np.loadtxt(lines[cb_start:cr_start - 1], dtype=np.float32)
    cr = np.loadtxt(lines[cr_start:], dtype=np.float32)

    return y, cb, cr

In [None]:
# Fonction pour préparer l'entrée pour le modèle
def prepare_ycbcr_input(y, cb, cr):
    """
    Prépare l'entrée pour le modèle à partir des matrices Y, Cb, Cr.
    """
    # Empiler les canaux Y, Cb, Cr pour former une image (100, 100, 3)
    image = np.stack([y, cb, cr], axis=-1)  # Shape: (100, 100, 3)

    # Normaliser l'image (en utilisant une normalisation simple, pas celle d'ImageNet)
    image = image / 255.0  # Normalisation simple entre 0 et 1

    # Ajouter la dimension du batch (1, 100, 100, 3)
    image = np.expand_dims(image, axis=0)

    return image

In [None]:
# Fonction de prédiction
def predict_skin_cancer(file_path):
    """
    Prédire à partir des matrices Y, Cb, Cr chargées depuis un fichier.
    """
    # Charger les matrices Y, Cb, Cr à partir du fichier
    y, cb, cr = load_ycbcr_from_file(file_path)

    # Préparer l'entrée pour le modèle
    ycbcr_input = prepare_ycbcr_input(y, cb, cr)


    # Prédiction
    prediction = model.predict(ycbcr_input)

    # Interprétation
    if prediction[0] > 0.5:
        return f"Maligne (probabilité {prediction[0][0]:.2f})"
    else:
        return f"Bénigne (probabilité {1 - prediction[0][0]:.2f})"

# Exemple d'utilisation
file_path = "./DataBase/Matrices/ISIC_0001120_matrices.txt"
result = predict_skin_cancer(file_path)
print(result)