# Diffusion prior in VAE
* Estrutura similar ao DCGAN
* KL Annealing Linear

## Parte 1: DDPM prior

### a) Beta schedule cossine
A ideia é definir coeficientes de variância ($\beta_t$) de forma que o produto cumulativo dos $\alpha_t = 1 - \beta_t$ seja suavizado.

É trabalhado como $\bar{\alpha}_t = cos (\frac{t/T + s}{1+s} \times \frac{\pi}{2})^2$, onde $s$ é um pequeno coeficiente para não começar do valor zero. Depois, os betas são calculados como $\beta_t = 1 - \frac{\bar{\alpha_{t+1}}}{\bar{\alpha}_t}$

Com isso, temos:

    * Garantia de transições suaves: Cada etapa adiciona uma quantidade pequena de ruído de forma controlada, o que é crucial para a validade das aproximações gaussianas nos DDPM.

    * Melhora na qualidade das amostras: resulta em melhores métricas de qualidade (como FID), possivelmente porque a distribuição de ruído se comporta de maneira mais gradual.

* Referência: 
    * Improved Denoising Diffusion Probabilistic Models (2021)

### b) Timestep embedding
* O objetivo é gerar uma representação contínua dos timesteps (por exemplo, 0, 1, …, T-1), convertendo eles em vetores contínuos por meio de funções trigonométricas (seno e cosseno)

* ***Equações do Positional encoding***:
Para uma posição $pos$ e uma dimensão $i$ do embedding:
$$ 
PE(pos,2i) = sin(\frac{pos}{1000^{2i/d_{model}}})
$$
    
$$ 
PE(pos,2i+1) = cos(\frac{pos}{1000^{2i/d_{model}}}),
$$

onde:
    * $pos$ seria o timestep (ou a posição) que queremos codificar;
    * $d_{model}$ é a dimensão total do embedding;
    * e para cada par de dimensões (uma para o seno e outra para o cosseno), o denominador $1000^{2i/d_{model}}$ garante que cada dimensão do embedding corresponde a uma frequência diferente.
    
* A codificação sinusoidal permite que o modelo saiba a posição (ou, no nosso caso, o timestep) usando funções senoidais e cosenoidais em diferentes frequências.

* Referências:
    * Denoising Diffusion Probabilistic Models (2020)
    * Attention is all you need (2023)

### c) DDPM loss
* Para cada amostra de z0 (latent extraído pelo encoder):
    1. Amostra um timestep t aleatório (de 0 a T-1).
    2. Calcula z_t = sqrt(prod_alpha[t]) * z0 + sqrt(1 - prod_alpha[t]) * epsilon,
         onde epsilon é ruído gaussiano.
    3. O modelo diffusion_prior tenta prever esse epsilon a partir de z_t e t.
    4. A loss é o erro quadrático médio (MSE) entre o ruído previsto e o ruído real.
    
* Equação do artigo:
$$ L_{DDPM} (x_0, \phi) = E_{t,x_0,x_t} [\frac{1}{2 \sigma_t ^2} ||\mu_\phi(x_t,t) - \tilde{\mu}_t(x_0,x_t)||^2],
$$
onde $\tilde{\mu}_t(x_0,x_t)$ é a média de $q(x_{t-1}|x_0,x_t)$, a forward diffusion posterior condicionada na observação $x_0$, $\mu_\phi(x_t,t)$ média prevista pelo modelo para o processo reverso e $\sigma_t ^2$ é a variância associada ao passo t.


