# Atividade: GANs

Neste notebook, você irá **preparar seu próprio dataset** e **treinar uma DCGAN utilizando a distância de Wasserstein**.
O objetivo é gerar imagens sintéticas a partir de ruído, aprendendo a distribuição dos dados reais.

O treinamento será realizado com um **Gerador** e um **Crítico** (substituindo o discriminador tradicional), aplicando **gradient clipping** para garantir a restrição de Lipschitz exigida pela métrica de Wasserstein.

Ao final, o modelo deverá ser capaz de **produzir imagens realistas** a partir de vetores aleatórios.

## Preparando os dados

Para esta atividade, será necessário baixar ou montar um dataset de imagens de um domínio específico (por exemplo, rostos, paisagens, objetos, etc.). Você pode utilizar datasets públicos como LFW e CelebA ou criar o seu próprio conjunto de imagens armazenadas em uma pasta local.

In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
import torchvision.utils as vutils
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from icrawler.builtin import BingImageCrawler
import numpy as np
from tqdm import tqdm
import logging

In [2]:
# Só resolvendo um probleminha de logging
logging.getLogger("icrawler").setLevel(logging.ERROR)
logging.getLogger("parser").setLevel(logging.ERROR)
logging.getLogger("downloader").setLevel(logging.ERROR)
logging.getLogger("feeder").setLevel(logging.ERROR)

In [3]:
DATA_DIR = "data/images"
OUTPUT_DIR = "output"
KEYWORD = "landscape painting"
IMAGE_SIZE = 64
BATCH_SIZE = 8
Z_DIM = 100
NUM_EPOCHS = 150
LR = 5e-5
CLIP_VALUE = 0.01
N_CRITIC = 5
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
os.makedirs(OUTPUT_DIR, exist_ok=True)

### Coleta de Imagens

Caso opte por montar seu próprio dataset, você pode utilizar a biblioteca iCrawler para baixar imagens automaticamente a partir de buscadores (como Google, Bing ou Baidu), fornecendo uma lista de termos relacionados ao domínio desejado.

