# Livrable 2 - Groupe 1

## Contenu du livrable

Le but est de traiter un ensemble de photographies afin de les rendre plus facilement exploitables par les algorithmes de Machine Learning. Le traitement à réaliser est une opération de débruitage : elle s'appuiera sur des auto-encodeurs à convolution, appliqués aux images pour en améliorer la qualité.

1. Chargement des données provenant de l'EDA (livrable 1)
2. Création du dataset
3. Définition de l'autoencodeur (CAE)
4. Entrainement
5. Métriques


## Chargement des bibliothèques

In [None]:
import os
import gdown
import zipfile

import PIL
import imghdr
import pathlib
import numpy as np
import tensorflow as tf
from tensorflow import keras
from collections import Counter
import matplotlib.pyplot as plt
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

## Sélection de la source de données


Cette partie permet de récupérer le dataset depuis le drive afin d'automatiser le `pipeline`.  
L'objectif est d'utiliser un dataset commun au sein du groupe. 

Ce dernier a son dossier `Sketch` complété par des données trouvées dans les sources suivantes : [croquis](https://paperswithcode.com/dataset/sketch) || [visages réalistes](https://www.kaggle.com/datasets/arbazkhan971/cuhk-face-sketch-database-cufs/data)

Ce qui fait passer le dossier sketch de `606 visages` et `800 croquis` avec un ratio de `43%`/`57%` entre visages/croquis à `1200 visages` et `3200 croquis` pour un ratio final équivalent.

In [None]:
# Google Drive file ID (taken from the share URL) of the shared dataset archive.
file_id = "1PGTFqsXHRCXV3R2Rns6yz0QS26ihfgYM"
dataset_path = "dataset_livrable_2"
zip_path = dataset_path + ".zip"
# The archive is unpacked into a folder named after the dataset, next to the zip.
extract_dir = pathlib.Path(zip_path).parent / dataset_path
# NOTE(review): not read anywhere in this cell — presumably consumed elsewhere; confirm.
reduce_dataset = True

# Download and unpack only once: an existing folder is taken as "already done".
if not extract_dir.exists():
    print(f"Le dossier '{extract_dir}' n'existe pas. Téléchargement en cours...")
    downloaded = gdown.download(f"https://drive.google.com/uc?id={file_id}", zip_path, quiet=False)

    # Fail early with an explicit message instead of letting zipfile raise a
    # confusing error about a missing or unreadable archive.
    if downloaded is None or not os.path.isfile(zip_path):
        raise RuntimeError(
            f"Échec du téléchargement du dataset (id Google Drive : {file_id})."
        )

    print("Extraction ZIP en cours...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)
    print("Extraction Zip terminée")
else:
    print(f"Le dossier '{extract_dir}' existe déjà. Téléchargement et extraction non nécessaires.")

# Root directory handed to the dataset loaders.
data_dir = extract_dir
print(f"Dataset disponible dans : {data_dir}")

## Création du dataset

In [None]:
# categories = [d for d in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, d))]
# print(f"Catégories détectées : {categories}")

In [None]:
# Reproducible train/validation split: fixed seed, 20% held out for validation.
seed = 42
validation_split = 0.2

# Batching and target geometry: images are resized to a 128x128 square.
batch_size = 32
img_height = img_width = 128

In [None]:
# Build the training and validation datasets from the extracted image folder.
# Both loaders share every option except `subset`, so the common options are
# declared once. `labels=None` makes each element a batch of images only, and
# `shuffle=False` disables the loader's own shuffling.
_loader_options = dict(
    validation_split=validation_split,
    seed=seed,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    labels=None,
    shuffle=False,
)

train_set = tf.keras.utils.image_dataset_from_directory(
    data_dir, subset="training", **_loader_options
)

val_set = tf.keras.utils.image_dataset_from_directory(
    data_dir, subset="validation", **_loader_options
)

### Image dimensions

In [None]:
# Peek at a single validation batch to report the tensor layout;
# `take(1)` already limits the loop to one iteration.
for val_batch in val_set.take(1):
    batch_shape = val_batch.shape
    print(f"Image dimensions: {batch_shape}")

## Visualisation

In [None]:
# Preview up to five images from the first training batch. These are the
# clean images (noise is only added further down), so they are titled as such.
plt.figure(figsize=(15, 15))
for images in train_set.take(1):
    # Guard against a first batch holding fewer than five images.
    for i in range(min(5, int(images.shape[0]))):
        plt.subplot(1, 5, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title("Original Image")
        plt.axis("off")

## Préparation

In [None]:
def add_gaussian_noise(image, mean=0.0, stddev=70):
    """Return `image` corrupted by additive Gaussian noise.

    Args:
        image: tensor of pixel values; it is cast to float32 before the
            noise is added.
        mean: mean of the normal distribution the noise is drawn from.
        stddev: standard deviation of that distribution, on the same scale
            as the pixel values.

    Returns:
        A float32 tensor with the same shape as `image`, clipped to the
        range [0, 255].
    """
    pixels = tf.cast(image, tf.float32)
    perturbation = tf.random.normal(
        shape=tf.shape(image), mean=mean, stddev=stddev, dtype=tf.float32
    )
    return tf.clip_by_value(pixels + perturbation, 0.0, 255.0)

# Corrupt every batch of both splits using the default noise parameters.
noisy_train_set = train_set.map(add_gaussian_noise)
noisy_val_set = val_set.map(add_gaussian_noise)

In [None]:
# Show the first five images of the first noisy training batch on one row.
plt.figure(figsize=(15, 15))
for noisy_batch in noisy_train_set.take(1):
    for slot in range(1, 6):
        plt.subplot(1, 5, slot)
        pixels = noisy_batch[slot - 1].numpy().astype("uint8")
        plt.imshow(pixels)
        plt.title("Noisy Image")
        plt.axis("off")

## Performance & pre processing

In [None]:
# AUTOTUNE = tf.data.experimental.AUTOTUNE

# a_train_set = noisy_train_set.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
# a_val_set = val_set.cache().prefetch(buffer_size=AUTOTUNE)

# Report the shape of the first batch of each noisy split (train, then validation).
for noisy_split in (noisy_train_set, noisy_val_set):
    for first_batch in noisy_split.take(1):
        print(f"Image shape: {first_batch.shape}")

## Modélisation

In [None]:
# Main configuration values for the models.

# Side length of the (square) model input, taken from the dataset image width.
IMG_SIZE = img_width
# Number of epochs for the denoising model.
NB_EPOCHS_DENOISE = 100
# Processing batch size.
BATCH_SIZE = 128
# File name used to save the denoising model.
SAV_MODEL_DENOISE = "denoiser.h5"
# Value handed to the Autoencoder constructor.
LATENT_DIM = 32

## Encodeur

In [None]:
# Import the building blocks from `tensorflow.keras`, like the rest of the
# notebook (`layers.Dropout` below already comes from there), so every layer
# of the model is created by the same Keras package.
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l1_l2

# Convolutional encoder: three Conv2D stages (32, 64 then 128 filters), each
# followed by a 2x2 max-pooling; dropout follows the first two poolings.
# Every convolution carries the same L1/L2 kernel regularization.
# No rescaling layer here: NOTE(review) inputs are presumably scaled to [0, 1]
# before being fed to the model — confirm against the training cell.
encoder = Sequential([
    Input(shape=(IMG_SIZE, IMG_SIZE, 3)),
    Conv2D(32, (3, 3), activation='relu', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
    MaxPooling2D((2, 2)),
    layers.Dropout(0.2),  # Dropout after the first pooling layer
    Conv2D(64, (3, 3), activation='relu', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
    MaxPooling2D((2, 2), padding='same'),
    layers.Dropout(0.3),  # Dropout after the second pooling layer
    Conv2D(128, (3, 3), activation='relu', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
    MaxPooling2D((2, 2))
])

## Décodeur

In [None]:
# Import everything this cell needs from `tensorflow.keras`, the package used
# by the rest of the notebook, so the cell does not depend on names imported
# by another cell and does not mix two Keras packages.
from tensorflow.keras.layers import Conv2D, Input, UpSampling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l1_l2

# TODO: consider adding dropout to the decoder.

# Convolutional decoder: takes the encoder's output shape as input, then
# applies three Conv2D + 2x2 up-sampling stages (128, 64 then 32 filters).
# The final 1x1 convolution produces 3 channels through a sigmoid, i.e.
# values in [0, 1].
decoder = Sequential([
    Input(shape=encoder.output_shape[1:]),
    Conv2D(128, (3, 3), activation='relu', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
    UpSampling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
    UpSampling2D((2, 2)),
    Conv2D(32, (3, 3), activation='relu', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
    UpSampling2D((2, 2)),
    Conv2D(3, (1, 1), activation='sigmoid', padding='same', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
])


In [None]:
from tensorflow.keras.models import Model


class Autoencoder(Model):
    """Model that chains the module-level `encoder` and `decoder`."""

    def __init__(self, latent_dim):
        """Store `latent_dim` and attach the existing encoder and decoder."""
        super().__init__()
        # Kept as an attribute only; nothing in this class reads it.
        self.latent_dim = latent_dim
        self.encoder = encoder
        self.decoder = decoder

    def call(self, x):
        """Return the decoder's output for the encoded version of `x`."""
        return self.decoder(self.encoder(x))


autoencoder = Autoencoder(LATENT_DIM)

In [None]:
# Configure training: Adam optimizer, binary cross-entropy as the loss.
autoencoder.compile(loss='binary_crossentropy', optimizer='adam')
# autoencoder.summary()

# Print the layer-by-layer structure of each half of the autoencoder.
for sub_model in (encoder, decoder):
    sub_model.summary()

## Entrainement

In [None]:
import datetime

# `log_dir` was used below without ever being defined, which raised a
# NameError. Use one timestamped directory per run so that successive runs do
# not overwrite each other's TensorBoard logs.
log_dir = os.path.join("logs", "fit", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

# Stop once `val_loss` has not improved for 5 epochs and restore the best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
# Keep only the best model seen so far on disk.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath='./L1_model.keras', save_best_only=True)
# Write TensorBoard logs, with weight histograms every epoch.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
# Train the model
import tensorflow as tf


def _to_denoising_pair(clean):
    """Turn a batch of clean images into a (noisy input, clean target) pair.

    Both tensors are scaled to [0, 1] to match the decoder's sigmoid output
    and the binary cross-entropy loss.
    """
    clean = tf.cast(clean, tf.float32)
    return add_gaussian_noise(clean) / 255.0, clean / 255.0


# Derive input and target from the SAME loaded batch. Zipping two separate
# pipelines read every image twice and only stayed aligned as long as both
# passes iterated the files in exactly the same order.
paired_train_set = train_set.map(_to_denoising_pair)
paired_val_set = val_set.map(_to_denoising_pair)

# Train the autoencoder.
# `batch_size` and `shuffle` are deliberately not passed: the datasets are
# already batched by the loader, and Keras documents that these arguments
# must not be specified / are ignored for `tf.data.Dataset` inputs.
history = autoencoder.fit(
    paired_train_set,
    epochs=NB_EPOCHS_DENOISE,
    validation_data=paired_val_set,
    callbacks=[early_stopping, model_checkpoint, tensorboard_callback],
)

## Métriques

- Courbe d'apprentissage
- Métrique
- Matrice de confusion

In [None]:
# Plot the training loss ("train") and the validation loss ("test") per epoch.
for metric_key, curve_label in (('loss', 'train'), ('val_loss', 'test')):
    plt.plot(history.history[metric_key], label=curve_label)
plt.legend()

In [None]:
# Visualize noisy inputs (top row) next to their denoised outputs (bottom row).
plt.figure(figsize=(15, 15))
for noisy_images_batch, clean_images_batch in paired_val_set.take(1):
    # Show at most 5 images, without indexing past the end of a short batch.
    n_shown = min(5, int(noisy_images_batch.shape[0]))
    # Single forward pass for all displayed images instead of one call per image.
    denoised_batch = autoencoder(noisy_images_batch[:n_shown])

    for i in range(n_shown):
        # Display noisy image
        plt.subplot(2, 5, i + 1)
        noisy_image = noisy_images_batch[i].numpy() * 255.0  # Scale back to [0, 255]
        plt.imshow(noisy_image.astype("uint8"))
        plt.title("Noisy Image")
        plt.axis("off")

        # Display denoised image
        plt.subplot(2, 5, i + 6)
        denoised_image = denoised_batch[i].numpy() * 255.0  # Scale back to [0, 255]
        plt.imshow(denoised_image.astype("uint8"))
        plt.title("Denoised Image")
        plt.axis("off")