# <center>Arquitectura U-Net para modelos de difusión</center>

In [1]:
import torch
from torch import nn

## Bloques de la U-Net

### Embedding temporal

Un tiempo $t$ será codificado en un vector de dimensión $d$ dado por la concatenación de dos vectores $\sin(u(t)),\,\cos(u(t))\in\mathbb{R}^\frac{d}{2}$, donde las operaciones $\sin$ y $\cos$ son aplicadas elemento a elemento y $u(t)\in\mathbb{R}^\frac{d}{2}$ es el vector definido por:

$$
u(t)_i = \frac{t}{10000^{\frac{i}{\frac{d}{2} - 1}}}
$$

Posterior a esto, dicho vector será pasado a través de 2 capas fully connected para entregar una codificación temporal con el número de canales (dimensiones) pedida.

In [2]:
class TimeEmbedding(nn.Module):

    def __init__(self, n_channels: int):
        '''
        Parameters:
            - n_channels: número de dimensiones del embedding.
        '''

        assert n_channels % 8 == 0, 'la cantidad de canales debe ser divisible por 8.'

        super().__init__()
        
        self.n_channels = n_channels
        self.fc1 = nn.Linear(self.n_channels // 4, self.n_channels)
        self.fc2 = nn.Linear(self.n_channels, self.n_channels)
        self.activation = nn.SiLU()


    def forward(self, t: torch.Tensor):
        '''
        Parameters:
            - t[batch_size]: batch de posiciones temporales.
        
        Returns:
            - embedding[batch_size, n_channels]: embedding del batch temporal.
        '''

        d = self.n_channels // 4
        i = torch.arange(d/2)
        u_t = 10_000 ** (- i / (d/2 - 1))
        u_t = t[:, None] * u_t[None,  :]

        embedding = torch.cat((u_t.sin(), u_t.cos()), dim=1)
        embedding = self.activation(self.fc1(embedding))
        embedding = self.fc2(embedding)
        
        return embedding

In [3]:
# Test:
batch_size, n_channels = 128, 32

time_embedding = TimeEmbedding(n_channels)
t = torch.randint(0, 10000, size=[batch_size])
embedding = time_embedding(t)

assert embedding.shape == torch.Size([batch_size, n_channels])

### Bloque residual

Cada bloque residual está compuesto por dos convoluciones (que preservan la resolución) seguidas de una conexión residual con el input del bloque. Las convoluciones seguirán el orden `normalización -> activación -> convolución`, y para la segunda convolución se agregará `dropout` entre la activación y la convolución.

Por otra parte, el embedding temporal será sumado entre medio de las dos convoluciones. Para esto, se proyectará el embedding temporal para que la cantidad de canales del embedding temporal coincida con la cantidad de canales de la convolución y así poder realizar la suma.

Dado que la cantidad de canales en la entrada y la salida del bloque no necesariamente deben coincidir, al final del forward se proyectarán los canales de la entrada para poder realizar la conexión residual.

In [4]:
class ResidualBlock(nn.Module):

    def __init__(self, in_channels: int, out_channels: int, time_channels: int, n_groups: int = 32, dropout: float = 0.1):
        '''
        Parameters:
            - in_channels: cantidad de canales en la entrada.
            - out_channels: cantidad de canales en la salida.
            - time_channels: cantidad de canales para el embedding temporal.
            - n_groups: cantidad de grupos para group normalization.
            - dropout: tasa de dropout.
        '''

        super().__init__()

        assert in_channels % n_groups == out_channels % n_groups == 0, 'la cantidad de canales debe ser divisible por n_groups.'

        # Convolución 1:
        self.norm1 = nn.GroupNorm(n_groups, in_channels)
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, padding=1)

        # Convolución 2:
        self.norm2 = nn.GroupNorm(n_groups, out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1)

        # Proyector de canales para la conexión residual:
        if in_channels != out_channels:
            self.residual_projector = nn.Conv2d(in_channels, out_channels, 1)
        else:
            self.residual_projector = nn.Identity()

        # Proyector de canales para el embedding temporal:
        self.time_projector = nn.Linear(time_channels, out_channels)

        self.activation = nn.SiLU()
        self.dropout = nn.Dropout(dropout)


    def forward(self, input: torch.Tensor, time_embedding: torch.Tensor):
        '''
        Parameters:
            - input[batch_size, in_channels, height, width]: batch de entrada.
            - time_embedding[batch_size, time_channels]: batch de embedding temporales.
        
        Returns:
            - output[batch_size, out_channels, height, width]: batch de salida.
        '''

        output = self.conv1(self.activation(self.norm1(input)))
        output += self.time_projector(self.activation(time_embedding))[:, :, None, None]
        output = self.conv2(self.dropout(self.activation(self.norm2(output))))
        output += self.residual_projector(input)
        return output