In [5]:
def download_images(keyword, folder, n_total=200):
    if not os.path.exists(folder):
        os.makedirs(folder)

    existing = len([f for f in os.listdir(folder) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
    if existing >= n_total:
        print(f"Found {existing} images, skipping download.")
        return

    print(f"Downloading images for '{keyword}'...")
    try:
        crawler = BingImageCrawler(storage={'root_dir': folder})
        crawler.crawl(keyword=keyword, max_num=n_total)
    except Exception as e:
        print(f"Crawler failed: {e}")

    existing = len([f for f in os.listdir(folder) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
    if existing == 0:
        print("Download failed or no images found. Generating synthetic data for testing...")
        for i in range(n_total):
            img = np.random.randint(0, 255, (IMAGE_SIZE, IMAGE_SIZE, 3), dtype=np.uint8)
            Image.fromarray(img).save(os.path.join(folder, f"synthetic_{i}.png"))
    
    print("Data preparation complete!")

### Implementação do Dataset

Com as imagens já disponíveis, implemente uma **classe de Dataset personalizada** para o PyTorch. Ela deve herdar de `Dataset` e retornar, em cada amostra, a imagem processada pelos **transforms** definidos anteriormente.

O Dataset deve:

* Ler as imagens a partir de uma pasta.
* Converter as imagens para **tensor normalizado** (ex.: valores entre -1 e 1).

In [6]:
from PIL import Image
from torch.utils.data import Dataset, DataLoader

class ImageDataset(Dataset):
    def __init__(self, folder, transform=None):
        self.folder = folder
        self.transform = transform
        self.images = [
            os.path.join(folder, f)
            for f in os.listdir(folder)
            if f.lower().endswith((".png", ".jpg", ".jpeg"))
        ]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        try:
            img = Image.open(img_path).convert("RGB")
            if self.transform:
                img = self.transform(img)
            return img
        except Exception as e:
            print(f"Error loading {img_path}: {e}")
            return torch.zeros(3, IMAGE_SIZE, IMAGE_SIZE)

### Carregamento

Carregue os dados a partir do seu dataset de imagens e aplique os transforms necessários. Caso o dataset seja pequeno, recomenda-se o uso de data augmentation (como flips horizontais, jitter de cor ou pequenas rotações) para aumentar a diversidade das amostras e melhorar a estabilidade do treinamento adversarial. Em seguida, defina um batch size adequado e instancie um DataLoader.

## Definição dos Modelos

Para este exercício, deverão ser utilizadas DCGANs com distância de Wasserstein. Nesta seção, defina a arquitetura dos modelos Gerador e Crítico, implementando o treinamento adversarial baseado na métrica de Wasserstein.

### Gerador

O Gerador seguirá a arquitetura típica de uma DCGAN, produzindo amostras sintéticas a partir de vetores de ruído, enquanto o Crítico avaliará a distância entre as distribuições reais e geradas.

In [None]:
class Generator(nn.Module):
    def __init__(self, z_dim=100, img_channels=3, feature_g=64):
        super(Generator, self).__init__()
        self.net = nn.Sequential(
            self._block(z_dim, feature_g * 8, 4, 1, 0),
            self._block(feature_g * 8, feature_g * 4, 4, 2, 1),
            self._block(feature_g * 4, feature_g * 2, 4, 2, 1),
            self._block(feature_g * 2, feature_g, 4, 2, 1),
            nn.ConvTranspose2d(feature_g, img_channels, 4, 2, 1),
            nn.Tanh()
        )

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride, padding, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(True)
        )

    def forward(self, x):
        return self.net(x)

### Crítico

O Crítico (substituindo o discriminador tradicional) deve utilizar gradient clipping para garantir o cumprimento da restrição de Lipschitz, condição essencial para a estabilidade da função de custo de Wasserstein.

In [None]:
class Critic(nn.Module):
    def __init__(self, img_channels=3, feature_d=64):
        super(Critic, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(img_channels, feature_d, 4, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),
            self._block(feature_d, feature_d * 2, 4, 2, 1),
            self._block(feature_d * 2, feature_d * 4, 4, 2, 1),
            self._block(feature_d * 4, feature_d * 8, 4, 2, 1),
            nn.Conv2d(feature_d * 8, 1, 4, 1, 0),
        )

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2, inplace=True)
        )

    def forward(self, x):
        return self.net(x)

In [9]:
def initialize_weights(model):
    for m in model.modules():
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d, nn.BatchNorm2d)):
            nn.init.normal_(m.weight.data, 0.0, 0.02)

## Treinamento

Com o **Gerador** e o **Crítico** definidos, e os dados devidamente carregados, inicie o treinamento.

Durante o processo:

* Atualize o **Crítico** várias vezes para cada atualização do **Gerador**, garantindo uma estimativa mais precisa da distância de Wasserstein.
* Aplique **gradient clipping** nos parâmetros do Crítico após cada atualização, mantendo a restrição de **Lipschitz**.
* Utilize **losses baseadas na métrica de Wasserstein**.

Ao longo do treinamento, **visualize amostras geradas** a cada determinado número de épocas, observando a evolução da qualidade das imagens produzidas pelo Gerador.

