## Zad 1.
Poniżej mają Państwo implementację, trening oraz generowanie z przestrzeniu latent prostego modelu `Real NVP` w PyTorch, zdolnego do nauczenia się i generowania próbek. Proszę wykonać poniższe instrukcje:
- zaimplementuj funkcję `inverse` z klasy `AffineCoupling`, która musi odwrócić transformację z funkcji `forward`,
- sprawdź funkcję `sigmoid` w modelu `self.net_s` zamiast funkcji `tanh`,
- dobierz odpowienie parametry uczenia się modelu:
    - learning rate,
    - wpływ liczby bloków,
    - zmień rozkład bazowy $p_z$ ze standardowego rozkładu normalnego na rozkład jednostajny i sprawdź, jak wpływa to na proces uczenia i wyniki.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torch.distributions import MultivariateNormal


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class AffineCoupling(nn.Module):
    """
    Affine Coupling Layer (kluczowy element Real NVP).
    Wykonuje transformację: y = [x1, x2 * exp(s(x1)) + t(x1)]
    gdzie x1 to połowa wektora wejściowego, a s i t to skalar i translacja.
    """
    def __init__(self, in_features, hidden_features, mask):
        super().__init__()
        self.mask = mask
        
        # Sieci neuronowe s (skala) i t (translacja), które biorą maskowaną część wejścia
        # i generują parametry dla drugiej części.
        self.net_s = nn.Sequential(
            nn.Linear(in_features // 2, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, in_features // 2),
            nn.Tanh()     # <---  sprawdzić sigmoidę
        )
        
        self.net_t = nn.Sequential(
            nn.Linear(in_features // 2, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, in_features // 2)
        )

    def forward(self, x):
        """ Transformacja do przodu (z x do z), używana do treningu. """
        # Zmienna log_det_jacobian będzie sumą log|det(J)| dla każdego bloku.
        log_det_jacobian = 0.0
        
        x1 = (x @ self.mask).view(x.shape[0], -1)  # Nie transformowana część
        x2 = (x @ (1 - self.mask)).view(x.shape[0], -1) # Transformowana część
        
        s = self.net_s(x1)
        t = self.net_t(x1)
        
        z2_transformed = x2 * torch.exp(s) + t

        temp = (torch.arange(self.mask.numel(), device=x.device, dtype=x.dtype).tile(x.shape[0], 1) @ self.mask).view(x.shape[0], -1).long()
        
        z = torch.zeros_like(x)
        z.scatter_(1, temp, x1)

        temp = (torch.arange(self.mask.numel(), device=x.device, dtype=x.dtype).tile(x.shape[0], 1) @ (1 - self.mask)).view(x.shape[0], -1).long()
        z.scatter_(1, temp, z2_transformed)
        
        # log|det(J)| = suma log(exp(s)) = suma s
        # Ponieważ transformacja ma formę y = [x1, exp(s)*x2 + t], jakobian jest macierzą trójkątną.
        # Determinanta to iloczyn elementów na przekątnej, a logarytm z determinanty to suma logarytmów.
        log_det_jacobian += torch.sum(s, dim=1)
        
        return z, log_det_jacobian

    def inverse(self, z):
        """ Transformacja odwrotna (z z do x), używana do generowania próbek. """
        pass


class RealNVP(nn.Module):
    """
    Główny model Normalizing Flow, składający się z sekwencji bloków Affine Coupling.
    """
    def __init__(self, in_features, hidden_features, num_blocks):
        super().__init__()
        
        # Rozkład bazowy (prior), z którego losujemy próbki w przestrzeni 'z'.
        # Zwykle używa się standardowego rozkładu normalnego (Gaussowski).
        self.base_dist = MultivariateNormal(
            loc=torch.zeros(in_features).to(device),
            covariance_matrix=torch.eye(in_features).to(device)
        )
        
        self.flows = nn.ModuleList()
        # Tworzenie sekwencji bloków Affine Coupling
        for i in range(num_blocks):
            # Maska naprzemienna, aby każda cecha miała szansę być transformowana.
            # np. dla dim=2, maska to [1, 0] lub [0, 1]
            # [1, 0]: transformuje drugą cechę bazując na pierwszej.
            # [0, 1]: transformuje pierwszą cechę bazując na drugiej.
            mask = torch.zeros(in_features)
            mask[i % in_features::2] = 1.0 # Naprzemienne maskowanie co 2-gi blok
            mask = mask.to(device)
            
            self.flows.append(
                AffineCoupling(in_features, hidden_features, mask)
            )

    def forward(self, x):
        """
        Trening: Przekształcenie danych wejściowych x do przestrzeni bazowej z
        i obliczenie log-wiarygodności (log-likelihood).
        """
        log_prob = 0.0
        z = x
        
        # Iteracja przez wszystkie bloki transformacji (Flows)
        for flow in self.flows:
            z, log_det_jacobian = flow(z)
            log_prob += log_det_jacobian
            
        # P(x) = P(z) * |det(dz/dx)|^-1  -> log P(x) = log P(z) - log|det(dz/dx)|
        # log|det(dx/dz)| = -log|det(dz/dx)|
        # W Real NVP transformacja jest z x do z: dx/dz = (dz/dx)^-1
        # log|det(dz/dx)| = log_det_jacobian (zwracany przez flow)
        # Ostateczna log-wiarygodność:
        log_prob += self.base_dist.log_prob(z)
        
        return log_prob.mean(), z

    def sample(self, num_samples):
        """
        Generowanie próbek: Losowanie z z i przekształcenie z do przestrzeni danych x.
        """
        # Losowanie z rozkładu bazowego (z)
        z = self.base_dist.sample((num_samples,))
        
        # Iteracja przez bloki transformacji w odwrotnej kolejności
        x = z
        for flow in reversed(self.flows):
            x = flow.inverse(x)
            
        return x


def generate_data(name, num_samples=1000):
    """
    Generuje dane spiralne lub pierścieniowe.
    """
    if name == 'spiral':
        t = np.linspace(0, 4 * np.pi, num_samples)
        x = t * np.cos(t) + np.random.normal(0, 0.2, num_samples)
        y = t * np.sin(t) + np.random.normal(0, 0.2, num_samples)
        data = np.stack([x, y], axis=1)
    elif name == 'ring':
        t = np.random.uniform(0, 2 * np.pi, num_samples)
        r = 10 + np.random.normal(0, 0.5, num_samples)
        x = r * np.cos(t)
        y = r * np.sin(t)
        data = np.stack([x, y], axis=1)
    else:
        raise ValueError("Nieznany typ danych.")

    # Konwersja do tensora PyTorch
    data = torch.tensor(data, dtype=torch.float32).to(device)
    return data



def train_and_visualize(data_type='spiral', num_blocks=4, num_epochs=5000):
    """
    Funkcja trenująca i wizualizująca wyniki dla danego typu danych.
    """
    print(f"\n--- Trening dla danych: {data_type.upper()} ---")
    data = generate_data(data_type)
    
    # Inicjalizacja modelu
    in_features = 2
    hidden_features = 64
    model = RealNVP(in_features, hidden_features, num_blocks).to(device)
    
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    
    # Pętla treningowa
    for epoch in range(1, num_epochs + 1):
        model.train()
        optimizer.zero_grad()
        
        # Obliczenie straty (maksymalizacja log-wiarygodności)
        # PyTorch domyślnie minimalizuje straty, więc minimalizujemy ujemną log-wiarygodność.
        log_likelihood, _ = model(data)
        loss = -log_likelihood 
        
        loss.backward()
        optimizer.step()
        
        if epoch % 500 == 0:
            print(f'Epoka {epoch}/{num_epochs}, Ujemna Log-Wiarygodność (Loss): {loss.item():.4f}')

    # Wizualizacja wyników
    model.eval()
    with torch.no_grad():
        # Generowanie nowych próbek
        samples = model.sample(1000).cpu().numpy()
        
        # Wizualizacja transformacji x -> z
        _, z_transformed = model(data)
        z_transformed = z_transformed.cpu().numpy()
        
    plt.figure(figsize=(12, 5))

    # Oryginalne dane
    plt.subplot(1, 3, 1)
    plt.scatter(data.cpu().numpy()[:, 0], data.cpu().numpy()[:, 1], s=10)
    plt.title(f'1. Oryginalne dane ({data_type.capitalize()})')
    plt.xlabel('x1'); plt.ylabel('x2')
    plt.grid(True, linestyle='--')
    
    # Przestrzeń latent (z)
    plt.subplot(1, 3, 2)
    plt.scatter(z_transformed[:, 0], z_transformed[:, 1], s=10)
    plt.title('2. Transformacja do przestrzeni bazowej (z)')
    plt.xlabel('z1'); plt.ylabel('z2')
    plt.xlim(-5, 5); plt.ylim(-5, 5)
    plt.grid(True, linestyle='--')

    # Wygenerowane próbki
    plt.subplot(1, 3, 3)
    plt.scatter(samples[:, 0], samples[:, 1], s=10)
    plt.title('3. Wygenerowane próbki (Flow)')
    plt.xlabel('x1'); plt.ylabel('x2')
    plt.grid(True, linestyle='--')
    
    plt.tight_layout()
    plt.show()
    


# Uruchomienie treningu dla danych pierścieniowych
train_and_visualize(data_type='ring', num_blocks=8, num_epochs=10000)

# Uruchomienie treningu dla danych spiralnych
# train_and_visualize(data_type='spiral', num_blocks=8, num_epochs=10000)


## Zad 2.
Korzystając z kodów z [repozytorium GitHub](https://github.com/GSavathrakis/Glow-pytorch/tree/main), naucz model `GLOW` na zbiorze mnist. Po nauczeniu modelu wygeneruj kilka obrazków z przestrzeni latent. Użyj funkcji [t-sne](https://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html) do wizualizacji przestrzeni latentej na podstawie punktów reprezentujących dane (obrazy), stosując różne kolory dla poszczególnych klas.  

## Zad 3.
Przeanalizuj metodę flow-matching na podstawie podanego [jupyter notebooka](https://github.com/rfangit/analytical_flow_matching/blob/main/2D%20Examples/Linear%20Schedule%20-%20Other%20Distributions/Linear%20Schedule%20-%20Checkerboard.ipynb). Powtórz eksperymenty w nim zawarte.