In [5]:
# Test:
in_channels, out_channels, time_channels = 32, 64, 5
batch_size, height, width = 128, 32, 32

residual_block = ResidualBlock(in_channels, out_channels, time_channels)
input = torch.randn([batch_size, in_channels, height, width])
time_embedding = torch.randn([batch_size, time_channels])
output = residual_block(input, time_embedding)

assert output.shape == torch.Size([batch_size, out_channels, height, width])

### Bloque de self-attention

En esta sección se implementará un módulo multicabezal de self-attention cuya estructura será similar a la que se propone en el paper de Transformers. Este módulo será aplicado luego de algunos bloques residuales.

Dado que en este caso se trabajará con imágenes (tensores de rango 3) en vez de secuencias (tensores de rango 2), las dimensiones de alto y ancho serán aplanadas en un vector para así representar una secuencia de largo igual a la cantidad pixeles de la imagen. Con esto, cada tiempo de la secuencia estará representado por un vector de largo igual a la cantidad de canales de la imagen.

Dicho de otra forma una, una secuencia genérica se representa por una matriz $X\in\mathcal{M}_{n,d}(\mathbb{R})$, donde $n$ el largo de la secuencia y $d$ es la dimensión de cada elemento de la secuencia. Al interpretar una imagen como secuencia (para así usar self-attention) se tendrá que $n=\text{alto}\cdot\text{ancho}$ y $d=\text{número de canales}$.

En un cabezal de self-attention convencional, la entrada $X\in\mathcal{M}_{n,d}(\mathbb{R})$ en proyectada 3 veces para formar 3 matrices $Q,\,K,\,V\in\mathcal{M}_{n,d_k}(\mathbb{R})$ (i.e., para formar cada una de estas matrices, los elementos de la secuencia (filas de $X$) son pasados por una capa lineal). Luego, cada cabezal de atención $H$ queda definido mediante 

$$
H = \text{softmax}_{\text{por filas}}\left(\frac{QK^\top}{\sqrt{d_k}}\right)V
$$

Finalmente, un módulo de self-attention consiste en varios cabezales concatenados y luego proyectados para reducir la dimensión de la concatenación.

