In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.layers import Dropout 
from tensorflow.keras import backend
from tensorflow.keras.initializers import HeNormal,GlorotUniform
import numpy as np
import pandas as pd
from traitement_image import image_processing
from utils import load_parquet_files_cond_red_save, wasserstein_loss, generator_loss, generate_batches_cond, generate_cond, Conv2DCircularPadding, load_parquet_files_cond_red, newTorchSign, transform_batch
import matplotlib.pyplot as plt
import os

In [None]:
class NoiseInjection(layers.Layer):
    """Layer that adds zero-mean Gaussian noise to its input during training.

    When ``training`` is falsy (including the default ``None``) the input is
    returned unchanged.

    :param stddev: standard deviation of the Gaussian noise
    """

    def __init__(self, stddev=0.1, **kwargs):
        super().__init__(**kwargs)
        # Standard deviation of the injected Gaussian noise.
        self.stddev = stddev

    def build(self, input_shape):
        # This layer owns no weights, so there is nothing to create here;
        # simply defer to the base implementation.
        super().build(input_shape)

    def call(self, inputs, training=None):
        # Inference (or unspecified mode): pass the input through untouched.
        if not training:
            return inputs
        # Draw noise with the same shape as the input and add it.
        gaussian = tf.random.normal(shape=tf.shape(inputs), mean=0.0, stddev=self.stddev)
        return inputs + gaussian

    def get_config(self):
        """Return the layer configuration, including ``stddev``, for saving/loading."""
        base_config = super().get_config()
        base_config.update({'stddev': self.stddev})
        return base_config