* Efetuaremos uma reparametrização para usar o ruído no lugar de $\mu$, ou seja, se parametrizarmos o processo reverso de forma adequada, essa diferença entre as médias pode ser reescrita como a diferença entre o ruído real $\epsilon$ e uma predição do modelo: $||\mu_\phi(x_t,t) - \tilde{\mu}_t(x_0,x_t)||^2 \propto ||\epsilon_\phi(x_t,t) - \epsilon||^2$, onde $\epsilon_\phi(x_t,t)$ é a predição do ruído pelo modelo.
    * A reparametrização usada é $z_t = \sqrt{\Pi_{s=0} ^t \alpha_s}z_0 + \sqrt{1-\Pi_{s=0} ^t \alpha_s} \epsilon$, onde $z_0$ é a amostra do espaço latente (obtida do encoder), $\alpha_s = 1 - \beta_s$ (note que $\alpha_s$ representa a proporção da informação original que permanece após a adição de ruído naquele passo) e $\epsilon$ é amostrado de $N(0,I)$.
    * Tal formulação permite que a amostragem seja diferenciável.
    * Dessa forma, ao treinar o modelo para prever o ruído $\epsilon$ (por meio do MSE), garantimos que a aprendizagem está focada em como remover o ruído do estado atual $z_t$.
    
    
## Parte 2: VAE

### a) Encoder
* Responsável por levar os dados até o espaço latente.
* Usaremos duas redes convolucionais e calcularemos, usando camadas lineares, $\mu$ e $log \sigma^2$ da distribuição $q(z|x)$
* As duas saídas serão usadas para a reparametrização, onde amostramos $z$ a partir de $q(z|x)$.


### b) Decoder
* Responsável por levar do espaço latente até a reconstrução de imagens $(\hat{x})$.
* Primeiro, o vetor $z$ é transformado por uma camada linear e, em seguida, é desachatado.
* Usaremos duas redes convolucionais e usamos a função de ativação sigmoid de modo a garantir uma saída no intervalo [0,1], assim como estão as imagens normalizadas. 


### c) Reparametrização
* Usamos a equação $z = \mu(x) + \sigma(x) \odot \epsilon$, ou seja, $z_i = \mu_i + \sigma_i . \epsilon_i$

## Parte 3: Loss treinamento
* No artigo principal, a loss é dada por:
$$ \mathcal{L}(x;\phi, \theta, \psi) = E_{q_\psi}[log \frac{p_\theta(x|z)}{q_\psi (z|x)}] + E_{q_\psi} [L_{DDPM}(z_0;\phi)],
$$
onde $q_\psi(z|x)$ é a distribuição aproximada do encoder, $p_\theta (x|z)$ é o modelo de verossimilhança do decoder.

* De certo modo, a loss é escrita da forma: $Loss = reconstructed_{Loss} + Latent_{Loss}$.
    * A ***recon_loss*** é calculada como a Binary Cross-Entropy entre a imagem reconstruída e a imagem original.
    * A ***latent_loss***  é calculada a partir da loss de difusão. Essa parte substitui o termo tradicional da KL divergence.
    
* Lembrando que a função loss de difusão faz:
    1. A partir de $z_0$, é gerado o $z_t$ usando o forward do DDPM: $z_t = \sqrt{\Pi_{s=0} ^t \alpha_s}z_0 + \sqrt{1-\Pi_{s=0} ^t \alpha_s} \epsilon$, com $\epsilon \sim N(0,I)$.
    2. O diffusion prior $(\epsilon_\phi(z_t,t))$ tenta prever o ruído $\epsilon$.
    3. A loss é o erro quadrático médio (MSE) entre a predição do ruído e o ruído real: $DDPM_{loss} = ||\epsilon_\phi(z_t,t) - \epsilon||^2$
    
### Uso de KL Annealing
A técnica consiste em iniciar o treinamento com o termo da divergência KL com um peso baixo – ou até mesmo zero – e aumentá-lo gradativamente até atingir o valor desejado. À medida que o treinamento avança, o peso em KL é aumentado, forçando o modelo a regularizar o espaço latente para que ele se aproxime da distribuição do prior.

Vale destacar que valores altos do peso para o KL no caso linear, influencia termo KL a ter mais influência. Isso força o encoder a manter o posterior mais próximo do prior. Por outro lado, menores valores para o peso permite que o encoder se concentre mais na reconstrução dos dados e aprenda representações mais detalhadas e ricas, mas se a regularização for muito fraca, o espaço latente pode não ter uma estrutura bem definida