In [6]:
class AttentionBlock(nn.Module):

    def __init__(self, n_channels: int, n_heads: int = 1, d_k: int = None, n_groups: int = 32):
        '''
        Parameters:
            - n_channels: cantidad de canales en la entrada.
            - n_heads: cantidad de cabezales.
            - d_k: dimensión de las matrices Q, K, V y de la salida de cabezal.
            - n_groups: cantidad de grupos para group normalization.
        '''

        super().__init__()

        if d_k is None:
            d_k = n_channels
        
        self.n_heads = n_heads
        self.d_k = d_k

        self.norm = nn.GroupNorm(n_groups, n_channels)

        # Proyecciones para las matrices Q, K, V de todos los cabezales:
        self.qkv_projection = nn.Linear(n_channels, n_heads * d_k * 3)
        
        # Proyección para la concatenación de cabezales:
        self.final_projection = nn.Linear(d_k * n_heads, n_channels)


    def forward(self, input: torch.Tensor):
        '''
        Parameters:
            - input[batch_size, n_channels, height, width]: batch de entrada.
        
        Returns:
            - output[batch_size, n_channels, height, width]: salida del módulo de self-attention.
        '''

        batch_size, n_channels, height, width = input.shape
        seq_length = height * width

        # Transformar el input en un batch de secuencias ([batch_size, seq_length, n_channels]):
        input = input.view(batch_size, n_channels, seq_length).permute(0, 2, 1)

        # Matrices Q, K, V (de todos los cabezales):
        qkv = self.qkv_projection(input)  # [batch_size, seq_length, n_heads * d_k * 3].
        qkv = qkv.view(batch_size, seq_length, self.n_heads, 3 * self.d_k)
        q, k, v = qkv.split(self.d_k, dim=-1)  # cada tensor de tamaño [batch_size, seq_length, n_heads, d_k].
        
        # Producto externo ([QK^T]_ij = <fila_i(Q), fila_j(K)>) y softmax:
        attn = torch.einsum('bihd,bjhd->bijh', q, k)  # [batch_size, seq_length, seq_length, h_heads].
        attn = torch.softmax(attn / (self.d_k ** 0.5), dim=2)

        # Producto atención-values:
        output = torch.einsum('bijh,bjhd->bihd', attn, v)  # [batch_size, seq_length, h_heads, d_k].
        
        # Reshape en forma de cabezales concatenados:
        output = output.permute(0, 1, 3, 2)  # [batch_size, seq_length, d_k, h_heads].
        output = output.reshape(batch_size, seq_length, self.d_k * self.n_heads)

        # Proyección final y conexión residual:
        output = self.final_projection(output)  # [batch_size, seq_length, n_channels].
        output += input

        # Recuperar tamaño como batch de imágenes:
        output = output.permute(0, 2, 1)  # [batch_size, n_channels, seq_length].
        output = output.view(batch_size, n_channels, height, width)

        return output

In [7]:
# Test:
n_channels, n_heads, d_k = 64, 2, 8
batch_size, height, width = 128, 32, 32

attention_block = AttentionBlock(n_channels, n_heads, d_k)
input = torch.randn([batch_size, n_channels, height, width])

assert attention_block(input).shape == torch.Size([batch_size, n_channels, height, width])

### Bloques principales de la U-Net

En esta sección se crearán clases simples para los bloques principales que se observan en la arquitectura U-Net.

#### Bloques `ResidualBlock` + `AttentionBlock`

Los bloques descendentes y ascendentes de la U buscan cambiar la cantidad de canales del input, sin cambiar la resolución de las imágenes. Ambos tipos de bloque son de la forma `bloque residual -> self-attention`. En la inicialización de estos bloques se debe indicar si usar self-attention ya que no todos los bloques de la U-Net lo utilizan.

In [8]:
class MainBlock(nn.Module):

    def __init__(self, in_channels: int, out_channels: int, time_channels: int, has_attn: bool):
        '''
        Parameters:
            - in_channels: cantidad de canales en la entrada.
            - out_channels: cantidad de canales en la salida.
            - time_channels: cantidad de canales para el embedding temporal.
            - has_attn: indica si el módulo aplicará atención luego del bloque residual.
        '''

        super().__init__()

        self.residual_block = ResidualBlock(in_channels, out_channels, time_channels)
        if has_attn:
            self.attention_block = AttentionBlock(out_channels)
        else:
            self.attention_block = nn.Identity()


    def forward(self, input: torch.Tensor, time_embedding: torch.Tensor):
        '''
        Parameters:
            - input[batch_size, in_channels, height, width]: batch de entrada.
            - time_embedding[batch_size, time_channels]: batch de embedding temporales.
        
        Returns:
            - output[batch_size, out_channels, height, width]: batch de salida.
        '''
        
        output = self.residual_block(input, time_embedding)
        output = self.attention_block(output)
        
        return output

In [9]:
# Test:
in_channels, out_channels, time_channels, has_attn = 96, 64, 5, True
batch_size, height, width = 128, 32, 32

main_block = MainBlock(in_channels, out_channels, time_channels, has_attn)
input = torch.randn([batch_size, in_channels, height, width])
time_embedding = torch.randn([batch_size, time_channels])
output = main_block(input, time_embedding)

assert output.shape == torch.Size([batch_size, out_channels, height, width])

El bloque intermedio de la U es de la forma `bloque residual -> self-attention -> bloque residual`:

