In [1]:
import math

import torch
from torch import nn

# Multi-Head Attention

En este notebook vamos a seguir la implementación de [LabML](https://nn.labml.ai/transformers/mha.html) para intentar entender cómo funciona este mecanismo.

## Preparación

In [2]:
class PrepareForMultiHeadAttention(nn.Module):

    def __init__(self, 
                 d_model, # Dimensión de entrada
                 heads, # Cantidad de "cabezas" en paralelo
                 d_k, # Dimensión de salida
                 bias):
        """
        Utilizaremos una capa lineal para proyectar los (Key, Value, Query)
        Tenemos que tener tantas salidas como cabezas*dimensión de salida.
        """
        
        super(PrepareForMultiHeadAttention, self).__init__()
        self.linear = nn.Linear(d_model, heads*d_k, bias=bias)
        self.heads = heads
        self.d_k = d_k

    def forward(self, x):
        """
        En el paso hacia delante, lo que tenemos que hacer es proyectar las entradas
        y separarlo en las cabezas.

        La entrada puede tener dimensiones [seq_len, batch, d_model] o [batch, d_model],
        así que aplicaremos la transformación a la última dimensión (d_model).
        Esto quiere decir que la salida de cada cabeza tiene que tener dimensiones
        [seq_len, batch, heads, d_k] o [batch, heads, d_k].
        """
        
        head_shape = x.shape[:-1]
        x = self.linear(x)
        x = x.view(*head_shape, self.heads, self.d_k)

## Multi-Head Attention

Aquí es dónde vamos a calcular la *scaled multi-head attention* para cualquier tupla `(query, key, value)`:

$$
Attention(Q, K, V) = \underset{seq}{softmax\left( \frac{QK^T}{\sqrt{d_k}} \right)}
$$

El producto escalar entre $Q$ y $K$ es una medida de similitud, por lo que esta operación se puede entender como buscar la `key` que más se parece a la `query` y obtener su `value`. Esta formulación se asemeja bastante a una búsqueda en una base de datos, donde introducimos aquello que queremos buscar (`query`) y se nos devuelven los elementos de la base de datos (`value`) cuyo indentificador `key` se parece más a la `query` inicial.

> También se puede hacer una analogía con introducir una `query` en un diccionario de Python y obtener los valores cuya `key` se parece más a la `query` introducida. 

Antes de aplicar la función $softmax$, el producto escalar de escala por un factor $\frac{1}{\sqrt{d_k}}$ para evitar que los valores muy grandes del producto escalar den gradientes muy pequeños cuando $d_k$ es grande. Esta $softmax$ se aplica en la dirección de la secuencia (o tiempo), es decir, nos sirve para pesar los diferentes momentos temporales. **(creo)**

> El resultado de la función $softmax$ tiene que sumar 1, por lo que, si $d_k$ es grande, este 1 hay que repartirlo entre muchos sumandos. Si hay un elemento que es mucho más grande que el resto, habrá elementos que sean prácticamente 0 y podríamos tener problemas con el gradiente.

In [4]:
class MultiHeadAttention(nn.Module):
    def __init__(self, 
                 heads, # Cantidad de cabezas
                 d_model, # Dimensión de entrada
                 dropout_prob=0.1, # Probabilidad del Dropout
                 bias=True):
        
        ## En `PrepareForMultiHeadAttention` la dimensión de salida de las proyecciones
        ## lineales es d_k*heads, por lo que ahora tenemos que definir d_k = d_model//heads
        ## para que las proyecciones mantengan la dimensionalidad.
        self.d_k = d_model // heads
        self.heads = heads

        self.query = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=bias)
        self.key = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=bias)
        self.value = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=bias)

        self.softmax = nn.Softmax(dim=1) # Yo a priori habría dicho que la dimensión de la secuencia era la 0
        self.output = nn.Linear(d_model, d_model) # Hay una capa lineal al final de todo el proceso
        self.dropout = nn.Dropout(p=dropout_prob)
        
        self.scale = 1 / torch.sqrt(self.d_k) # Factor de escala

        self.attn = None # Se guardan las atenciones por lo que pueda pasar

    def get_scores(self,
                   query, # Vector Q
                   key): # Vector K
        """
        Aquí es donde calcularemos el producto escalar entre los vectores Q y K 
        para obtener una medida de su similitud. Una forma fácil de hacerlo es 
        utilizando `torch.einsum`.
        """        

        return torch.einsum('ibhd, jbhd -> ijbh', query, key)
    
    def prepare_mask(self,
                     mask, # Tiene shape [seq_len_q, seq_len_k, batch_size]
                     query_shape, # Shape del vector Q
                     key_shape): # Shape del vector K
        """
        Esta función nos sirve para asegurarnos de que la shape de la máscara
        se corresponde con las shapes de los vectores Q y K.

        Lo que nos indica la máscara es si la query i tiene acceso a la key j
        en el batch b. (mask[i,j,b] = 0/1)
        """
        assert mask.shape[0] == 1 or mask.shape[0] == query_shape[0]
        assert mask.shape[1] == key_shape[0]
        assert mask.shape[2] == 1 or mask.shape[2] == query_shape[1]

        ## Como queremos aplicar la misma máscara a todas las cabezas, 
        ## añadimos una dimensión al final para que se aplique broadcasting.
        mask = mask.unsqueeze(-1)
        return mask

    def forward(self,
                *,
                query, # Vector Q []
                key, # Vector K
                value, # Vector V
                mask = None): # Máscara
        
        ## Extraemos la longitud de la secuencia y el tamaño del batch
        seq_len, batch_size, _ = query.shape

        ## En caso de haber una máscara, la preparamos
        if mask is not None:
            self.prepare_mask(mask, query.shape, key.shape)

        ## Proyectamos los vectores Q, K y V antes de pasar por la MultiHeadAttention
        query = self.query(query)
        key = self.key(key)
        value = self.value(value)

        ## Obtenemos los scores de atención y los escalamos
        scores = self.get_scores(query, key)
        scores = scores*self.scale

        ## Aplicamos la máscara en caso de haberla
        if mask is not None:
            scores = scores.masked_fill(mask==0, float('-inf'))

        ## Aplicamos la función softmax para obtener la atención
        attn = self.softmax(scores)

        ## Y aplicamos también el Dropout
        attn = self.dropout(attn)

        ## Calculamos el producto escalar con el vector V
        x = torch.einsum('ijbh, jbhd -> ibhd', attn, value)

        ## Guardamos attn por lo que pueda pasar
        self.attn = attn.detach()

        ## Finalmente, concatenamos las salidas de todas las cabezas
        ## y pasamos el vector resultante por la capa de salida
        x = x.reshape(seq_len, batch_size, -1)

        return self.output(x)