* Referências:
    * Generating Sentences from a Continuous Space
    * Understanding Posterior Collapse in Generative Latent Variable Models
    
## Parte 4: Arquitetura similar a DCGAN
No Encoder a arquitetura utiliza camadas convolucionais com strides maiores (geralmente stride 2) para fazer o downsampling das imagens, seguida de técnicas como Batch Normalization e funções de ativação (normalmente LeakyReLU). Essa estrutura é típica do encoder em DCGAN.

No Decoder realiza-se o processo inverso, normalmente começando com uma camada totalmente conectada para "descompactar" o vetor latente em mapas de características (feature maps) e, em seguida, aplicando camadas de convolução transposta (ou upsampling seguidas de convolução), além de Batch Normalization e funções de ativação (como ReLU ou Tanh na camada final) para gerar as imagens.



* Referência:
    * UNSUPERVISED REPRESENTATION LEARNING WITH DEEP CONVOLUTIONAL GENERATIVE ADVERSARIAL NETWORKS

In [None]:
import math
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms, utils
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm
import numpy as np

In [None]:

# Parâmetros Gerais e Hiperparâmetros 
hidden_dim = 128
time_embed_dim = 100
latent_dim = 64
T = 100
latent_weight = 0.1  # peso para o termo KLD (com annealing)
epochs = 400  # número total de épocas para o treinamento conjunto
batch_size = 64
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Annealing schedule para o KLD
def latent_weight(epoch, max_epochs):
    annealing_epochs = max_epochs * 0.2
    if epoch < annealing_epochs:
        return latent_weight * (epoch / annealing_epochs)
    return latent_weight


# Transformação para CIFAR10
transform_cifar10 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])


# Cosine Beta Schedule
def cosine_beta_schedule(timesteps, s=0.008):
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps, dtype=torch.float32)
    alphas_prod = torch.cos(((x/timesteps) + s) / (1+s) * (math.pi * 0.5))**2
    alphas_prod = alphas_prod / alphas_prod[0]
    betas = 1 - (alphas_prod[1:] / alphas_prod[:-1])
    betas = betas.clamp(0, 0.999)
    return betas

betas = cosine_beta_schedule(T).to(device)
alphas = 1 - betas
alpha_bars = torch.cumprod(alphas, dim=0)


# Função de embedding para timestep
def timestep_embedding(timesteps, embedding_dim):
    half_dim = embedding_dim // 2 
    emb = torch.exp(torch.arange(half_dim, dtype=torch.float32, device=timesteps.device) *
                    -(math.log(10000.0) / (half_dim - 1)))
    emb = timesteps.float().unsqueeze(1) * emb.unsqueeze(0)
    emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
    if embedding_dim % 2 == 1:
        emb = torch.cat([emb, torch.zeros(timesteps.size(0), 1, device=emb.device)], dim=1)
    return emb


