In [44]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
import yaml

In [45]:
# Carica configurazione
with open('../configs/config.yaml', 'r') as f:
    config = yaml.safe_load(f)

T = config['diffusion']['T']
beta_start = config['diffusion']['beta_start']
beta_end = config['diffusion']['beta_end']
batch_size = 64
learning_rate = 1e-4
epochs = 50

In [46]:
class TimeEmbedding(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        self.embedding = nn.Sequential(
            nn.Linear(dim, dim),
            nn.SiLU(),
            nn.Linear(dim, dim)
        )
    
    def get_sinusoidal_embedding(self, t):
        t = t.float()
        half_dim = self.dim // 2
        freqs = torch.arange(half_dim, dtype=torch.float32)
        freqs = 10000 ** (-freqs / half_dim)
        angles = t[:, None] * freqs[None, :]
        emb = torch.cat([torch.sin(angles), torch.cos(angles)], dim=-1)
        return emb
    
    def forward(self, t):
        emb = self.get_sinusoidal_embedding(t)
        emb = self.embedding(emb)
        return emb

In [50]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels): # canali in input = 1 (immmagini MNIST) e out_channels ad es = 64 per il primo livello
        super().__init__() # chiamo il costruttore della classe genitore nn.Module
        self.conv = nn.Sequential( # inizio a definire la sequenza di layer
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.GroupNorm(8, out_channels), # divido i canali in 8 gruppi, migliorando la stabilità del training
            nn.SiLU(), # applica la funzione di attivazione SiLu
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1), # definisco la seconda convoluzione 2d del blocco
            nn.GroupNorm(8, out_channels),
            nn.SiLU(),
        )
    # logica del passaggio dell'input attraverso il blocco
    def forward(self, x): # x è un tensore di forma [batch_size, in_channels, height, width]
        return self.conv(x)

In [51]:
# UNet sarà la rete neurale principale del DDPM, responsabile di prevedere il rumore 
class UNet(nn.Module):
    # in_channels e out_channels hanno stesso valore perchè l'output da prevedere ha la stessa forma dell'input
    def __init__(self, in_channels=1, out_channels=1, base_channels=64, time_dim=128): # base_channels è il numero di canali nel primo blocco (tale valore aumenta in downsampling)
        super().__init__()
        self.time_dim = time_dim # dimensione dell'embedding
        self.time_emb = TimeEmbedding(time_dim) # trasformo il time-step t in un embedding
        self.enc1 = ConvBlock(in_channels, base_channels) # primo blocco convoluzionale dell'encoder
        self.pool1 = nn.MaxPool2d(2) # da 28x28 riduco la risoluzione spaziale a 14x14
        self.enc2 = ConvBlock(base_channels, base_channels * 2) # 128, 14x14
        self.pool2 = nn.MaxPool2d(2) # da 14x14 riduco la risoluzione a 7x7
        self.enc3 = ConvBlock(base_channels * 2, base_channels * 4)
        self.bottleneck = ConvBlock(base_channels * 4, base_channels * 4)
        self.up1 = nn.ConvTranspose2d(base_channels * 4, base_channels * 2, kernel_size=2, stride=2) # aumento la risoluzione a 14x14 e dimezzo il numero di canali, primo layer di upsampling
        self.dec1 = ConvBlock(base_channels * 4, base_channels * 2) # primo blocco convoluzionale del decoder
        self.up2 = nn.ConvTranspose2d(base_channels * 2, base_channels, kernel_size=2, stride=2) # aumento la risoluzione e dimezzo il numero di canali
        self.dec2 = ConvBlock(base_channels * 2, base_channels)
        self.out = nn.Conv2d(base_channels, out_channels, kernel_size=3, padding=1) # layer finale per produrre l'output
        self.time_proj1 = nn.Linear(time_dim, base_channels) # embedding da proiettare per il livello 1
        self.time_proj2 = nn.Linear(time_dim, base_channels * 2) # embedding da proiettare per il livello 2
        self.time_proj3 = nn.Linear(time_dim, base_channels * 4) # embedding da proiettare per il livello 3
        
    def forward(self, x, t): # x = immagine rumorosa, t = time-step
        t_emb = self.time_emb(t)
        e1 = self.enc1(x) + self.time_proj1(t_emb)[:, :, None, None] # applico il primo blocco convoluzionale dell'encoder (feature + embedding temporale)
        d1 = self.pool1(e1) # applico il primo max pooling
        e2 = self.enc2(d1) + self.time_proj2(t_emb)[:, :, None, None]
        d2 = self.pool2(e2) # secondo max pooling
        e3 = self.enc3(d2) + self.time_proj3(t_emb)[:, :, None, None] # terzo blocco convoluzionale
        b = self.bottleneck(e3)
        u1 = self.up1(b) # primo upsampling
        u1 = torch.cat([u1, e2], dim=1) # concateno con le feature dell'encoder, skip connection
        d1 = self.dec1(u1) # primo blocco del decoder
        u2 = self.up2(d1)
        u2 = torch.cat([u2,e1], dim = 1) # concateno con e1 [64,28,28] cat [64,28,28] -> [128,28,28]
        d2 = self.dec2(u2) # secondo blocco del decoder
        out = self.out(d2) # layer finale
        return out

In [52]:
unet = UNet()  # Inizializzo la U-Net
x = torch.randn(4, 1, 28, 28)  # Batch di 4 immagini MNIST
t = torch.tensor([0, 100, 500, 999])  # Timestep
out = unet(x, t)  # Passo input e timestep alla U-Net
print(f"Forma dell'output: {out.shape}")  # Dovrebbe essere [4, 1, 28, 28]

Forma dell'output: torch.Size([4, 1, 28, 28])
