# Projet Informatique - Comics to Music

## GANs non-conditionnel

Dans le code, en voici les différents éléments :
 - définition du générateur
 - définition du discriminateur
 - entraînement du modèle

In [246]:
import torch
from torch import nn, optim
import torchvision
import torchvision.transforms as transforms
import math
import matplotlib.pyplot as plt
import os

In [247]:
# IMPORT GPU sur google colab

import torch

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA disponible: {torch.cuda.is_available()}")

if torch.cuda.is_available():
    print(f"Nom du GPU: {torch.cuda.get_device_name(0)}") # Devrait afficher 'Tesla T4'
    device = torch.device('cuda')
else:
    print("❌ GPU toujours pas détecté.")
    device = torch.device('cpu')

print(f"Device final: {device}")

PyTorch version: 2.8.0+cu126
CUDA disponible: True
Nom du GPU: Tesla P100-PCIE-16GB
Device final: cuda


### Générateur (sans conditionnement)

In [248]:
class NonConditionnalGenerator(nn.Module):
    def __init__(self, z_dim):
        super().__init__()

        '''On commence par définir les éléments structurels de notre générateur.
        Dans notre cas, on suppose d'abord qu'on veut générer vers un espace latent
        (génération conditionnée par la suite par un autre espace latent, plus des
        conditions supplémentaires).
        Le générateur va augmenter la dimension des données en opérant une déconvolution (Upsample, 2e article cGANs)
        Ici, on part d'un vecteur bruit z et on utilise une première couche linéaire pour
        le mettre sous format-image en 2D. Dans cet exemple, on veut obtenir des images 28x28, donc on transforme z
        en images 7x7, ce qui permettra facilement d'atteindre la dimension finale en réalisant deux Upsample de scale = 2
        '''

        self.z_dim = z_dim # dimension du vecteur bruit
        self.init_dim = 7 # dans cet exemple, on veut partir d'une image 7x7
        self.num_ch = 128 # nombre de canaux arbitraire (dépend de l'espace latent, peut être des autres conditions)
        self.l1 = nn.Sequential( # première couche linéaire
            nn.Linear(z_dim, self.num_ch * self.init_dim * self.init_dim) # on change la dimension du bruit pour pouvoir le
                                                                          #reshape en [num_ch] images de dimensions init_dim x init_dim
        )

        # Réseau convolutionnel pour générer une image 28x28  : G_non_conditionnel : bruit --> espace latent audio
        self.model = nn.Sequential(
            nn.BatchNorm2d(self.num_ch),
            nn.Upsample(scale_factor=2), # passer de 7x7 à 14x14
            nn.Conv2d(self.num_ch, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2), # on passe à 28x28
            nn.Conv2d(256, 1, kernel_size=3, stride=1, padding=1),
            nn.Tanh()
        )
        ######################

    def forward(self, x):
        # conversion du vecteur bruit
        output1 = self.l1(x)
        # reshape en image
        output2 = output1.view(output1.shape[0], self.num_ch, self.init_dim, self.init_dim)
        # CNN
        output = self.model(output2)
        return output

### Discriminateur (sans conditionnement)