In [10]:
class MiddleBlock(nn.Module):

    def __init__(self, n_channels: int, time_channels: int):
        '''
        Parameters:
            - n_channels: cantidad de canales en la entrada y en la salida.
            - time_channels: cantidad de canales para el embedding temporal.
        '''

        super().__init__()

        self.residual_block1 = ResidualBlock(n_channels, n_channels, time_channels)
        self.attention_block = AttentionBlock(n_channels)
        self.residual_block2 = ResidualBlock(n_channels, n_channels, time_channels)


    def forward(self, input: torch.Tensor, time_embedding: torch.Tensor):
        '''
        Parameters:
            - input[batch_size, n_channels, height, width]: batch de entrada.
            - time_embedding[batch_size, time_channels]: batch de embedding temporales.
        
        Returns:
            - output[batch_size, n_channels, height, width]: batch de salida.
        '''

        output = self.residual_block1(input, time_embedding)
        output = self.attention_block(output)
        output = self.residual_block2(input, time_embedding)

        return output

In [11]:
# Test:
n_channels, time_channels = 32, 5
batch_size, height, width = 128, 32, 32

middle_block = MiddleBlock(n_channels, time_channels)
input = torch.randn([batch_size, n_channels, height, width])
time_embedding = torch.randn([batch_size, time_channels])
output = middle_block(input, time_embedding)

assert output.shape == torch.Size([batch_size, n_channels, height, width])

#### Bloques de downsampling y upsampling

Estos bloques se utilizarán para cambiar la resolución de las imágenes. El bloque `DecreaseResolution` disminuirá el alto y ancho por 2, mientras que el bloque `IncreaseResolution` amplificará por 2. En algunas variantes de esta arquitectura se utiliza `MaxPool2d` para el downsampling (en vez de convolución) y `Upsample` para el upsampling (en vez de deconvolución). Estos métodos aceleran el procesamiento y simplifican el modelo ya que no tienen parámetros aprendibles.

Si bien estos bloques consisten únicamente en una convolución (o convolución transpuesta) y podrían introducirse directamente en la clase principal de la U-Net, aquí se modifica el forward para que también pueda recibir el embedding temporal (el cual no será usado). Esto permitirá poder trabajar los bloques `DecreaseResolution` e `IncreaseResolution` igual que los bloques `MainBlock` y `MiddleBlock` en el forward de la U-Net.

In [12]:
class DecreaseResolution(nn.Module):

    def __init__(self, n_channels):
        '''
        Parameters:
        - n_channels: cantidad de canales en la entrada y en la salida,
        '''

        super().__init__()

        self.conv = nn.Conv2d(n_channels, n_channels, 3, stride=2, padding=1)


    def forward(self, x: torch.Tensor, time_embedding: torch.Tensor):
        '''
        Parameters:
            - input[batch_size, n_channels, height, width]: batch de entrada.
            - time_embedding[batch_size, time_channels]: batch de embedding temporales.
        
        Returns:
            - output[batch_size, n_channels, height / 2, width / 2]: batch de salida.
        '''
        
        return self.conv(x)

In [13]:
# Test:
n_channels, time_channels = 32, 5
batch_size, height, width = 128, 32, 32

decrease_resolution = DecreaseResolution(n_channels)
input = torch.randn([batch_size, n_channels, height, width])
time_embedding = torch.randn([batch_size, time_channels])
output = decrease_resolution(input, time_embedding)

