In [2]:
# %% [markdown]
# ## 1. Chargement des Bibliothèques
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model, callbacks
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import cv2
import os
import keras.backend as K
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

# Reproductibilité
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [6]:
# %% [markdown]
# ## 2. Paramètres et Configuration
IMG_SIZE = (256, 256)  # Ajuster selon vos données
BATCH_SIZE = 8
EPOCHS = 100
VAL_SPLIT = 0.2
TEST_SPLIT = 0.1
DATA_PATH = '/home/kevin/datasets/livrable2/processed'

In [10]:
# %% [markdown]
# ## 3. Chargement et Préparation des Données (Version Corrigée)

def load_data(data_path):
    # Vérification du chemin
    if not os.path.exists(data_path):
        raise ValueError(f"Le chemin {data_path} n'existe pas!")
    
    # Collecte et tri des fichiers pour assurer l'alignement
    noisy_files = sorted([f for f in os.listdir(data_path) if "noisy" in f.lower()])
    clean_files = sorted([f for f in os.listdir(data_path) if "clean" in f.lower()])
    
    # Validation des paires
    if len(noisy_files) != len(clean_files):
        raise ValueError("Nombre inégal de fichiers noisy/clean!")
    if not noisy_files:
        raise ValueError("Aucune image trouvée!")
    
    # Lecture des images avec vérification
    noisy_imgs, clean_imgs = [], []
    for nfile, cfile in zip(noisy_files, clean_files):
        noisy = cv2.imread(os.path.join(data_path, nfile))
        clean = cv2.imread(os.path.join(data_path, cfile))
        
        if noisy is None:
            print(f"Échec de lecture de {nfile}")
        if clean is None:
            print(f"Échec de lecture de {cfile}")
        
        if noisy is not None and clean is not None:
            noisy_imgs.append(noisy)
            clean_imgs.append(clean)
    
    print(f"{len(noisy_imgs)} paires chargées avec succès")
    return np.array(noisy_imgs), np.array(clean_imgs)

# Chargement avec vérification
try:
    X_noisy, X_clean = load_data(DATA_PATH)
except Exception as e:
    print(f"Erreur: {e}")
    raise

# Vérification des dimensions
assert X_noisy.shape == X_clean.shape, "Dimensions incohérentes entre X_noisy et X_clean!"

# Normalisation [0,1]
X_noisy = X_noisy.astype('float32') / 255.0
X_clean = X_clean.astype('float32') / 255.0

# Split adaptatif pour petits datasets
TOTAL_SIZE = len(X_noisy)
if TOTAL_SIZE < 100:
    # Stratégie pour datasets réduits
    TEST_SPLIT = max(1, int(0.1 * TOTAL_SIZE))
    VAL_SPLIT = max(1, int(0.2 * TOTAL_SIZE))
else:
    TEST_SPLIT = 0.1
    VAL_SPLIT = 0.2

# Split avec shuffle stratifié
X_train, X_temp, y_train, y_temp = train_test_split(
    X_noisy, X_clean, 
    test_size=(VAL_SPLIT + TEST_SPLIT), 
    random_state=SEED,
    shuffle=True
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp,
    test_size=TEST_SPLIT/(VAL_SPLIT + TEST_SPLIT), 
    random_state=SEED
)

print(f"\nSplit Final:")
print(f"- Train: {len(X_train)}")
print(f"- Val: {len(X_val)}")
print(f"- Test: {len(X_test)}")

Échec de lecture de noisy
Échec de lecture de clean
0 paires chargées avec succès


ValueError: test_size=2 should be either positive and smaller than the number of samples 0 or a float in the (0, 1) range

In [None]:
# %% [markdown]
# ## 4. Architecture de l'Auto-Encodeur (État de l'Art)
# Inspirée de U-Net et DnCNN avec mécanismes d'attention
def build_cae(input_shape=(256,256,3)):
    inputs = layers.Input(shape=input_shape)
    
    # Encoder
    x = layers.Conv2D(64, (3,3), activation='relu', padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2,2))(x)
    
    x = layers.Conv2D(128, (3,3), activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2,2))(x)
    
    # Bottleneck avec Attention
    x = layers.Conv2D(256, (3,3), activation='relu', padding='same')(x)
    x = layers.Attention()([x,x])  # Self-Attention
    
    # Decoder
    x = layers.UpSampling2D((2,2))(x)
    x = layers.Conv2DTranspose(128, (3,3), activation='relu', padding='same')(x)
    
    x = layers.UpSampling2D((2,2))(x)
    x = layers.Conv2DTranspose(64, (3,3), activation='relu', padding='same')(x)
    
    outputs = layers.Conv2D(3, (3,3), activation='sigmoid', padding='same')(x)
    
    model = Model(inputs, outputs)
    return model

In [None]:
# %% [markdown]
# ## 5. Métriques Personnalisées
def PSNR(y_true, y_pred):
    return tf.image.psnr(y_true, y_pred, max_val=1.0)

def SSIM(y_true, y_pred):
    return tf.image.ssim(y_true, y_pred, max_val=1.0)

In [None]:
# %% [markdown]
# ## 6. Compilation et Entraînement
model = build_cae()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
              loss='mse',
              metrics=[PSNR, SSIM])

# Callbacks
early_stop = callbacks.EarlyStopping(patience=15, restore_best_weights=True)
reduce_lr = callbacks.ReduceLROnPlateau(factor=0.5, patience=5)
checkpoint = callbacks.ModelCheckpoint("best_model.h5", save_best_only=True)

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=[early_stop, reduce_lr, checkpoint]
)

In [None]:
# %% [markdown]
# ## 7. Évaluation Quantitative
test_results = model.evaluate(X_test, y_test)
print(f"Test Loss: {test_results[0]}, PSNR: {test_results[1]}, SSIM: {test_results[2]}")

In [None]:
# %% [markdown]
# ## 8. Visualisation des Résultats
def plot_results(n=5):
    plt.figure(figsize=(15,6))
    predictions = model.predict(X_test[:n])
    for i in range(n):
        plt.subplot(3, n, i+1)
        plt.imshow(X_test[i])
        plt.title("Noisy")
        
        plt.subplot(3, n, i+1+n)
        plt.imshow(predictions[i])
        plt.title("Denoised")
        
        plt.subplot(3, n, i+1+2*n)
        plt.imshow(y_test[i])
        plt.title("Original")
    plt.show()

plot_results()