In [249]:
class NonConditionnalDiscriminator(nn.Module):
    def __init__(self, num_ch):
        super().__init__()

        '''Le discriminateur va être entraîner pour différencier les données réelles (espace latent audio)
        des données générées (G(z), qui dans la version cGAN seront conditionnées par l'espace latent BD).
        En pratique, il ne prend ne prend en entrée que des vrais ou que des faux à la fois.
        Fonctionnement de l'entraînement :
         - on génère les prédictions D(x_real) --> objectif D(x_real) = 1
         - on calcule la loss sur les prédictions réelles : loss_real = loss(D(x_real), 1)
         - on génère les prédictions D(x_fake) --> objectif D(x_fake) = 1
         - on calcule la loss sur les prédictions réelles : loss_fake = loss(D(x_fake), 1)
        Le discriminateur est donc entraîné sur loss_tot = (loss_real + loss_fake)

        Architecture : réseau convolutionnel classique, exemple dans le deuxième article sur les cGANs'''

        self.num_ch = num_ch # nombre de canaux, dépend de l'espace latent audio

        #  modèle de convolution classique, on cherche à densifier l'information des images à chaque couche
        self.model = nn.Sequential(
            nn.Conv2d(num_ch, 64, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(128), nn.LeakyReLU(0.2),
            nn.Conv2d(128, 512, kernel_size=3, stride=2, padding=1), nn.BatchNorm2d(512), nn.LeakyReLU(0.2),

            # A la fin, on réduit brusquement le nombre de canaux à 1, pour obtenir une prédiction scalire {0,1}.
            # Dans la version cGAN, c'est ici que l'on introduit la condition y !!!
            nn.Conv2d(512, 1, kernel_size=4, stride=2, padding=0),
            nn.Sigmoid()
        )

    def forward(self, x):
        # prend un batch d'images d'entrée, sort un batch de 1 et de 0
        output = self.model(x).view(-1,1)
        return output

### Entraînement (sans conditionnement)

In [250]:
# # Sauvegarde et récupération de G et D
# if os.path.isfile('discriminator2.pt') and os.path.isfile('generator2.pt'):
#     nc_discriminator.load_state_dict(torch.load('./discriminator2.pt'))
#     nc_generator.load_state_dict(torch.load('./generator2.pt'))

# else:
#     for epoch in range(num_epochs):

#         for n, (real_samples, _) in enumerate(train_loader):

#             ### Entraînement nc_discriminator
#             # On récupère les données réelles (reshape pour être sûr qu'elles soient acceptés par nc_D)
#             real_samples = real_samples.view(-1, 1, 28, 28)

#             # on génère les fake_samples à partir d'un bruit gaussien (pas de condition ici)
#             z = torch.randn(batch_size, z_dim)
#             fake_samples = torch.tanh(nc_generator(z))

#             # Initialisation de la descente de gradient
#             optimizer_nc_discriminator.zero_grad()

#             # Loss sur les prédictions "données réelles"
#             predict_real = nc_discriminator(real_samples)
#             loss_real = loss_function(predict_real, torch.full((batch_size, 1), 1.0))

#             # Loss sur les prédictions "données générées"
#             predict_fake = nc_discriminator(fake_samples.detach())
#             loss_fake = loss_function(predict_fake, torch.full((batch_size, 1), 0.0))

#             # Loss complète
#             loss_discriminator = (loss_real + loss_fake) / 2

#             # Descente de gradient et mise à jour des poids w_D
#             loss_discriminator.backward()
#             optimizer_nc_discriminator.step()

#             ### Entraînement nc_generator
#             # Initialisation descente de gradient
#             optimizer_nc_generator.zero_grad()

#             # génération des fake_samples
#             z = torch.randn(batch_size, 100)
#             fake_samples_new = nc_generator(z)

#             # récupération de la prédiction de D
#             gen_prediction = nc_discriminator(fake_samples_new)

#             # loss classique entre la prédiction D(fake_samples) et l'objectid vecteur de 1
#             valid_labels = torch.full((batch_size, 1), 1.0)
#             loss_generator = loss_function(gen_prediction, valid_labels)

#             # OPTIONNEL --> diversity loss
#             # on ajoute une diversity loss pour éviter de générer que des points identiques
#             # sinon le générateur peut mapper à chaque fois vers une seule image qui à l'air vraie
#             distances = torch.cdist(fake_samples_new, fake_samples_new, p=2) # renvoie la distance scalaire entre tous les éléments générés du batch
#             mean_distance = torch.mean(distances)
#             lambda_div = 0.2 # poids sur la loss
#             loss_diversity = -lambda_div * mean_distance # on veut maximiser la distance donc on met un - devant

#             # loss totale
#             loss_generator = loss_generator + loss_diversity

#             loss_generator.backward()
#             optimizer_nc_generator.step()

#             # Show loss
#             if n == batch_size - 2:
#                 print(f"Epoch: {epoch} Loss D.: {loss_discriminator}")
#                 print(f"Epoch: {epoch} Loss G.: {loss_generator}")

## GANs conditionnels

Pas si différents des GANs non-conditionnels, il faut juste rajouter une condition $y$ à deux étapes du modèle.

In [251]:
class ConditionnalGenerator(nn.Module):
    def __init__(self, z_dim, y_dim):
        super().__init__()

        self.z_dim = z_dim # dimension du vecteur bruit
        self.y_dim = y_dim # dimension du vecteur condition
        self.init_dim = 7 # dans cet exemple, on veut partir d"'une image 7x7
        self.num_ch = 128 # nombre de canaux arbitraire (dépend de l'espace latent, peut être des autres conditions)

        self.l1 = nn.Sequential( # première couche linéaire
            nn.Linear(self.z_dim + self.y_dim, self.num_ch * self.init_dim * self.init_dim) # cette fois, on part de
                                                                                            # z_dim + y_dim
        )

        # Réseau convolutionnel identique au vanilla GAN
        self.model = nn.Sequential(
            nn.BatchNorm2d(self.num_ch),
            nn.Upsample(scale_factor=2), # passer de 7x7 à 14x14
            nn.Conv2d(self.num_ch, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2), # on passe à 28x28
            nn.Conv2d(256, 1, kernel_size=3, stride=1, padding=1),
            nn.Tanh()
        )
        ######################

    def forward(self, x):
        # Ici, x est la concaténation du bruit z et de la condition y
        # conversion du vecteur x
        output1 = self.l1(x)
        # reshape en image
        output2 = output1.view(output1.shape[0], self.num_ch, self.init_dim, self.init_dim)
        # CNN
        output = self.model(output2)
        return output

In [252]:
class ConditionnalDiscriminator(nn.Module):
    def __init__(self, num_ch, latent_x_dim, y_cond_dim):
        super().__init__()

        self.num_ch = num_ch # nombre de canaux, dépend de l'espace latent audio
        self.y_cond_dim = y_cond_dim
        self.stride_size = 2 # valeur du stride pour connaître la dimension lors du reshape

        #  modèle de convolution classique, on cherche à densifier l'information des images à chaque couche
        self.model = nn.Sequential(
            nn.Conv2d(num_ch, 64, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(128), nn.LeakyReLU(0.2),
            nn.Conv2d(128, 512, kernel_size=3, stride=2, padding=1), nn.BatchNorm2d(512), nn.LeakyReLU(0.2),
        )

        # A cette étape, il faut applatir les données et y concaténer la condition y
        # calcul de la dimension du reshape :
        self.image_dim = int(latent_x_dim / (self.stride_size ** 3)) # on a appliqué 3 strides de 2
        self.flat_dim = 512 * self.image_dim * self.image_dim

        self.classifier = nn.Sequential(
            nn.Linear(self.flat_dim + self.y_cond_dim, 1), # ce n'est plus une convolution car vecteur plat
            nn.Sigmoid()
        )


    def forward(self, x, y):
        # on rentre x et y car on commence par traiter seulement x avant d'injecter y
        features = self.model(x)

        # Aplatissement vers la flat dimension
        features_flat = features.view(features.size(0), -1)

        # Concaténation avec le vecteur condition y
        combined = torch.cat([features_flat, y], dim=1)

        # Prédiction finale
        output = self.classifier(combined)
        return output


## GANs Cycle-Consistent

Dans notre cas, on veut que le GAN apprennent à générer des données dans l'espace latent $Y$ (audio) en étant influencé par l'espace latent $X$ (BDs). Pour cela, deux solutions : entraîner notre GAN avec des données réelles pairées, en conservant les paires dans les espaces latents $X$ et $Y$. Problème : auncun dataset existant possédant des paires d'audio et de BDs. On va donc chercher à entraîner notre GAN pour qu'il puisse associer des échantillons $x\in \{x_i\}$ à des échantillons $y\in\{y_i\}$, de manière non supervisée, en détectant lui-même les caractéristiques pour associer une paire. On s'inspire de l'article *Unpaired Image-to-Image Translation
using Cycle-Consistent Adversarial Networks*, qui introduisent les *Cycle-Consistent GANs*. Ils utilisent ce réseau pour passer d'images à images. Dans notre cas, les espaces $X$ et $Y$ ne sont pas aussi similaires, il va peut être falloir injecter des conditions supplémentaires (structure, rythme, couleur, texte, sens --> CLIP), pour aider notre GAN à générer des paires qui ont du sens.

En pratique, on a besoin d'entraîner deux générateurs
$$
G : X \longrightarrow Y \\
F : Y \longrightarrow X
$$

Chacun associé à un discrimnateur, qui va déterminer si la sortie du générateur correspond à l'espace réel cible. Là génération est donc entraînée par une adversarial loss. On s'inspire des travaux de *Improved Training of Wasserstein GANs* qui ont introduit les WGANs, utilisant une distance de Wasserstein comme adversarial loss.

$$
\begin{align}
\mathcal{L}_{WGAN} (G, D_Y, X, Y) =\: &\mathbb{E}_{y\sim p_{data}(y)}[D_Y(y)]\\ - \: &\mathbb{E}_{x\sim p_{data}(x)}[D_Y(G(x))]
\end{align}
$$

Contrairement à un GAN classique, on ne cherchera à prédire des 1 ou des 0. On va ici chercher à maximiser les écarts entre les score vrais ou faux. Nos discriminateurs sortent donc des vecteurs de score, et non pas des labels de prédiction. On en tire une loss pour les discriminateurs, et une pour les générateurs :
$$
\begin{align}
\mathcal{L}_D &= \mathbb{E}[D(G(x))] - \mathbb{E}[D(y)] + \lambda_{GP}\mathcal{L}_{GP}\\
\mathcal{L}_G &= -\mathbb{E}[D(G(x))]
\end{align}
$$

où $\mathcal{L}_{GP}$ est la $gradient \: penalty$ introduite dans l'article WGANs (ils utilisent $\lambda_{GP}=10$). Cette pénalité vise à empêcher la sortie du discriminateur d'avoir une norme différente de 1. Ils l'expriment comme :
$$
\mathcal{L}_{GP} = \mathbb{E}[(||\nabla_{\hat{y} = G(x)}D(G(x))||_2 -1)^2]
$$
Un aspect important de l'article est qu'il stipule que le discriminateur doit être entraîné plus souvent que le générateur. Environ 5 steps pour 1.


L'architecture globale *Cycle-GANs* nécessite d'ajouter une *cycle-loss*, qui détermine si les données sont bien reconstruites lors de l'application successive de $G$ et $F$ :
$$
\begin{align}
\mathcal{L}_{cyc} =\: &\mathbb{E}_{x\sim p_{data}(x)}[||F(G(x)) - x||]\\ + \:&\mathbb{E}_{y\sim p_{data}(y)}[||G(F(y)) - y||]
\end{align}
$$


Pour l'architecture, l'article *Cycle-GANs* s'inspire de Johnson et al. *Perceptual losses
for real-time style transfer and super-resolution*. On y ajoute la notion développée dans l'article *LOGAN : Unpaired Shape Transform in Latent Overcomplete Space*, qui présente l'avantage de travailler avec des espaces *overcomplete* (réseaux dans lesquels la dimension a été fortement augmentée). Cela facilite le transport de masse optimale entre les deux distributions (--> coût égal *masse* x *distance(X,Y)*). Cet ajout agit avec la Wassertein-Loss pour empêcher le *mode-collapse* :

1. Générateurs :
 - $\longrightarrow$ donc pour ces GANs là, on n'augmente pas forcément la dimension comme vu dans les cGANs (on peut quand même modifier si jamais nos espaces $X$ et $Y$ n'ont pas les mêmes dimensions). Nos espaces latents auront peut être des dimensions différentes (par exemple : peut être besoin de plus grandes dimensions dans l'espace latent audio, si on utilise plein de conditions supplémentaires dans l'espace BD).

 2. Discriminateurs :
 - $N\times N$ Patch-GANs : faits pour discriminer des grosses images (1080p par exemple). Ils ne discriminent que sur des portions de $N$ par $N$. Peut être pas nécessaire pour nous si on travaille sur des espaces latents de faibles dimensions. Mais peux justement permettre de travailler sur des espaces latents plus complexes sans trop augmenter la charge de calcul.

### Générateur $G$ : $X$(BDs) $\longrightarrow$ $Y$(audios)

$G$ part de $X$ l'espace latent BDs pour générer $Y$ l'espace latent audio. Comment on utilisera des conditions supplémentaires pour influencer la génération, on peut se permettre de ne garder que peut d'information dans $X$. Ainsi, on choisit de générer $X$ (partie VAE) comme un vecteur 1D plutôt que comme une image. On ne garde ainsi qu'une idée du style et on oublie les informations de structure spatiale qui seront utilisées plus efficacement dans les autres conditions.

In [253]:
class Generator_G(nn.Module):
    def __init__(self, z_dim, latent_x_dim, init_dim, ngf=256):
        '''
        Inputs :
         - z_dim : dimension du vecteur bruit seed du générateur
        Dimensions des espaces latents:
         - latent_x_dim : dimension de l'espace latent X d'où est tirée la condition du générateur G.
            On peut générer l'espace latent BDs comme un vecteur, car on ne veut l'utiliser que pour avoir une
            notion du style
         - pas besoin de prendre latent_y_dim pour connaître la dimension de l'objectif si l'architecture
         est adaptée en fonction de la dimension de départ init_dim (voir juste en dessous)
        '''
        super().__init__()
        self.z_dim = z_dim
        self.x_dim = latent_x_dim # dimension du vecteur condition (espace BDs X)
        self.init_dim  = init_dim # dimension (H ou W)
        self.main_ch = ngf 


        
        self.l1 = nn.Sequential(
            nn.Linear(self.z_dim + self.x_dim, self.main_ch * self.init_dim * self.init_dim)
        )

        # le réseau est structuré en blocs séquentiel pour permettre l'injection de la condition x entre les convolutions
        self.bloc1 = nn.Sequential(
            nn.BatchNorm2d(self.main_ch + self.x_dim),
            nn.Upsample(scale_factor=2),
            # injection de x
            nn.Conv2d(self.main_ch + self.x_dim, self.main_ch, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(self.main_ch, 0.8),
            nn.LeakyReLU(0.2, inplace=True)
        )

        self.bloc2 = nn.Sequential(
            # injection de x
            nn.Conv2d(self.main_ch + self.x_dim, self.main_ch // 2, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(self.main_ch // 2, 0.8),
            nn.LeakyReLU(0.2, inplace=True)
        )

        self.bloc3 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(self.main_ch // 2, 1, kernel_size=3, stride=1, padding=1),
            nn.Tanh()
        )


    def forward(self, x, z):
        # Concaténation sur la dimension 1
        cat_xz = torch.cat([x, z], dim=1)
        out = self.l1(cat_xz)
        out = out.view(out.shape[0], self.main_ch, self.init_dim, self.init_dim)

        # première inejction
        x_img = x.unsqueeze(2).unsqueeze(3).expand(-1, -1, out.size(2), out.size(3))
        out = torch.cat([out, x_img], dim=1) # concaténation
        out = self.bloc1(out)

        # deuxième injection
        x_img = x.unsqueeze(2).unsqueeze(3).expand(-1, -1, out.size(2), out.size(3))
        out = torch.cat([out, x_img], dim=1) # concaténation
        out = self.bloc2(out)

        output = self.bloc3(out)

        return output

### Générateur $F$ : $Y$(audios) $\longrightarrow$ $X$(BDs)

Contrairement à $G$, qui développait les données, $F$ a pour but de compresser l'espace $Y$ vers $X$. On veut que les deux générateurs forment un équilibre lors de l'apprentissage. On a également les même dimensions que dans $G$, mais inversées. Il es dont logique que l'architecture de $F$ soit symétrique à celle de $G$.

In [254]:
class Generator_F(nn.Module):
    def __init__(self, z_dim, latent_x_dim, x_ch, y_ch, init_dim, nff=256):
        '''
        Inputs :
         - latent_x_dim : dimension du vecteur de sortie (espace X)
         - latent_y_dim : nombre de canaux de l'espace latent d'entrée Y
         - z_dim        : dimension du bruit gaussien ajouté
         - init_dim     : dimension spatiale de référence
        '''
        super().__init__()
        self.x_dim = latent_x_dim
        self.y_ch = y_ch
        self.z_dim = z_dim
        self.init_dim = init_dim
        self.internal_ch = x_ch
        self.nff = nff

        # Le réseau prend maintenant en entrée canaux de Y + canaux de Z
        input_channels = self.y_ch + self.z_dim
        self.model = nn.Sequential(

            nn.Conv2d(input_channels, self.nff, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2, inplace=True),

            # equivalent à un Downsample
            nn.AvgPool2d(kernel_size=2, stride=2),

            nn.Conv2d(self.nff, self.nff//4, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(self.nff//4, 0.8),
            nn.LeakyReLU(0.2, inplace=True),

            nn.AvgPool2d(kernel_size=2, stride=2),

            nn.Conv2d(self.nff//4, self.internal_ch, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(self.internal_ch),
            nn.LeakyReLU(0.2, inplace=True),
        )

        self.flat_dim = self.internal_ch * self.init_dim * self.init_dim

        self.final_head = nn.Sequential(
            nn.Linear(self.flat_dim, self.x_dim),
            nn.Tanh()
        )

    def forward(self, input_y, z):

        # comme on ajoute z à une matrice 2D, il faut l'étendre (comme la condition X dans DY)
        batch_size, _, h, w = input_y.size()
        z_expanded = z.view(batch_size, -1, 1, 1).expand(batch_size, -1, h, w)
        combined_input = torch.cat([input_y, z_expanded], dim=1)

        features = self.model(combined_input)
        features_flat = features.view(features.shape[0], -1)
        output_x = self.final_head(features_flat)

        return output_x

### Discriminateur $D_Y$

$D_Y$ vérifie que $G(x) = \hat{y} \approx y$.

**Au final les discriminateurs ne prennent pas de conditions. $\\$ EXPLICATION :  la loss des discriminateurs est calculée d'abord sur des prédictions à partir d'entrées totalement vraies, puis sur des prédictions à partir d'entrées totalement fausses (ou inversement) le conditionnement fonctionne pour la prédiction à partir de données fausses, puisque l'on veut que  les données générées depuis l'autre espace soit notées en fonction de l'autre espace. Cependant, pour les prédictions à partir de données réelles, comme les données ne sont pas pairées, la conditions ne veut rien dire pour le discriminateur. Il ne veut donc même pas reconnaître ça comme une donnée réelle lors du calcul de la loss (à clarifier).**

**Du coup, *Comment assurer que le modèle découvre une cohérence entre X et Y ?* On s'appuie pour cela sur la Cycle-loss, qui permet de retrouver les données d'un espace en appliquant les deux générations successives. Cette Loss oblige le modèle à lier les échantillons $x$ et $y$ qui lui permettront de reproduire le plus fidèlement les espaces à partir des générations (à clarifier)**.

Le discriminateur va donc prendre en entrée des données 2D, on peut donc utiliser une structure patchGAN $N\times N$, qui permettra de réduire la quantité de calculs de la discrimination, et d'avoir un espace latent $Y$ plus complexe.

Un discriminateur de type PatchGAN ne sort pas un scalaire de prédiction mais une grille de prédiction pour chaque patche de données. La couche linéaire à la fin du discriminateur conditionnel vu juste avant doit donc nécessairement disparaître car on veut une sortie 2D pour le discriminateur.

In [255]:
class Discriminator_DY(nn.Module):
    def __init__(self, y_ch, ndf=64):
        """
        y_ch : Canaux de l'image Y
        ndf  : Nombre de filtres de base
        """
        super().__init__()

        self.model = nn.Sequential(
            # Layer 1 : Y -> Features
            nn.Conv2d(y_ch, ndf, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf, ndf * 2, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf * 2, ndf * 4, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),

            # Layer 4 : Sortie PatchGAN (Grille 2D)
            nn.Conv2d(ndf * 4, 1, kernel_size=4, stride=1, padding=1)
        )

    def forward(self, y_input):
        # Plus besoin de x_condition
        return self.model(y_input)

### Discriminateur $D_X$

Ce discriminateur agit sur l'espace $X$. Les données sont donc 1D, et plus simples que pour $D_Y$, on n'a pas besoin d'utiliser un discriminateur de type PatchGAN. On utilise un discriminateur conditionnée classique.

In [256]:
import torch
import torch.nn as nn

class Discriminator_DX(nn.Module):
    def __init__(self, x_dim, hidden_dim=512):
        """
        x_dim      : Dimension du vecteur latent X (ex: 3)
        hidden_dim : Taille de la couche cachée (Largeur du réseau)
        """
        super().__init__()

        self.model = nn.Sequential(

            nn.Linear(x_dim, hidden_dim),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(hidden_dim // 2, hidden_dim // 4),
            nn.LeakyReLU(0.2, inplace=True),

            # Sortie : Score scalaire (pour WGAN)
            nn.Linear(hidden_dim // 4, 1)
            # Pas de Sigmoid ici pour un WGAN
        )

    def forward(self, x):
        # x doit être de forme (Batch, x_dim)

        # Sécurité : on s'assure que l'entrée est bien aplatie
        x_flat = x.view(x.size(0), -1)

        output = self.model(x_flat)
        return output

### Fonctions de pertes

In [257]:
def CycleLoss(real_samples, rec_samples):
    loss_func = nn.L1Loss()
    return loss_func(real_samples, rec_samples)

def GradientPenalty(D, real_samples, fake_samples, device=device):
    """Calcul du Gradient Penalty pour WGAN-GP"""

    # On adapte les dimensions de alpha pour qu'elles collent aux données (1D ou 2D)
    alpha = torch.rand(real_samples.size(0), 1, 1, 1, device=device) if len(real_samples.shape) == 4 else torch.rand(real_samples.size(0), 1, device=device)

    # si c'est un vecteur 1D (X), alpha doit être (Batch, 1)
    # si c'est un tensuer 2D (Y), alpha doit être (Batch, 1, 1, 1) pour le broadcast
    if len(real_samples.shape) == 2:
         alpha = alpha.view(real_samples.size(0), 1)

    # Interpolation
    interpolates = (alpha * real_samples + ((1 - alpha) * fake_samples)).requires_grad_(True)

    # Passage dans le discriminateur
    d_interpolates = D(interpolates)

    # Calcul du gradient
    # On crée un vecteur de 1 --> cible de notre gradient
    fake = torch.ones(d_interpolates.size(), device=device, requires_grad=False)

    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=fake,
        create_graph=True,
        retain_graph=True,
        only_inputs=True,
    )[0]

    # Calcul de la pénalité
    gradients = gradients.view(gradients.size(0), -1) # on aplatit pour le calcul de norme
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()

    return gradient_penalty

def DiscriminatorLoss(score_real, score_fake):
    return torch.mean(score_fake) - torch.mean(score_real)

def GeneratorLoss(predict_fake):
    return -torch.mean(predict_fake)

def DiversityLoss(fake_1, fake_2, z1, z2):
    diff_img = torch.mean(torch.abs(fake_1 - fake_2))
    diff_z = torch.mean(torch.abs(z1 - z2))

    return - diff_img / (diff_z + 1e-8)



### Entraînement du modèle

Pour entraîner nos deux générateurset nos discriminateurs, il faut successivement générerles données, et les prédictions et calculer les loss de chaque sous-structure à chaque itération.

### Dataset fait maison

Pour vérifier l'implémentation correcte des réseaux dans le cycle-GAN, on génère notre propres dataset. Pour respecter la relation entre les espaces $\mathcal{X}$ et $\mathcal{Y}$, le premier contiendra des vecteurs $x = [centre\_x,\:centre\_y, rayon]$ et le deuxième contiendra une image d'un disque correspondant. L'objectif du cycle-GANS sera de lier les vecteurs $x$ correspondant aux images $y$.

In [258]:
import torch
from torch.utils.data import Dataset

class SyntheticCircleDataset(Dataset):
    def __init__(self, size=1000, img_dim=64):
        self.size = size
        self.img_dim = img_dim

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        # Espace X : 3 paramètres aléatoires (x, y, rayon)
        # Normalisés entre 0 et 1 (ou -1 et 1 selon tes besoins)
        params = torch.rand(3)

        # Espace Y : Génération de l'image correspondante (Simplifié)
        # (Dans un vrai cas non-pairé, tu mélangerais les indices)
        img = torch.zeros((1, self.img_dim, self.img_dim))

        # Logique de dessin simple (juste pour l'exemple)
        cx, cy = int(params[0]*self.img_dim), int(params[1]*self.img_dim)
        r = int(params[2] * (self.img_dim/4))

        y_grid, x_grid = torch.meshgrid(torch.arange(self.img_dim), torch.arange(self.img_dim), indexing='ij')
        mask = ((x_grid - cx)**2 + (y_grid - cy)**2) <= r**2
        img[0][mask] = 1.0

        return params, img



In [259]:
from torch.utils.data import DataLoader

# 1. Instanciation du Dataset
# On génère assez de données (ex: 5000 exemples)
dataset = SyntheticCircleDataset(size=32*100, img_dim=32)

# 2. Création des DataLoaders indépendants
# On crée deux loaders distincts avec shuffle=True.
# Comme ils sont mélangés différemment, le vecteur X du premier loader
# ne correspondra pas à l'image Y du deuxième loader à l'instant t.
# C'est ce qui simule le "Non-Pairé".

batch_size = 32

train_loader_X = DataLoader(dataset, batch_size=batch_size, shuffle=True)
train_loader_Y = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Création de l'itérateur pour Y (au cas où Y est plus petit que X)
iter_Y = iter(train_loader_Y)

In [260]:
# Paramètres réseaux
z_dim = 50 # arbitraire
x_dim = 3 # dimension du vecteur x
x_ch = 512
y_dim = 32 # arbitraire, tenseur 2D 64x64
y_ch = 1 # noir et blanc
init_dim = y_dim // (2**2)


G = Generator_G(z_dim, x_dim, init_dim, ngf=1024).to(device=device)
F = Generator_F(z_dim, x_dim, x_ch, y_ch, init_dim, nff=1024).to(device=device)
DY = Discriminator_DY(y_ch, ndf=256).to(device=device)
DX = Discriminator_DX(x_dim).to(device=device)

# Paramètres de l'entraînement

lr = 0.0001
num_epochs = 100
lambda_cycle = 200 # poids de la cycle_loss
lambda_gp = 10 # poids de la gradient penalty
lambda_div = 2 # poids pour le cout de diversité
n_critic = 3 # ration entraînement discriminateurs/générateurs
activ_div = num_epochs // 2

b1 = 0.0  # paramètres ADAM recommandés par le papier
b2 = 0.9

optimizer_G = torch.optim.Adam(G.parameters(), lr=lr, betas=(b1,b2))
optimizer_F = torch.optim.Adam(F.parameters(), lr=lr, betas=(b1,b2))
optimizer_DY = torch.optim.Adam(DY.parameters(), lr=lr, betas=(b1,b2))
optimizer_DX = torch.optim.Adam(DX.parameters(), lr=lr, betas=(b1,b2))


In [None]:
for epoch in range(num_epochs):
    for i, (real_x, _) in enumerate(train_loader_X):

        try:
            _, real_y = next(iter_Y)
        except StopIteration:
            # Si on a fini le dataset Y, on le recharge
            iter_Y = iter(train_loader_Y)
            _, real_y = next(iter_Y)

        real_x = real_x.to(device)
        real_y = real_y.to(device)

        # normalisation etre -1 et 1
        real_x = real_x.to(device)
        real_x = (real_x * 2) - 1
        real_y = real_y.to(device)
        real_y = (real_y * 2) - 1


        # activation de la diversity_loss
        if epoch >= activ_div:
            poids_div = lambda_div
        else:
            poids_div = 0

        # on s'assure qu'ils aient les bonnes dimensions
        # real_x = ...
        # real_y = ...

        with torch.no_grad(): # on ne veut que les gradients des générateurs soient calculés pour l'instant
            z_x = torch.randn(batch_size, z_dim).to(device)
            z_y = torch.randn(batch_size, z_dim).to(device)
            fake_x = F(real_y, z_y)
            fake_y = G(real_x, z_x)
        # print(real_x.size())
        # print(fake_x.size())

        ### ENTRAINEMENT DISCRIMINATEURS
        optimizer_DY.zero_grad()
        optimizer_DX.zero_grad()

        # Discriminateur DY
        score_real_Y = DY(real_y) # les discriminateurs sortent des scores, pas des prédictions
        score_fake_Y = DY(fake_y.detach()) # detach() --> on ne veut pas que le gradient remonte au générateur
        # Loss WGAN : E[D(fake)] - E[D(real)] + lambda * GP
        loss_DY = DiscriminatorLoss(score_real_Y, score_fake_Y) + lambda_gp * GradientPenalty(DY, real_y,fake_y.detach())
        loss_DY.backward()
        optimizer_DY.step()

        # Discriminateur DX
        score_real_X = DX(real_x) # les discriminateurs sortent des scores, pas des prédictions
        score_fake_X = DX(fake_x.detach()) # detach() --> on ne veut pas que le gradient remonte au générateur
        # Loss WGAN : E[D(fake)] - E[D(real)] + lambda * GP
        loss_DX = DiscriminatorLoss(score_real_X, score_fake_X) + lambda_gp * GradientPenalty(DX, real_x,fake_x.detach())
        loss_DX.backward()
        optimizer_DX.step()

        ### ENTRAINEMENT GENERATEURS
        if i % n_critic == 0:
            optimizer_G.zero_grad()
            optimizer_F.zero_grad()


            z_x = torch.randn(batch_size, z_dim).to(device)
            z_x2 = torch.randn_like(z_x).to(device)
            fake_x = F(real_y, z_y)
            z_y = torch.randn(batch_size, z_dim).to(device)
            z_y2 = torch.randn_like(z_y).to(device)
            fake_y = G(real_x, z_x)

            # Calcul des GAN-Loss --> tromper le discriminateur
            # Le générateur veut maximiser D(fake), donc minimiser -D(fake)
            predict_fake_Y = DY(fake_y)
            Loss_GAN_G = GeneratorLoss(predict_fake_Y)
            predict_fake_X = DX(fake_x)
            Loss_GAN_F = GeneratorLoss(predict_fake_X)

            # Calcul de Cycle-Loss --> pour revenir à l'espace de départ
            reconstructed_x = F(fake_y, z_y)
            Loss_cycle_X = CycleLoss(real_x, reconstructed_x)
            reconstructed_y = G(fake_x, z_x)
            Loss_cycle_Y = CycleLoss(real_y, reconstructed_y)

            # Calcul de Diversity-Loss
            fake_x2 = F(real_y, z_y2)
            Loss_div_F = DiversityLoss(fake_x, fake_x2, z_x, z_x2)
            fake_y2 = G(real_x, z_x2)
            Loss_div_G = DiversityLoss(fake_y, fake_y2, z_y, z_y2)

            # Loss totale
            Loss_Generator = Loss_GAN_G + Loss_GAN_F + lambda_cycle*(Loss_cycle_X + Loss_cycle_Y) + poids_div*(Loss_div_F + Loss_div_G)
            Loss_Generator.backward()
            optimizer_G.step()
            optimizer_F.step()

        # Show loss
    if epoch % 1 == 0:
        print(f"Epoch: {epoch} Loss G : {Loss_GAN_G}")
        print(f"Epoch: {epoch} Loss F : {Loss_GAN_F}")
        print(f"Epoch: {epoch} Loss DY : {loss_DY}")
        # print(f"Epoch: {epoch} Loss DX : {loss_DX}")


Epoch: 0 Loss G : 1.8940273523330688
Epoch: 0 Loss F : 0.19677366316318512
Epoch: 0 Loss DY : -0.4865652322769165
Epoch: 1 Loss G : 3.030085802078247
Epoch: 1 Loss F : 0.01688380166888237
Epoch: 1 Loss DY : -0.810099184513092
Epoch: 2 Loss G : 2.8621327877044678
Epoch: 2 Loss F : -0.013001516461372375
Epoch: 2 Loss DY : -0.42043042182922363
Epoch: 3 Loss G : 3.4332611560821533
Epoch: 3 Loss F : 0.30852797627449036
Epoch: 3 Loss DY : -0.759192705154419
Epoch: 4 Loss G : 3.524346351623535
Epoch: 4 Loss F : 0.39656734466552734
Epoch: 4 Loss DY : -1.1618754863739014
Epoch: 5 Loss G : 3.489110231399536
Epoch: 5 Loss F : -0.21009743213653564
Epoch: 5 Loss DY : -1.1372379064559937
Epoch: 6 Loss G : 3.686211585998535
Epoch: 6 Loss F : -0.22555077075958252
Epoch: 6 Loss DY : -1.2384870052337646
Epoch: 7 Loss G : 3.675940752029419
Epoch: 7 Loss F : -0.1337355226278305
Epoch: 7 Loss DY : -1.2538185119628906
Epoch: 8 Loss G : 3.766140937805176
Epoch: 8 Loss F : -0.937321662902832
Epoch: 8 Loss DY 

# Inférence Cycle-GAN

In [None]:
import matplotlib.pyplot as plt
import torch

def show_5_comparisons(G, z_dim, device, img_dim=64):
    n_samples = 5
    
    # 1. Création des conditions aléatoires (Batch de 5)
    # x_vals est entre [0, 1] pour faciliter le calcul des coordonnées
    x_vals = torch.rand(n_samples, 3).to(device) 
    
    # 2. Préparation pour le GAN (x doit être entre -1 et 1 comme vu précédemment)
    x_for_gan = (x_vals * 2) - 1
    z_noise = torch.randn(n_samples, z_dim).to(device)
    
    # 3. Génération (Batch)
    with torch.no_grad():
        fake_imgs = G(x_for_gan, z_noise).cpu().squeeze() # (5, 64, 64)

    # 4. Création des "Vraies" images théoriques
    real_imgs = []
    y_grid, x_grid = torch.meshgrid(torch.arange(img_dim), torch.arange(img_dim), indexing='ij')
    
    for i in range(n_samples):
        img = torch.zeros((img_dim, img_dim))
        
        # On récupère les scalaires pour chaque sample
        cx = int(x_vals[i, 0] * img_dim)
        cy = int(x_vals[i, 1] * img_dim)
        r = int(x_vals[i, 2] * (img_dim / 4))
        
        mask = ((x_grid - cx)**2 + (y_grid - cy)**2) <= r**2
        img[mask] = 1.0
        real_imgs.append(img)

    # 5. Affichage
    fig, axes = plt.subplots(2, n_samples, figsize=(15, 6))
    
    for i in range(n_samples):
        # Ligne du haut : Vraie Image (Cible)
        axes[0, i].imshow(real_imgs[i], cmap='gray', vmin=0, vmax=1)
        axes[0, i].axis('off')
        if i == 2: axes[0, i].set_title("Cibles Théoriques (Dataset)", fontsize=12)

        # Ligne du bas : Image Générée
        # Dénormalisation de la sortie du GAN (-1,1 -> 0,1)
        gen_img = (fake_imgs[i] + 1) / 2
        axes[1, i].imshow(gen_img, cmap='gray', vmin=0, vmax=1)
        axes[1, i].axis('off')
        if i == 2: axes[1, i].set_title("Générations (GAN)", fontsize=12)

    plt.tight_layout()
    plt.show()

# Utilisation
show_5_comparisons(G, z_dim, device)