assert output.shape == torch.Size([batch_size, n_channels, height // 2, width // 2])

El módulo de upsampling es igual al anterior pero usando una convolución transpuesta:

In [14]:
class IncreaseResolution(nn.Module):

    def __init__(self, n_channels):
        '''
        Parameters:
        - n_channels: cantidad de canales en la entrada y en la salida,
        '''

        super().__init__()

        self.conv = nn.ConvTranspose2d(n_channels, n_channels, 4, stride=2, padding=1)


    def forward(self, x: torch.Tensor, time_embedding: torch.Tensor):
        '''
        Parameters:
            - input[batch_size, n_channels, height, width]: batch de entrada.
            - time_embedding[batch_size, time_channels]: batch de embedding temporales.
        
        Returns:
            - output[batch_size, n_channels, height * 2, width * 2]: batch de salida.
        '''
        
        return self.conv(x)

In [15]:
# Test:
n_channels, time_channels = 32, 5
batch_size, height, width = 128, 32, 32

increase_resolution = IncreaseResolution(n_channels)
input = torch.randn([batch_size, n_channels, height, width])
time_embedding = torch.randn([batch_size, time_channels])
output = increase_resolution(input, time_embedding)

assert output.shape == torch.Size([batch_size, n_channels, height * 2, width * 2])

## U-Net

In [16]:
class UNet(nn.Module):

    def __init__(self, image_channels: int = 3, inital_channels: int = 64, channel_factors: tuple = (1, 2, 2, 2),
                 has_attn: tuple = (False, False, False, False), n_blocks: int = 2):
        '''
        Parameters:
            - image_channels: cantidad de canales de las imágenes.
            - inital_channels: cantidad de canales luego de la proyección inicial.
            - channel_factors: factores de reducción/amplificación de la cantidad de canales.
            - has_attn: indica si se usará o no atención en los bloques residuales de cada factor.
            - n_blocks: cantidad de bloques principales en cada factor.
        '''
        super().__init__()

        time_channels = inital_channels * 4
        self.n_blocks = n_blocks
        self.time_embedding = TimeEmbedding(time_channels)

        # Proyección inicial:
        self.image_projection = nn.Conv2d(image_channels, inital_channels, kernel_size=3, padding=1)
        
        # Parte descendente:
        self.down_blocks = nn.ModuleList()
        in_channels = inital_channels
        for i, factor in enumerate(channel_factors):

            out_channels = in_channels * factor
            for _ in range(n_blocks):
                self.down_blocks.append(MainBlock(in_channels, out_channels, time_channels, has_attn[i]))
                in_channels = out_channels

            self.down_blocks.append(DecreaseResolution(out_channels))

        # Parte media:
        self.middle = MiddleBlock(out_channels, time_channels)

        # Parte ascendente:
        self.up_blocks = nn.ModuleList()

        for i, factor in enumerate(reversed(channel_factors)):

            self.up_blocks.append(IncreaseResolution(in_channels))

            for n in range(n_blocks):
                if n == 0:
                    in_channels *= 2  # primer bloque recibe skip conection.
                elif n == n_blocks - 1:
                    out_channels //= factor  # último bloque reduce resolución.
                
                self.up_blocks.append(MainBlock(in_channels, out_channels, time_channels, has_attn[i]))
                in_channels = out_channels
            
        # Capas finales:
        self.last_normalization = nn.GroupNorm(8, inital_channels)
        self.last_activation = nn.SiLU()
        self.projection_to_image = nn.Conv2d(in_channels, image_channels, 3, padding=1)


    def forward(self, input, times):
        '''
        Parameters:
            - input[batch_size, in_channels, height, width]: batch de imágenes.
            - times[batch_size]: tiempos asociados a cada imagen.
        
        Returns:
            - output[batch_size, in_channels, height, width]: salida de la U-Net.
        '''

        time_embedding = self.time_embedding(times)
        input = self.image_projection(input)

        # Encoding:
        down_outputs = [input]
        for i, block in enumerate(self.down_blocks, 2):
            input = block(input, time_embedding)
            if i % (self.n_blocks + 1) == 0:
                down_outputs.append(input)

        # Centro:
        input = self.middle(input, time_embedding)

        # Decoding:
        for i, block in enumerate(self.up_blocks, 1):
            if i % (self.n_blocks + 1) == 2:
                down_connection = down_outputs.pop()
                input = torch.cat((input, down_connection), dim=1)
            input = block(input, time_embedding)

        input = self.projection_to_image(self.last_activation(self.last_normalization(input)))
        return input

In [17]:
batch_size, in_channels, height, width = 2, 3, 512, 512

unet = UNet()
input = torch.randn([batch_size, in_channels, height, width])
times = torch.randn([batch_size])

assert unet(input, times).shape == torch.Size([batch_size, in_channels, height, width])