In [10]:
def train():
    download_images(KEYWORD, DATA_DIR, n_total=1000)
    
    transform = T.Compose([
        T.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        T.ToTensor(),
        T.Normalize([0.5]*3, [0.5]*3)
    ])
    
    dataset = ImageDataset(DATA_DIR, transform=transform)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    gen = Generator(Z_DIM).to(DEVICE)
    critic = Critic().to(DEVICE)
    initialize_weights(gen)
    initialize_weights(critic)
    opt_gen = optim.RMSprop(gen.parameters(), lr=LR)
    opt_critic = optim.RMSprop(critic.parameters(), lr=LR)
    
    fixed_noise = torch.randn(32, Z_DIM, 1, 1).to(DEVICE)
    
    print("Starting Training...")
    step = 0
    
    for epoch in range(NUM_EPOCHS):
        loop = tqdm(dataloader, leave=True)
        for batch_idx, real in enumerate(loop):
            real = real.to(DEVICE)
            cur_batch_size = real.shape[0]
            for _ in range(N_CRITIC):
                noise = torch.randn(cur_batch_size, Z_DIM, 1, 1).to(DEVICE)
                fake = gen(noise)
                
                critic_real = critic(real).reshape(-1)
                critic_fake = critic(fake).reshape(-1)
                
                loss_critic = -(torch.mean(critic_real) - torch.mean(critic_fake))
                
                critic.zero_grad()
                loss_critic.backward(retain_graph=True)
                opt_critic.step()
                
                for p in critic.parameters():
                    p.data.clamp_(-CLIP_VALUE, CLIP_VALUE)

            gen_fake = critic(fake).reshape(-1)
            loss_gen = -torch.mean(gen_fake)
            
            gen.zero_grad()
            loss_gen.backward()
            opt_gen.step()
            
            loop.set_description(f"Epoch [{epoch}/{NUM_EPOCHS}]")
            loop.set_postfix(loss_d=loss_critic.item(), loss_g=loss_gen.item())
            
            step += 1
            
        with torch.no_grad():
            fake = gen(fixed_noise)
            img_grid = vutils.make_grid(fake[:32], normalize=True)
            vutils.save_image(img_grid, f"{OUTPUT_DIR}/epoch_{epoch}.png")

    print("Training Finished!")

## Inferência

Após o treinamento, utilize o **Gerador** para produzir novas imagens a partir de **vetores de ruído aleatório**.
Cada vetor servirá como ponto de partida no espaço latente, sendo transformado pelo modelo em uma amostra sintética do domínio aprendido.

Durante a inferência:

* Gere múltiplas imagens e visualize os resultados.
* Analise a **qualidade e diversidade** das amostras produzidas.

In [11]:
train()

Downloading images for 'landscape painting'...
Data preparation complete!
Starting Training...


Epoch [0/150]: 100%|██████████| 50/50 [00:05<00:00,  9.80it/s, loss_d=-1.47, loss_g=0.71]   
Epoch [1/150]: 100%|██████████| 50/50 [00:04<00:00, 10.13it/s, loss_d=-1.53, loss_g=0.733]
Epoch [2/150]: 100%|██████████| 50/50 [00:04<00:00, 10.83it/s, loss_d=-1.53, loss_g=0.727]
Epoch [3/150]: 100%|██████████| 50/50 [00:04<00:00, 10.33it/s, loss_d=-1.54, loss_g=0.738]
Epoch [4/150]: 100%|██████████| 50/50 [00:05<00:00,  9.80it/s, loss_d=-1.55, loss_g=0.739]
Epoch [5/150]: 100%|██████████| 50/50 [00:05<00:00,  8.79it/s, loss_d=-1.54, loss_g=0.737]
Epoch [6/150]: 100%|██████████| 50/50 [00:05<00:00,  9.39it/s, loss_d=-1.55, loss_g=0.739]
Epoch [7/150]: 100%|██████████| 50/50 [00:05<00:00,  9.55it/s, loss_d=-1.55, loss_g=0.74] 
Epoch [8/150]: 100%|██████████| 50/50 [00:05<00:00,  9.70it/s, loss_d=-1.55, loss_g=0.74] 
Epoch [9/150]: 100%|██████████| 50/50 [00:05<00:00,  9.63it/s, loss_d=-1.55, loss_g=0.74] 
Epoch [10/150]: 100%|██████████| 50/50 [00:05<00:00,  9.77it/s, loss_d=-1.55, loss_g=0.7

Training Finished!



