# Importy, wizualizacja
(Należy odpalić i schować).

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.utils.data import Subset
from torchvision.datasets import FashionMNIST
from torchvision.transforms import Compose, Lambda, ToTensor

%matplotlib inline

device = "cuda" if torch.cuda.is_available() else "cpu"

def plot_dataset(train_data, generator, is_cond=False, n_samples = 5, show_train=True):
    view_data = train_data.data[:n_samples].view(-1, 28 * 28) / 255.0

    if show_train:
        labels = train_data.targets[:n_samples]
    else:
        labels = torch.arange(n_samples) % 10

    noise = torch.randn((n_samples, generator.latent_dim), device=device)
    with torch.no_grad():
        if is_cond:
            labels_one_hot = torch.nn.functional.one_hot(labels, 10).to(torch.float32).to(device)
            gen_data = generator(noise, labels_one_hot).cpu().detach().numpy()
        else:
            gen_data = generator(noise).cpu().detach().numpy()

    n_rows = 2 if show_train else 1
    n_cols = len(view_data)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(n_cols * 2, n_rows * 2))

    if show_train:
        for i in range(n_cols):
            axes[0][i].imshow(np.reshape(view_data.data.numpy()[i], (28, 28)), cmap="gray")
            axes[0][i].set_xticks(())
            axes[0][i].set_yticks(())

        for i in range(n_cols):
            axes[1][i].clear()
            axes[1][i].imshow(np.reshape(gen_data[i], (28, 28)), cmap="gray")
            axes[1][i].set_xticks(())
            axes[1][i].set_yticks(())

    else:
        for i in range(n_cols):
            axes[i].imshow(np.reshape(gen_data[i], (28, 28)), cmap="gray")
            axes[i].set_xticks(())
            axes[i].set_yticks(())

    plt.show()

torch.manual_seed(1337)
batch_size = 128
transforms = Compose([ToTensor(), Lambda(lambda x: x.flatten())])

# Mnist dataset
train_data = FashionMNIST(
    root=".", train=True, transform=transforms, download=True
)  # change to false if you already have the data

# Data Loader for easy mini-batch return in training, the image batch shape will be (50, 1, 28, 28)
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)

# Generative Adversarial Networks (GANs)

Idea GANów polega na takim ustaleniu wag generatora, aby utworzone przykłady były nieodróżnialne od prawdziwych danych (pochodziły z tego samego rozkładu). Nowatorskie podejście w GANach polega na tym, że model oceniający czy dane są realistyczne, będzie się uczył razem z modelem generującym. Powyższa idea oznacza to, że model GANowski jest grą dwóch graczy:

* pierwszym graczem jest generator $G(z)$ reprezentowany przez sieć neuronową z parametrami $\theta$ zaś $z$ ∼ $N(0, I)$ jest losowym szumem z rozkładu normalnego. Jego głównym zadaniem jest przekształcenie szumu wejściowego $z$ w obiekt $G(z)$ ∼ $P_G$ podobny do rzeczywistych danych (z rozkładu $P_{dane}$).
* drugim graczem jest dyskryminator $D(x)$ reprezentowany przez sieć neuronową z parametrami $\phi$, zaś $x$ jest obiektem z $P_G$ albo $P_{dane}$. Dyskryminator ma odróżniać dane pochodzące od generatora od danych rzeczywistych. Mówiąc dokładniej jest to klasyfikator, który zwraca prawdopodobieństwo, że obiekt $x$ pochodzi z danych rzeczywistych $P_{dane}$, a nie z $P_G$.

<img src="resources/gan.png">