In [None]:
class Generator(models.Model):
    """
    Conditional generator.

    The latent vector and the conditioning signature are concatenated and fed
    to a dense + transposed-convolution stack that ends in a single-channel
    sigmoid image.

    :param latent_dim: dimension of the latent space (tune it according to the
        desired variance of the generated samples)
    :param cond_dim: dimension of the signature that is concatenated to the
        latent vector to form the generator input
    """
    def __init__(self, latent_dim, cond_dim):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.cond_dim = cond_dim
        # Built once here rather than on every forward pass inside `call`.
        self.concat = layers.Concatenate()
        self.model = self.build_model()

    def build_model(self):
        """Build the Sequential network mapping (latent + condition) to an image."""
        model = models.Sequential()
        # Project the concatenated vector onto a 4x4x256 feature map.
        model.add(layers.Dense(4 * 4 * 256, input_dim=self.latent_dim + self.cond_dim, activation='relu'))
        model.add(layers.Reshape((4, 4, 256)))
        # Four stride-2 transposed convolutions: 4 -> 8 -> 16 -> 32 -> 64.
        model.add(layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(Dropout(0.25))
        model.add(layers.Conv2DTranspose(64, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Conv2DTranspose(32, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Conv2DTranspose(16, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(Dropout(0.25))
        # Final upsampling (default factor) followed by a one-channel sigmoid output.
        model.add(layers.UpSampling2D())
        model.add(layers.Conv2D(1, kernel_size=2, padding='same', activation='sigmoid'))

        return model

    def call(self, inputs, training=None):
        """
        Generate images from a ``[latent, cond]`` pair.

        :param inputs: two-element sequence ``(latent, cond)``
        :param training: forwarded to the inner network so that Dropout and
            BatchNormalization run in the requested mode; ``None`` lets Keras decide
        :return: output of the inner Sequential model
        """
        latent, cond = inputs
        input_model = self.concat([latent, cond])
        return self.model(input_model, training=training)


In [None]:
class Discriminator(models.Model):
    """
    Conditional critic (discriminator).

    The signature is projected by a dense layer onto a 128x128 map, stacked
    with the image as a second channel, and scored by a convolutional stack
    ending in a single linear unit.

    :param cond_dim: dimension of the input signature (condition)
    """
    def __init__(self, cond_dim):
        super(Discriminator, self).__init__()
        self.cond_dim = cond_dim
        self.model = self.build_model()

    def build_model(self):
        """Build the functional model taking ``[image, cond]`` and returning a score."""
        image_input = layers.Input(shape = (128, 128, 1))
        # Use the configured condition size instead of a hard-coded 62.
        cond_input = layers.Input(shape = (self.cond_dim,))
        # Expand the condition to one 128x128 channel so it can be stacked with the image.
        cond_up = layers.Dense(128*128, activation='relu')(cond_input)
        cond_up = layers.Reshape((128,128,1))(cond_up)

        concatenated_input = layers.Concatenate(axis=-1)([image_input, cond_up])

        # Conv2DCircularPadding comes from the project's utils module
        # (presumably a convolution with circular padding — see utils).
        model = models.Sequential()
        model.add(Conv2DCircularPadding(16, kernel_size=3, strides=1, input_shape=(128, 128, 2)))
        model.add(layers.LeakyReLU(alpha=0.2))
        model.add(Conv2DCircularPadding(32, kernel_size=3, strides=2))
        model.add(layers.LeakyReLU(alpha=0.2))
        model.add(Conv2DCircularPadding(64, kernel_size=7, strides=2))
        # NOTE(review): the WGAN-GP literature advises against batch
        # normalisation in the critic because the gradient penalty is defined
        # per sample; consider LayerNormalization here. Left unchanged.
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU(alpha=0.2))
        model.add(Conv2DCircularPadding(128, kernel_size=9, strides=2))
        model.add(layers.LeakyReLU(alpha=0.2))
        model.add(Conv2DCircularPadding(256, kernel_size=3, strides=2))
        model.add(layers.LeakyReLU(alpha=0.2))
        model.add(layers.Flatten())
        # Linear output: an unbounded critic score, not a probability.
        model.add(layers.Dense(1, activation='linear'))
        return models.Model(inputs = [image_input, cond_input], outputs = model(concatenated_input))

    def call(self, inputs, training=None):
        """
        Score a batch.

        :param inputs: ``[images, conditions]``
        :param training: forwarded to the inner model (affects BatchNormalization);
            ``None`` lets Keras decide
        :return: critic scores produced by the inner model
        """
        return self.model(inputs, training=training)


In [None]:


class GAN(models.Model):
    """
    Full model: generator + discriminator (critic), with the helpers used by
    the WGAN-GP training loop.

    :param generator: the generator (must expose ``latent_dim`` and ``cond_dim``)
    :param discriminator: the discriminator
    :param data: the pre-processed data, indexable as ``data[i][0]`` for the
        image table of sample ``i`` (see ``generate_real_samples``)
    :param latent_dim: dimension of the latent space
    :param batch_size: nominal batch size
    """
    def __init__(self, generator, discriminator, data, latent_dim, batch_size):
        super(GAN, self).__init__()
        self.generator = generator
        self.discriminator = discriminator
        # Optimizers applied manually by the custom training loop.
        self.discriminator_optimizer = RMSprop(learning_rate=0.00005)
        self.generator_optimizer = RMSprop(learning_rate=0.005)
        self.compile_discriminator()
        self.compile_gan()
        self.data = data
        self.latent_dim = latent_dim
        # Taken from the generator rather than hard-coded, so both stay in sync.
        self.cond_dim = generator.cond_dim
        self.image_dim = (128,128)
        self.batch_size = batch_size

    def gradient_penalty_loss(self, real_samples, fake_samples, X_cond, lambda_gp=10.0):
        """
        WGAN-GP gradient penalty evaluated on random interpolations between
        real and fake samples.

        :param real_samples: batch of real images
        :param fake_samples: batch of generated images, same shape as ``real_samples``
        :param X_cond: conditions passed to the discriminator with the interpolated images
        :param lambda_gp: weight of the penalty term
        :return: ``lambda_gp * mean((||grad||_2 - 1)^2)``, or ``0.0`` when no
            gradient could be computed
        """
        real = tf.cast(real_samples, tf.float32)
        fake = tf.cast(fake_samples, tf.float32)

        # 1. Interpolated samples: one mixing coefficient per sample, broadcast
        #    over the remaining axes. The batch dimension is read from the data
        #    so that a final, shorter batch does not break the broadcast.
        alpha_shape = [tf.shape(real)[0]] + [1] * (len(real.shape) - 1)
        alpha = tf.random.uniform(shape=alpha_shape, minval=0.0, maxval=1.0)
        interpolated = alpha * real + (1 - alpha) * fake

        # 2. Gradient of the critic output with respect to the interpolated images.
        with tf.GradientTape() as tape:
            tape.watch(interpolated)
            pred = self.discriminator([interpolated, X_cond], training=True)

        gradients = tape.gradient(pred, [interpolated])[0]

        if gradients is None:
            print("Gradient nul")
            return 0.0

        # 3. Penalty on the deviation of the per-sample L2 norm from 1.
        sample_axes = list(range(1, len(gradients.shape)))
        gradients_sqr_sum = tf.reduce_sum(tf.square(gradients), axis=sample_axes)
        # The epsilon keeps the derivative of sqrt finite when a norm is exactly 0.
        gradient_l2_norm = tf.sqrt(gradients_sqr_sum + 1e-12)
        gradient_penalty = tf.reduce_mean((gradient_l2_norm - 1.0)**2)
        return lambda_gp * gradient_penalty

    def compile_discriminator(self):
        """Compile the discriminator and mark it as non-trainable."""
        # NOTE(review): Keras calls a compiled loss as loss(y_true, y_pred),
        # whereas gradient_penalty_loss needs (real, fake, cond); this compiled
        # loss therefore looks unusable with fit/train_on_batch. The training
        # loop in this notebook applies gradients manually and does not rely on it.
        self.discriminator.compile(loss=self.gradient_penalty_loss, optimizer=RMSprop(learning_rate=0.00005), metrics=['accuracy'])
        self.discriminator.trainable = False

    def compile_gan(self):
        """Build and compile the stacked model (latent, cond) -> critic score, stored in ``self.model``."""
        latent = layers.Input(shape=(self.generator.latent_dim,))
        cond = layers.Input(shape=(self.generator.cond_dim,))
        fake_image = self.generator([latent, cond])
        input_disc = [fake_image, cond]
        validity = self.discriminator(input_disc)

        self.model = models.Model([latent,cond], validity)
        self.model.compile(loss=generator_loss, optimizer=RMSprop(learning_rate=0.005))

    def generate_latent_points(self, latent_dim, n_samples):
        """
        Draw ``n_samples`` latent vectors from a standard normal distribution.

        :param latent_dim: dimension of the latent space
        :param n_samples: number of vectors to draw
        :return: array of shape ``(n_samples, latent_dim)``
        """
        return np.random.randn(n_samples, latent_dim)

    def generate_cond(self, cond):
        """
        Put the conditions in the shape expected by the model.

        :param cond: list of the pandas conditions of the current batch
        :return: array obtained by concatenating the conditions column-wise
            and transposing, i.e. one row per condition
        """
        x_input = pd.concat(cond, axis=1).T
        return np.array(x_input)

    def generate_real_samples(self, n_samples):
        """
        Load a random batch of real images (sampled without replacement).

        :param n_samples: batch size; must not exceed ``len(self.data)``
        :return: ``(real_samples, labels)`` where ``real_samples`` stacks the
            selected image tables with a trailing channel axis and ``labels``
            is an ``(n_samples, 1)`` array of -1 (label used for real images)
        """
        dfs = self.data
        # Sample first, then convert only the selected tables.
        sampled_indices = np.random.choice(len(dfs), size=n_samples, replace=False)

        # Stack the image tables only. The previous version stacked each image
        # twice as a tuple, which produced an (n, 2, H, W, 1) array instead of
        # a batch of images.
        real_samples = np.stack([dfs[i][0].to_numpy() for i in sampled_indices], axis=0)
        real_samples = np.expand_dims(real_samples, axis=-1)
        labels = -(np.ones((n_samples, 1)))  # labels associated with real images

        return real_samples, labels


In [None]:

def plot_training_history(d_losses, g_losses):
    """
    Plot the recorded discriminator and generator losses on a single figure.

    :param d_losses: sequence of discriminator losses (drawn dashed)
    :param g_losses: sequence of generator losses
    """
    plt.figure(figsize=(10, 5))
    curves = (
        (d_losses, {'label': 'Discriminator Loss', 'linestyle': '--'}),
        (g_losses, {'label': 'Generator Loss'}),
    )
    for values, line_options in curves:
        plt.plot(values, **line_options)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('WGAN Training History')
    plt.show()

def generate_and_save_outputs(root_folder, gan, latent_dim):
    """
    Generate one structure per signature found under ``root_folder``.

    Every sub-folder containing a ``craft_s12.parquet`` file is processed: the
    signature is loaded and transposed, one latent vector is drawn, the
    generator prediction is computed and saved as ``generated_output_red.npy``
    in that same sub-folder. Entries that are not directories, or that lack
    the signature file, are skipped.

    :param root_folder: folder whose sub-folders are scanned
    :param gan: object exposing ``generate_latent_points`` and ``generator.predict``
    :param latent_dim: dimension of the latent space
    :return: None
    """
    for entry in os.listdir(root_folder):
        sample_dir = os.path.join(root_folder, entry)
        if not os.path.isdir(sample_dir):
            continue

        signature_path = os.path.join(sample_dir, 'craft_s12.parquet')
        if not os.path.exists(signature_path):
            continue

        # Load the conditioning signature and transpose it.
        signature = pd.read_parquet(signature_path).T
        # One random latent vector per signature.
        noise = gan.generate_latent_points(latent_dim, 1)
        structure = gan.generator.predict([noise, signature])

        # Store the result next to the signature it was generated from.
        np.save(os.path.join(sample_dir, 'generated_output_red.npy'), structure)

In [None]:
def _flattened_layer_kernels(model):
    """Return the flattened first weight array of every leaf layer under ``model``.

    Anything exposing a non-empty ``layers`` attribute is walked recursively,
    so layers wrapped inside nested models are included instead of only the
    outermost wrapper.
    """
    kernels = []
    for layer in getattr(model, 'layers', []):
        if getattr(layer, 'layers', None):
            kernels.extend(_flattened_layer_kernels(layer))
            continue
        layer_weights = layer.get_weights()
        if len(layer_weights) > 0:
            kernels.append(layer_weights[0].flatten())
    return kernels


def _plot_weight_histograms(model, title):
    """Overlay one histogram per weight array collected from ``model``."""
    plt.figure(figsize=(10, 5))
    for weights in _flattened_layer_kernels(model):
        plt.hist(weights, bins=50, alpha=0.5)
    plt.title(title)
    plt.show()


def train_wgan_gp(generator, discriminator, gan, data, latent_dim, n_epochs, n_critic, batch_size, debug=False):
    """
    Train the conditional WGAN-GP.

    Sign convention used here: the critic minimises
    ``mean(D(real)) - mean(D(fake)) + gradient penalty`` and the generator
    minimises ``mean(D(G(z)))``; i.e. the critic is trained to give real
    samples lower scores than generated ones.

    :param generator: the generator
    :param discriminator: the discriminator (critic)
    :param gan: the full GAN object (provides the optimizers, the gradient
        penalty and the latent/condition helpers)
    :param data: pre-processed data handed to ``generate_batches_cond``
    :param latent_dim: dimension of the latent space
    :param n_epochs: number of training epochs
    :param n_critic: number of critic updates per generator update
    :param batch_size: batch size handed to ``generate_batches_cond``
    :param debug: when True, show a generated image and print the critic
        predictions at every critic step (off by default to keep the
        notebook output readable)
    :return: None
    """
    d_losses, g_losses = [], []  # loss history, one entry per batch

    for epoch in range(n_epochs):
        current_epoch = epoch + 1
        batches = generate_batches_cond(data, batch_size)
        cond = None

        for batch in batches:
            # Real data and conditions are identical for every critic step of
            # this batch, so they are prepared once.
            X_real_tab_np = np.array([np.expand_dims(sample[0], axis=-1) for sample in batch])
            X_real_sign_np = np.array([sample[1].to_numpy() for sample in batch])
            cond = gan.generate_cond([sample[1] for sample in batch])
            X_fake_sign = np.expand_dims(cond, axis=-1)
            # Use the actual batch length: the last batch may be shorter than batch_size.
            n_samples = X_real_tab_np.shape[0]

            # --- Critic updates ---
            total_loss = None
            discriminator.trainable = True
            for _ in range(n_critic):
                latent = gan.generate_latent_points(latent_dim, n_samples)
                # Fake images are produced outside the tape: the critic update
                # must not back-propagate into the generator.
                X_fake_tab_np = np.asarray(generator.predict([latent, cond], verbose=0))
                if debug:
                    plt.imshow(X_fake_tab_np[0], cmap='gray')
                    plt.show()

                with tf.GradientTape() as d_tape:
                    pred_real = discriminator([X_real_tab_np, X_real_sign_np], training=True)
                    pred_fake = discriminator([X_fake_tab_np, X_fake_sign], training=True)

                    critic_loss = tf.reduce_mean(pred_real) - tf.reduce_mean(pred_fake)
                    # The penalty is computed inside the tape so that its
                    # (second-order) gradient reaches the critic weights.
                    gp_loss = gan.gradient_penalty_loss(X_real_tab_np, X_fake_tab_np, X_real_sign_np)
                    total_loss = critic_loss + gp_loss
                if debug:
                    print("pred fake et pred_real", pred_fake, pred_real)
                d_gradients = d_tape.gradient(total_loss, discriminator.trainable_variables)
                gan.discriminator_optimizer.apply_gradients(zip(d_gradients, discriminator.trainable_variables))

            # --- Generator update ---
            discriminator.trainable = False
            latent = gan.generate_latent_points(latent_dim, n_samples)
            with tf.GradientTape() as g_tape:
                X_gen = generator([latent, cond], training=True)
                pred_gen = discriminator([X_gen, cond])
                g_loss = tf.reduce_mean(pred_gen)

            g_gradients = g_tape.gradient(g_loss, generator.trainable_variables)
            gan.generator_optimizer.apply_gradients(zip(g_gradients, generator.trainable_variables))

            # Store plain floats rather than tensors.
            if total_loss is not None:
                d_losses.append(float(total_loss))
            g_losses.append(float(g_loss))

        if cond is None:
            raise ValueError("generate_batches_cond returned no batch: nothing to train on")

        # Preview: generate images for (up to) two conditions of the last batch.
        n_preview = min(2, cond.shape[0])
        random_indices = np.random.choice(cond.shape[0], n_preview, replace=False)
        conditions_selected = cond[random_indices]
        latent_points = gan.generate_latent_points(latent_dim, n_preview)
        generated_images = gan.generator.predict([latent_points, conditions_selected], verbose=0)
        for image in generated_images:
            plt.imshow(image, cmap='gray')
            plt.show()

        # Loss curves and epoch summary.
        plot_training_history(d_losses, g_losses)
        last_d_loss = d_losses[-1] if d_losses else float('nan')
        print(f"Epoch {current_epoch}, [D loss: {last_d_loss}], [G loss: {g_losses[-1]}]")

        # Weight distributions of both networks after this epoch.
        _plot_weight_histograms(generator, 'Histogramme des poids du générateur')
        _plot_weight_histograms(discriminator, 'Histogramme des poids du discriminateur')

In [1]:
import shutil

# Training hyper-parameters.
latent_dim = 38   # dimension of the latent noise vector
n_epochs = 200    # number of training epochs
batch_size = 64   # number of samples per batch
n_critic = 1      # critic updates per generator update

# Path to the pre-processed data. It can be overridden with the GAN_DATA_PATH
# environment variable so the notebook also runs on another machine; the
# default is the original location.
data_path = os.environ.get(
    'GAN_DATA_PATH',
    '/Users/Bader/Desktop/Mines 2A/Projet 2A/CODES/donnees/Database/results',
)

# Requires the import cell at the top of the notebook to have been executed
# first (running this cell on a fresh kernel raises a NameError).
data = load_parquet_files_cond_red(data_path, test = False)
len(data)

NameError: name 'load_parquet_files_cond_red' is not defined

In [None]:
# Create the model instances.

# Dimension of the conditioning signature, shared by both networks.
cond_dim = 62

generator = Generator(latent_dim, cond_dim)
discriminator = Discriminator(cond_dim)
gan = GAN(generator, discriminator, data, latent_dim, batch_size)

# Summaries are printed after the GAN is assembled: GAN.__init__ calls both
# networks on Keras inputs (compile_gan).
generator.summary()
discriminator.summary()

In [None]:
def collect_layer_kernels(model):
    """Return the flattened first weight array of every leaf layer under ``model``.

    ``generator`` and ``discriminator`` each wrap their network in a single
    inner model, so looking only at ``model.layers`` yields one array per
    network. Nested containers (anything with a non-empty ``layers``
    attribute) are therefore walked recursively.
    """
    kernels = []
    for layer in getattr(model, 'layers', []):
        if getattr(layer, 'layers', None):
            kernels.extend(collect_layer_kernels(layer))
            continue
        layer_weights = layer.get_weights()
        if len(layer_weights) > 0:
            kernels.append(layer_weights[0].flatten())
    return kernels


# Weight distributions before training.
generator_weights = collect_layer_kernels(generator)
discriminator_weights = collect_layer_kernels(discriminator)

for title, weight_arrays in (
    ('Histogramme des poids du générateur', generator_weights),
    ('Histogramme des poids du discriminateur', discriminator_weights),
):
    plt.figure(figsize=(10,5))
    for weights in weight_arrays:
        plt.hist(weights, bins = 50, alpha = 0.5)
    plt.title(title)
    plt.show()

# Train the GAN.
train_wgan_gp(generator, discriminator, gan, data, latent_dim, n_epochs, n_critic, batch_size)

In [None]:
# Save the generator and the discriminator after training.
model_dir = 'C:/Users/Bader/Desktop/Mines 2A/Projet 2A/Ines/model2'
# Saving fails if the target folder is missing, so create it first.
os.makedirs(model_dir, exist_ok=True)
generator.model.save(f'{model_dir}/generateur_red_gp2.keras')
discriminator.model.save(f'{model_dir}/discriminateur_red_gp2.keras')