# Diffusion Prior
class DiffusionPrior(nn.Module):
    def __init__(self, latent_dim, time_embed_dim=100, hidden_dim=256):
        """
        A rede recebe um vetor ruidoso z_t com dimensão = latent_dim 
        e concatena com o embedding do timestep (dimensão time_embed_dim),
        formando a entrada de dimensão (latent_dim + time_embed_dim).
        """
        super(DiffusionPrior, self).__init__()
        self.time_embed_dim = time_embed_dim
        # Refinando o vetor timestep antes de concatenar com z_t
            ## Uso de SiLU para gerar gradientes mais suaves do que ReLU
        self.time_mlp = nn.Sequential(
            nn.Linear(time_embed_dim, time_embed_dim * 2),
            nn.SiLU(),
            nn.Linear(time_embed_dim * 2, time_embed_dim)
        )
        # A entrada é concatenada: [z_t (latent_dim) , t_embed (time_embed_dim)]
        self.fc1 = nn.Linear(latent_dim + time_embed_dim, hidden_dim)
        self.norm1 = nn.LayerNorm(hidden_dim) #normalizaçâo na camada
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.norm2 = nn.LayerNorm(hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.norm3 = nn.LayerNorm(hidden_dim)
        #  Essa saída representa a predição do ruído (ou correção) que deve ser subtraído de z_t
            ## Lembrete: Ao retirar o ruído de z_t geramos z_0
            ## A saída deve ter dimensão igual a latent_dim
        self.fc4 = nn.Linear(hidden_dim, latent_dim)
        
        # Objetivo: manter a variância das ativações (e dos gradientes) aproximadamente constante através das camadas
            ## Ajuda a evitar tanto a explosão quanto o desaparecimento dos gradientes durante o treinamento.
            ## Treinamento mais estável
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)
        nn.init.xavier_uniform_(self.fc3.weight)
        nn.init.zeros_(self.fc4.weight)
    
    def forward(self, z_t, t):
        # Lembrete: z_t é o ruidoso e t é o time_step atual
        # Cria o embedding do timestep com dimensão time_embed_dim
            ## Usa as funções sinusoidais
        t_embed = timestep_embedding(t, self.time_embed_dim)
        t_embed = self.time_mlp(t_embed)
        # Concatena a informação do vetor ruidoso com o embedding
        x = torch.cat([z_t, t_embed], dim=1)
        h1 = F.silu(self.norm1(self.fc1(x))) #1ª representaçâo interna (h_1)
        h2 = F.silu(self.norm2(self.fc2(h1)))
        h3 = F.silu(self.norm3(self.fc3(h2)))
        # Conexão residual simples
            ## Essa soma ajuda a preservar informações iniciais e melhora a propagação do gradiente durante o treinamento
        h = h3 + h1  
        # Predição do ruìdo
        noise_pred = self.fc4(h)
        return noise_pred


# Loss DDPM 
def ddpm_loss(diffusion_prior, z0, betas, alpha_bars, T):
    batch_size, latent_dim = z0.shape
    device = z0.device
    # Seleciona um timestep aleatório para cada exemplo
    t = torch.randint(0, T, (batch_size,), device=device)
    alpha_bar_t = alpha_bars[t].view(-1, 1)
    epsilon = torch.randn_like(z0)
    # Cria z_t de acordo com a fórmula do forward process
    z_t = torch.sqrt(alpha_bar_t) * z0 + torch.sqrt(1 - alpha_bar_t) * epsilon
    noise_pred = diffusion_prior(z_t, t)
    beta_t = betas[t].view(-1, 1)
    # MSE ponderado por 1/(2 * beta_t)
    loss = torch.mean((noise_pred - epsilon)**2 / (2 * beta_t))
    return loss


# Encoder para CIFAR10 (inspirado no DCGAN)
    ## Dimensão CIFAR: 32 x 32
class EncoderCIFAR10(nn.Module):
    def __init__(self, latent_dim):
        """
        O encoder extrai características via camadas convolucionais
        e, no final do processo, utiliza camadas lineares para produzir os parâmetros do espaço
        latente (μ e log(σ²)) com dimensão igual a latent_dim.
        """
        super(EncoderCIFAR10, self).__init__()
        # Conv 1
            ## Entrada: 3 canais (RGB)
            ## Saìda: 64 canais
            ## Calculo: saida = [(dim_img + 2xpadding - kernel_size)/stride] + 1
                ### No primeiro caso, dim_img = 32 -> Resultado 16
                ### dim_saida_conv1 = (64,16,16)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        # Conv 2
            ## Entrada: 64 canais 
            ## Saìda: 128 canais
            ## Calculo: saida = [(dim_img + 2xpadding - kernel_size)/stride] + 1
                ### No segundo caso, dim_img = 16 -> Resultado 8
                ### dim_saida_conv1 = (128,8,8)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        # Conv 3
        self.conv3 = nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        # Flatten
        self.fc_input_dim = 256 * 4 * 4
        # Aprender \mu e log \sigma^2
        self.fc_mu = nn.Linear(self.fc_input_dim, latent_dim)
        self.fc_logvar = nn.Linear(self.fc_input_dim, latent_dim)
    
    def forward(self, x):
        # Dimensão de x: (batch_size,3, 32, 32)
        batch_size = x.size(0) 
        x = F.leaky_relu(self.bn1(self.conv1(x)), negative_slope=0.2)
        x = F.leaky_relu(self.bn2(self.conv2(x)), negative_slope=0.2)
        x = F.leaky_relu(self.bn3(self.conv3(x)), negative_slope=0.2)
        x = x.view(batch_size, -1)
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar


# Decoder para CIFAR10 (inspirado no DCGAN)
class DecoderCIFAR10(nn.Module):
    def __init__(self, latent_dim):
        """
        O decoder recebe o vetor latente de dimensão latent_dim e o mapeia
        para uma representação espacial, seguido de 
        camadas deconvolucionais para reconstruir a imagem original.
        """
        super(DecoderCIFAR10, self).__init__()
        self.fc = nn.Linear(latent_dim, 256 * 4 * 4) #Olhar dim encoder
        # DeConv1
            ## dim_saida = (dim_img_entrada - 1) x stride - 2 x padding + kernel_size
            ## dim_img_entrada = 4 -> dim_saida = 8
        self.deconv1 = nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(128)
        # DeConv2
        self.deconv2 = nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        # DeConv3
        self.deconv3 = nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1)
    
    def forward(self, z):
        # A camada fc transforma vetor latente (batch, latent_dim) no vetor (batch, 256*4*4)
        batch_size = z.size(0)
        x = F.relu(self.fc(z))
        #O vetor é reorganizado (reshape) para formar um tensor com formato (batch,256, 4, 4)
        x = x.view(batch_size, 256, 4, 4)
        # Processo de Desconvolução
        x = F.relu(self.bn1(self.deconv1(x)))
        x = F.relu(self.bn2(self.deconv2(x)))
        x = torch.tanh(self.deconv3(x)) #Garante intervalo [-1,1]
        return x


# Reparametrização para amostrar z a partir de mu e logvar
def reparameterize(mu, logvar):
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return mu + eps * std


# Função de Sampling 
    ## (Reverse Diffusion) para gerar latentes

def sample_latent(diffusion_prior, T, latent_dim, betas, alpha_bars, device):
    z = torch.randn((1, latent_dim), device=device) #Amostra z_T (o estado final do forward process) a ser “desruído” no processo reverso.
    # O loop aplica o processo de reverse diffusion para atualizar z passo a passo
    for t in reversed(range(1, T)): 
        t_tensor = torch.full((z.shape[0],), t, device=device, dtype=torch.long) #informa em qual passo de difusão está
        pred_noise = diffusion_prior(z, t_tensor) #pred \epsilon
        beta_t = betas[t] #variância do ruído para o passo atual
        alpha_t = alphas[t]
        alpha_bar_t = alpha_bars[t] #valor cumulativo
        # Essa operação "remove" parte do ruído predito
            ## aproximando z de um estado com menos ruído.
        z = (z - ((1 - alpha_t) / torch.sqrt(1 - alpha_bar_t)) * pred_noise) / torch.sqrt(alpha_t) #atualiza z
        
        # Add ruìdo ao z atualizado
            ## simula a parte aleatória do reverse process
        if t > 0:
            noise = torch.randn_like(z)
            z = z + torch.sqrt(beta_t) * noise
    return z


# Inicialização 
encoder = EncoderCIFAR10(latent_dim).to(device)
decoder = DecoderCIFAR10(latent_dim).to(device)
diffusion_prior = DiffusionPrior(latent_dim).to(device)

# Otimizador conjunto para encoder, decoder e diffusion prior
optimizer = optim.Adam(
    list(encoder.parameters()) + list(decoder.parameters()) + list(diffusion_prior.parameters()),
    lr=5e-4
)

# Carregamento CIFAR10
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_cifar10)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_cifar10)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