Architecture of a generative adversarial network. ([Image source](http://www.kdnuggets.com/2017/01/generative-adversarial-networks-hot-topic-machine-learning.html))

## Zadanie 1. Vanilla GAN (3 pkt.)

Należy zaimplementować klasy Generator i Discriminator. Można zastosować dowolną architekturę sieci pod warunkiem, że:
* Generator przyjmuje wektor o rozmiarze `latent_dim` i produkuje wektor o rozmiarze `out_dim` z wartościami w zakresie \[-1, 1\]. Przykladową implementacją są warstwy nn.Linear \[latent_dim, 128, 256, 512, out_dim] z aktywacjami nn.LeakyReLU oraz nn.Tanh na końcu.
* Discriminator przyjmuje wektor o rozmiarze `input_size` i produkuje wektor o rozmiarze `1` z wartościami w zakresie \[0, 1\]. Przykladową implementacją są warstwy nn.Linear \[input_size, 128, 128, 64, 1] z aktywacjami nn.LeakyReLU oraz nn.Sigmoid na końcu.

In [None]:
class Generator(torch.nn.Module):

    def __init__(self, latent_dim: int, out_dim: int):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.out_dim = out_dim

        self.model = ...


    def forward(self, x):
        return self.model(x)

In [None]:
class Discriminator(nn.Module):

    def __init__(self, input_size: int):
        super(Discriminator, self).__init__()
        self.input_size = input_size

        self.model = ...

    def forward(self, x):
        return self.model(x)

In [None]:
# Hyper Parameters
epochs: int = 25
g_lr: float = 1e-4  # learning rate
d_lr: float = 1e-4
latent_dim = 64

# models
generator = Generator(latent_dim, 784).to(device)
discriminator = Discriminator(784).to(device)

# optimizers
g_optimizer = torch.optim.Adam(generator.parameters(), lr=g_lr)
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=d_lr)

criterion = nn.BCELoss()

for epoch in range(epochs):
    epoch_losses = []  # For logging purposes
    for step, (x, _) in enumerate(train_loader):
        real_x = x.to(device)

        batch_size = real_x.shape[0]

        # Train discriminator
        d_optimizer.zero_grad()

        real_d_pred = discriminator(real_x)
        real_discriminator_loss = criterion(real_d_pred, torch.ones((batch_size, 1), device=device))

        noise = torch.randn((batch_size, latent_dim), device=device)
        fake_x = generator(noise)
        fake_d_pred = discriminator(fake_x.detach())
        fake_discriminator_loss = criterion(fake_d_pred, torch.zeros((batch_size, 1), device=device))

        discriminator_loss = real_discriminator_loss + fake_discriminator_loss
        discriminator_loss.backward()
        d_optimizer.step()

        # Train generator
        g_optimizer.zero_grad()

        noise = torch.randn((batch_size, latent_dim), device=device)
        gen_x = generator(noise)
        gen_d_pred = discriminator(gen_x)
        generator_loss = criterion(gen_d_pred, torch.ones((batch_size, 1), device=device))
        generator_loss.backward()
        g_optimizer.step()

        epoch_losses.append(np.array([discriminator_loss.item(), generator_loss.item()]))

    epoch_losses_np = np.stack(epoch_losses, axis=0)

    print(f"Epoch: {epoch}  |  total loss: {epoch_losses_np.mean():.4f} |  disc_loss: {epoch_losses_np[:, 0].mean():.4f} | gen_loss: {epoch_losses_np[:, 1].mean():.4f}")

    if epoch % 5 == 0:
        plot_dataset(train_data, generator)

## Zadanie 2. Conditional GAN (4 pkt.)

Zwykły GAN generuje obraz z szumu i trudno nam kontrolować wynik. Możemy wymusić GAN generować obrazki z wybranej klasy podając do sieci wektor, który ją enkoduje.

<img src="resources/cgan.png">

CGAN vs GAN diagram ([Image source](https://learnopencv.com/conditional-gan-cgan-in-pytorch-and-tensorflow/))

Należy zaimplementować klasy ConditionalGenerator i ConditionalDiscriminator. Można zastosować dowolną architekturę sieci pod warunkiem, że:
* Generator przyjmuje wektory o rozmiarach `latent_dim` i `num_classes` oraz produkuje wektor o rozmiarze `out_dim` z wartościami w zakresie \[-1, 1\]. Przykladową implementacją są warstwy nn.Linear \[latent_dim + num_classes, 128, 256, 512, out_dim] z aktywacjami nn.LeakyReLU oraz nn.Tanh na końcu.
* Discriminator przyjmuje wektory o rozmiarach `latent_dim` i `num_classes` oraz produkuje wektor o rozmiarze `1` z wartościami w zakresie \[0, 1\]. Przykladową implementacją są warstwy nn.Linear \[input_size + num_classes, 128, 128, 64, 1] z aktywacjami nn.LeakyReLU oraz nn.Sigmoid na końcu.

In [None]:
class ConditionalGenerator(torch.nn.Module):

    def __init__(self, latent_dim: int, out_dim: int, num_classes: int):
        super(ConditionalGenerator, self).__init__()
        self.latent_dim = latent_dim
        self.out_dim = out_dim
        self.num_classes = num_classes

        self.model = ...

    def forward(self, x, one_hot_label):
        x = torch.cat((x, one_hot_label), dim=1)  # może być zmienione
        return self.model(x)

In [None]:
class ConditionalDiscriminator(nn.Module):

    def __init__(self, input_size: int, num_classes:int):
        super(ConditionalDiscriminator, self).__init__()
        self.input_size = input_size
        self.num_classes = num_classes

        self.model = ...

    def forward(self, x, one_hot_label):
        x = torch.cat((x, one_hot_label), dim=1) # może być zmienione
        return self.model(x)

In [None]:
# Hyper Parameters
epochs: int = 25
g_lr: float = 1e-4
d_lr: float = 1e-4
latent_dim = 64

# models
generator = ConditionalGenerator(latent_dim, 784, 10).to(device)
discriminator = ConditionalDiscriminator(784, 10).to(device)

# optimizers
g_optimizer = torch.optim.Adam(generator.parameters(), lr=g_lr)
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=d_lr)

criterion = nn.BCELoss()

for epoch in range(epochs):
    epoch_losses = []  # For logging purposes
    for step, (x, y) in enumerate(train_loader):
        real_x= x.to(device)
        y_one_hot = torch.nn.functional.one_hot(y, 10).to(torch.float32).to(device)
        batch_size = real_x.shape[0]

        # Train discriminator
        d_optimizer.zero_grad()

        real_d_pred = ...
        real_discriminator_loss  = criterion(real_d_pred, torch.ones((batch_size, 1), device=device))

        noise = torch.randn((batch_size, latent_dim), device=device)
        fake_x = ...
        fake_d_pred = ...
        fake_discriminator_loss = criterion(fake_d_pred, torch.zeros((batch_size, 1), device=device))

        discriminator_loss = real_discriminator_loss + fake_discriminator_loss
        discriminator_loss.backward()
        d_optimizer.step()

        # Train generator
        g_optimizer.zero_grad()

        noise = torch.randn((batch_size, latent_dim), device=device)
        gen_x = ...
        gen_d_pred = ...
        generator_loss = criterion(gen_d_pred, torch.ones((batch_size, 1), device=device))
        generator_loss.backward()
        g_optimizer.step()

        epoch_losses.append(np.array([discriminator_loss.item(), generator_loss.item()]))

    epoch_losses_np = np.stack(epoch_losses, axis=0)

    print(f"Epoch: {epoch}  |  total loss: {epoch_losses_np.mean():.4f} |  disc_loss: {epoch_losses_np[:, 0].mean():.4f} | gen_loss: {epoch_losses_np[:, 1].mean():.4f}")

    if epoch % 5 == 0:
        plot_dataset(train_data, generator, is_cond=True)

In [None]:
plot_dataset(train_data, generator, is_cond=True, n_samples=10, show_train=False)  # generuje wszystkie klasy