# Treinamento Conjunto
def train(epochs):
    encoder.train()
    decoder.train()
    diffusion_prior.train()
    
    for epoch in range(epochs):
        total_loss = 0.0
        recon_loss_total = 0.0
        kld_loss_total = 0.0
        ddpm_loss_total = 0.0
        
        for x, _ in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            x = x.to(device)
            optimizer.zero_grad()
            
            # Forward no VAE
            mu, logvar = encoder(x)
            z0 = reparameterize(mu, logvar)
            x_recon = decoder(z0)
            
            recon_loss = F.mse_loss(x_recon, x, reduction='sum') / x.size(0)
            kld_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) / x.size(0)
            
            # Loss do diffusion prior (DDPM) aplicada sobre z0
            ddpm_loss_val = ddpm_loss(diffusion_prior, z0, betas, alpha_bars, T)
            
            current_latent_weight = latent_weight(epoch, epochs)
            # Loss total do modelo conjunto, conforme Eq. (20) do artigo:
            loss = recon_loss + current_latent_weight * kld_loss + ddpm_loss_val
            
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            recon_loss_total += recon_loss.item()
            kld_loss_total += kld_loss.item()
            ddpm_loss_total += ddpm_loss_val.item()
        
        avg_loss = total_loss / len(train_loader)
        avg_recon = recon_loss_total / len(train_loader)
        avg_kld = kld_loss_total / len(train_loader)
        avg_ddpm = ddpm_loss_total / len(train_loader)
        
        print(f"Epoch [{epoch+1}/{epochs}] - Loss: {avg_loss:.4f} | Recon: {avg_recon:.4f} | KLD: {avg_kld:.4f} | DDPM: {avg_ddpm:.4f}")

print("=== Treinando VAE com Diffusion Prior (Treinamento Conjunto) ===")
train(epochs)





Geração e avaliação de imagens

In [None]:

# Geração
num_imgs = 2000
def generate_img(num_images=num_imgs):
    fake_folder = './fake_images_cifar10'
    os.makedirs(fake_folder, exist_ok=True)
    
    diffusion_prior.eval()
    decoder.eval()
    
    with torch.no_grad():
        z_samples = []
        for _ in range(num_images):
            z = sample_latent(diffusion_prior, T, latent_dim, betas, alpha_bars, device)
            z_samples.append(z)
        z_samples = torch.cat(z_samples, dim=0)
        images = decoder(z_samples).cpu()
    
    for i, img in enumerate(images):
        utils.save_image(img, os.path.join(fake_folder, f"fake_cifa10_{i}.png"))

generate_img(num_imgs)


# Visualização de Amostras
def visualize_samples(num_samples=10):
    diffusion_prior.eval()
    decoder.eval()
    
    with torch.no_grad():
        z_samples = []
        for _ in range(num_samples):
            z = sample_latent(diffusion_prior, T, latent_dim, betas, alpha_bars, device)
            z_samples.append(z)
        z_samples = torch.cat(z_samples, dim=0)
        images = decoder(z_samples).cpu()
    
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 3))
    for i, ax in enumerate(axes):
        ax.imshow(images[i].permute(1,2,0).numpy() * 0.5 + 0.5)
        ax.axis('off')
    plt.tight_layout()
    plt.show()

visualize_samples()

# Salva imagens reais (opcional, se ainda não foram salvas)
def real_images(dataset, folder, num_images=num_imgs):
    os.makedirs(folder, exist_ok=True)
    count = 0
    for img, _ in dataset:
        fname = f"real_{count}.png"
        utils.save_image(img, os.path.join(folder, fname))
        count += 1
        if count >= num_images:
            break

real_images(test_dataset, './real_images_cifar10', num_images=num_imgs)




In [None]:
# Função para calcular o FID
def fid():
    from pytorch_fid.fid_score import calculate_fid_given_paths
    real_path = './real_images_cifar10'
    fake_path = './fake_images_cifar10'
    
    fid_batch_size = batch_size if batch_size > 0 else 1
    
    try:
        fid = calculate_fid_given_paths([real_path, fake_path], fid_batch_size, device, dims=2048)
        return fid
    except Exception as e:
        print("Erro ao calcular FID:", str(e))
        return float('nan')

fid_value = fid()
print("\nFID Score:", fid_value)
