In [34]:
import os

# TF_CPP_MIN_LOG_LEVEL is read when TensorFlow's native runtime is loaded, so it
# has to be set before the first import that pulls TensorFlow in.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1"

import tensorflow as tf
from keras import layers, activations
from transformers import AutoTokenizer, AutoConfig

Cargamos un Tokenizer ya entrenado. En este caso usaremos Bert que agrega caracteres especiales como [CLS] y [SEP]:

In [35]:
# Sample sentence used to demonstrate the tokenizer.
text = "time flies like an arrow"

# Pretrained tokenizer for the "bert-base-uncased" checkpoint.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

Para observar cómo funciona el tokenizer, le pasaremos el texto "time flies like an arrow".

In [36]:
# Tokenize the sample text, keeping the special tokens, and return TensorFlow tensors.
inputs = tokenizer(text, add_special_tokens=True, return_tensors="tf")
token_ids = inputs.input_ids

print("text: ", text)
print("text_ids: ", token_ids)
# Map the ids of the first (only) sequence back to their token strings.
print("tokens: ", tokenizer.convert_ids_to_tokens(token_ids[0]))

text:  time flies like an arrow
text_ids:  tf.Tensor([[  101  2051 10029  2066  2019  8612   102]], shape=(1, 7), dtype=int32)
tokens:  ['[CLS]', 'time', 'flies', 'like', 'an', 'arrow', '[SEP]']


Tomemos la configuración usual del modelo "bert-base-uncased" usado para definir nuestro Encoder y Decoder.

In [37]:
# Standard configuration of the "bert-base-uncased" checkpoint.
config = AutoConfig.from_pretrained("bert-base-uncased")

# Token embedding table sized from the config: one hidden_size vector per vocabulary entry.
token_emb = layers.Embedding(
    input_dim=config.vocab_size,
    output_dim=config.hidden_size,
)
print("input_dim: ", token_emb.input_dim, "output_dim: ", token_emb.output_dim)

input_dim:  30522 output_dim:  768


Podemos notar que tiene un vocabulario de 30522 tokens (incluyendo tokens especiales), y representa cada token como un vector de 768 dimensiones.

Empezaremos definiendo el Encoder. Para esto, necesitamos definir:

- Función que calcula el Self-Attention entre el query (Q), key (K) y value (V). Recordemos que

$$ SelfAttention(Q, K ,V) = softmax\left(\frac{Q\cdot K^t}{\sqrt{dim_k}}\right)\cdot V $$

En este caso no estamos utilizando un masking para el Self Attention.

- Un Attention Head que proyecta nuestro hidden_state para luego calcular el Self-Attention. Tiene parámetros $Q$, $K$ y $V$.

- Teniendo el Attention Head, definimos el Multi-Head Attention, que aplica varios Attention Heads para luego concatenar sus resultados y proyectarlos a la dimensión deseada.

- Finalmente, definimos una layer Feed Forward consistente de proyectar, aplicar GELU, proyectar y luego un Dropout.

In [43]:
def scaled_dot_product_attention(Q, K, V):
    """Compute unmasked scaled dot-product attention.

    Implements ``softmax(Q @ K^T / sqrt(dim_k)) @ V``, where the transpose and the
    softmax act on the last two axes / last axis respectively.

    Args:
        Q: Query tensor; its last axis must match the last axis of ``K``.
        K: Key tensor.
        V: Value tensor; its second-to-last axis must match that of ``K``.

    Returns:
        The attention-weighted combination of ``V``.
    """
    # Read dim_k dynamically and in Q's dtype so the scaling also works when the
    # static shape is unknown or the inputs are not float32.
    dim_k = tf.cast(tf.shape(K)[-1], Q.dtype)
    # transpose_b swaps only the last two axes of K, so any number of leading
    # batch/head axes is supported instead of exactly one.
    scores = tf.matmul(Q, K, transpose_b=True) / tf.sqrt(dim_k)
    weights = tf.nn.softmax(scores, axis=-1)
    return tf.matmul(weights, V)

class AttentionHead(layers.Layer):
    """Single self-attention head.

    Projects the incoming hidden state with its own query, key and value Dense
    layers and runs scaled dot-product attention on the three projections.
    """

    def __init__(self, head_dim):
        """Create the three projections, each with ``head_dim`` output units."""
        super().__init__()
        self.Q = layers.Dense(head_dim)
        self.K = layers.Dense(head_dim)
        self.V = layers.Dense(head_dim)

    def call(self, hidden_state):
        """Return the self-attention output for ``hidden_state``."""
        queries = self.Q(hidden_state)
        keys = self.K(hidden_state)
        values = self.V(hidden_state)
        return scaled_dot_product_attention(queries, keys, values)
    
class MultiHeadAttention(layers.Layer):
    """Multi-head self-attention built from independent ``AttentionHead`` layers.

    Reads ``hidden_size`` and ``num_attention_heads`` from ``config``; each head
    projects to ``hidden_size // num_attention_heads`` units.
    """

    def __init__(self, config):
        super().__init__()
        per_head_dim = config.hidden_size // config.num_attention_heads
        self.heads = [
            AttentionHead(per_head_dim)
            for _ in range(config.num_attention_heads)
        ]
        # Final projection applied to the concatenated head outputs.
        self.output_linear = layers.Dense(config.hidden_size)

    def call(self, hidden_state):
        """Run every head on ``hidden_state``, concatenate on the last axis, project."""
        head_outputs = [head(hidden_state) for head in self.heads]
        merged = tf.concat(head_outputs, axis=-1)
        return self.output_linear(merged)
    
class FeedForward(layers.Layer):
    """Feed-forward block: Dense -> GELU -> Dense -> Dropout.

    The first Dense layer has ``config.intermediate_size`` units, the second has
    ``config.hidden_size`` units, and the dropout rate is
    ``config.hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        self.linear_1 = layers.Dense(config.intermediate_size)
        self.linear_2 = layers.Dense(config.hidden_size)
        self.gelu = activations.gelu
        self.dropout = layers.Dropout(config.hidden_dropout_prob)

    def call(self, x):
        """Apply the block to ``x`` and return the result."""
        expanded = self.gelu(self.linear_1(x))
        projected = self.linear_2(expanded)
        return self.dropout(projected)

Con esto ya podemos definir el Layer en que consiste el Encoder:

In [None]:
class TransformerEncoderLayer(layers.Layer):
    """One encoder layer: self-attention and feed-forward, each with a residual.

    Layer normalization is applied to the input of each sub-layer, and the
    sub-layer output is added back to its un-normalized input.
    """

    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = layers.LayerNormalization()
        self.layer_norm_2 = layers.LayerNormalization()
        self.multi_head = MultiHeadAttention(config)
        self.feedforward = FeedForward(config)

    def call(self, x):
        """Return ``x`` after the attention and feed-forward residual sub-layers."""
        # Self-attention sub-layer with skip connection.
        attended = self.multi_head(self.layer_norm_1(x))
        x = attended + x
        # Feed-forward sub-layer with skip connection.
        transformed = self.feedforward(self.layer_norm_2(x))
        return transformed + x
    
class Embeddings(layers.Layer):
    """Token plus learned positional embeddings, followed by LayerNorm and Dropout.

    Sizes come from ``config``: ``vocab_size`` and ``max_position_embeddings`` set
    the two table sizes, ``hidden_size`` the embedding width, and
    ``hidden_dropout_prob`` the dropout rate.
    """

    def __init__(self, config):
        super().__init__()
        vocab_size = config.vocab_size
        embed_dim = config.hidden_size
        self.token_embedding = layers.Embedding(vocab_size, embed_dim)
        self.positional_embedding = layers.Embedding(config.max_position_embeddings, embed_dim)
        self.layer_norm = layers.LayerNormalization()
        self.dropout = layers.Dropout(config.hidden_dropout_prob)

    def call(self, input_ids):
        """Embed ``input_ids``.

        Args:
            input_ids: Token ids whose last axis is the sequence axis, e.g.
                ``(batch_size, seq_length)``.

        Returns:
            The embeddings, with a trailing axis of width ``hidden_size`` added.
        """
        # The sequence length is the LAST axis; axis 0 is the batch size. Using
        # axis 0 gave every token position 0 for a batch of one and failed to
        # broadcast for larger batches.
        seq_length = tf.shape(input_ids)[-1]
        position_ids = tf.range(seq_length, dtype=tf.int32)
        # Leading axis of size 1 so the positions broadcast over the batch.
        position_ids = tf.expand_dims(position_ids, axis=0)
        token_emb = self.token_embedding(input_ids)
        pos_emb = self.positional_embedding(position_ids)
        embedding = token_emb + pos_emb
        embedding = self.layer_norm(embedding)
        embedding = self.dropout(embedding)
        return embedding

Teniendo ya todos los componentes, definimos el Encoder:

In [40]:
class TransformerEncoder(layers.Layer):
    """Encoder: an ``Embeddings`` layer followed by a stack of encoder layers.

    The stack holds ``config.num_hidden_layers`` ``TransformerEncoderLayer``
    instances, applied one after another.
    """

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = [
            TransformerEncoderLayer(config)
            for _ in range(config.num_hidden_layers)
        ]

    def call(self, x):
        """Embed the token ids ``x`` and pass them through every encoder layer."""
        hidden = self.embeddings(x)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden)
        return hidden

Notamos que el Encoder toma un input de la forma (batch_size, length), y nos entrega un tensor con forma (batch_size, length, dimension).

In [44]:
# Build the encoder and compare the shape of its input with the shape of its output.
encoder = TransformerEncoder(config)
token_batch = inputs.input_ids
print("input size: ", token_batch.shape)
encoded = encoder(token_batch)
print("output size: ", encoded.shape)

input size:  (1, 7)
output size:  (1, 7, 768)


Para implementar un Decoder, necesitamos aplicar un mask en el attention, pues solo dependeremos del token actual y los anteriores al token para hacer nuestra predicción. Para esto, consideremos una matriz mask con 0 arriba de la diagonal

In [None]:
def scaled_dot_product_attention_with_mask(Q, K, V, mask = None):
    """Scaled dot-product attention with an optional causal (look-back only) mask.

    Args:
        Q: Query tensor; its last axis must match the last axis of ``K``.
        K: Key tensor.
        V: Value tensor.
        mask: Acts as a flag. ``None`` or ``False`` disables masking; any other
            value enables the causal mask. The contents of ``mask`` are not used:
            the mask itself is always built here from the sequence length.

    Returns:
        The attention-weighted combination of ``V``.
    """
    dim_k = tf.cast(tf.shape(K)[-1], Q.dtype)
    # transpose_b swaps only the last two axes of K, so extra leading axes are fine.
    scores = tf.matmul(Q, K, transpose_b=True) / tf.sqrt(dim_k)
    # `mask=False` must not enable masking, hence the explicit check for False.
    if mask is not None and mask is not False:
        # Self-attention is assumed here: query and key lengths are equal.
        seq_len = tf.shape(scores)[-2]
        # Lower-triangular ones: position i may look at positions 0..i only.
        causal = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
        # The (seq_len, seq_len) mask broadcasts over all leading axes of scores.
        # Every row keeps its diagonal entry, so no row is entirely -inf.
        scores = tf.where(causal == 0, -float("inf"), scores)
    weights = tf.nn.softmax(scores, axis = -1)
    return tf.matmul(weights, V)

class MaskedSelfAttention(layers.Layer):
    """Single self-attention head that forwards a mask to the attention function.

    Each of the three projections has
    ``config.hidden_size // config.num_attention_heads`` output units.
    """

    def __init__(self, config):
        super().__init__()
        per_head_dim = config.hidden_size // config.num_attention_heads
        self.Q = layers.Dense(per_head_dim)
        self.K = layers.Dense(per_head_dim)
        self.V = layers.Dense(per_head_dim)

    def call(self, hidden_state, mask):
        """Project ``hidden_state`` and attend over it, passing ``mask`` through."""
        queries = self.Q(hidden_state)
        keys = self.K(hidden_state)
        values = self.V(hidden_state)
        return scaled_dot_product_attention_with_mask(queries, keys, values, mask)

class MultiHeadAttentionMasked(layers.Layer):
    """Multi-head self-attention whose heads all receive the same mask.

    Args:
        config: Provides ``hidden_size`` and ``num_attention_heads``.
        mask: Stored at construction time and passed unchanged to every head on
            each call.
    """

    def __init__(self, config, mask):
        super().__init__()
        self.mask = mask
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        self.heads = [MaskedSelfAttention(config) for _ in range(num_heads)]
        # Final projection applied to the concatenated head outputs.
        self.output_linear = layers.Dense(embed_dim)

    def call(self, hidden_state):
        """Run all heads on ``hidden_state``, concatenate on the last axis, project."""
        attention_outputs = [h(hidden_state, self.mask) for h in self.heads]
        x = tf.concat(attention_outputs, axis=-1)
        x = self.output_linear(x)
        return x

También queremos considerar el attention con el output del Encoder. Para esto, definimos el Encoder decoder attention.

In [None]:
class Encoder_decoder_attention(layers.Layer):
    """Single encoder-decoder (cross) attention head.

    Queries are projected from the decoder-side representation; keys and values
    are projected from the tensors supplied for the encoder side.

    Args:
        head_dim: Output units of each projection. When omitted, it falls back to
            ``hidden_size // num_attention_heads`` of the notebook-level
            ``config``, which keeps the original zero-argument constructor working.
    """

    def __init__(self, head_dim=None):
        super().__init__()
        if head_dim is None:
            # Backward-compatible default: read the module-level config.
            head_dim = config.hidden_size // config.num_attention_heads
        self.Q = layers.Dense(head_dim)
        self.K = layers.Dense(head_dim)
        self.V = layers.Dense(head_dim)

    def call(self, mid_repr, encoder_key, encoder_value):
        """Attend from ``mid_repr`` (queries) over ``encoder_key`` / ``encoder_value``."""
        attn_outputs = scaled_dot_product_attention(
            self.Q(mid_repr), self.K(encoder_key), self.V(encoder_value)
        )
        return attn_outputs

class MultiHeadEncoderDecoderAttention(layers.Layer):
    """Multi-head encoder-decoder attention.

    Builds ``config.num_attention_heads`` cross-attention heads sized from the
    ``config`` passed in, so the heads no longer depend on a global ``config``.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = [Encoder_decoder_attention(head_dim) for _ in range(num_heads)]
        # Final projection applied to the concatenated head outputs.
        self.output_linear = layers.Dense(embed_dim)

    def call(self, mid_repr, encoder_key, encoder_value):
        """Run every head, concatenate the results on the last axis and project."""
        attention = tf.concat([h(mid_repr, encoder_key, encoder_value) for h in self.heads], axis = -1)
        attention = self.output_linear(attention)
        return attention

Así, ya tenemos todas las herramientas para construir el Layer que compondrá nuestro Decoder:

In [None]:
class TransformerDecoderLayer(layers.Layer):
    """One decoder layer: masked self-attention, cross-attention and feed-forward.

    Each of the three sub-layers normalizes its input and adds its output back to
    the un-normalized input, the same pattern ``TransformerEncoderLayer`` uses.

    Args:
        config: Model configuration forwarded to the sub-layers.
        mask: Forwarded to the masked self-attention sub-layer.
    """

    def __init__(self, config, mask):
        super().__init__()
        self.mask = mask
        self.layer_norm_1 = layers.LayerNormalization()
        self.layer_norm_2 = layers.LayerNormalization()
        # Normalizes the input of the feed-forward sub-layer.
        self.layer_norm_3 = layers.LayerNormalization()
        self.multi_head_masked = MultiHeadAttentionMasked(config, mask)
        self.encoder_decoder_attention = MultiHeadEncoderDecoderAttention(config)
        self.feedforward = FeedForward(config)

    def call(self, x, encoder_key, encoder_value):
        """Apply the layer to ``x`` using the encoder-side keys and values."""
        # 1) Masked self-attention with residual.
        hidden_state = self.layer_norm_1(x)
        hidden_state = self.multi_head_masked(hidden_state)
        x = hidden_state + x
        # 2) Encoder-decoder attention with its own residual, so the
        #    self-attention result is carried forward rather than replaced.
        hidden_state = self.layer_norm_2(x)
        hidden_state = self.encoder_decoder_attention(hidden_state, encoder_key, encoder_value)
        x = hidden_state + x
        # 3) Feed-forward on the normalized result, with residual.
        hidden_state = self.layer_norm_3(x)
        hidden_state = self.feedforward(hidden_state)
        x = hidden_state + x
        return x

Nuestro Transformer será una aplicación sucesiva de estos layers. Más adelante agregaremos un classification head y un token prediction.

In [None]:
class TransformerDecoder(layers.Layer):
    """Decoder: an ``Embeddings`` layer followed by a stack of decoder layers.

    The stack holds ``config.num_hidden_layers`` ``TransformerDecoderLayer``
    instances, all constructed with the same ``mask``.
    """

    def __init__(self, config, mask):
        super().__init__()
        self.mask = mask
        self.embeddings = Embeddings(config)
        self.layers = [
            TransformerDecoderLayer(config, mask)
            for _ in range(config.num_hidden_layers)
        ]

    def call(self, x, encoder_key, encoder_value):
        """Embed ``x`` and run it through every decoder layer.

        ``encoder_key`` and ``encoder_value`` are handed unchanged to each layer.
        """
        hidden = self.embeddings(x)
        for decoder_layer in self.layers:
            hidden = decoder_layer(hidden, encoder_key, encoder_value)
        return hidden

Testeamos que nuestro Decoder está entregando los resultados esperados:

In [None]:
batch_size = 1
seq_len = 9
d_model = config.hidden_size

# Decoder inputs are token ids, so sample integers from the vocabulary range.
# The default float samples in [0, 1) are not valid ids.
x = tf.random.uniform((batch_size, seq_len), maxval=config.vocab_size, dtype=tf.int32)
# Stand-ins for the encoder output, sized from the config instead of a literal 768.
encoder_key = tf.random.uniform((batch_size, seq_len, d_model))
encoder_value = tf.random.uniform((batch_size, seq_len, d_model))

decoder = TransformerDecoder(config, mask=None)
output = decoder(x, encoder_key, encoder_value)
print("Input shape:", x.shape)
print("Output shape:", output.shape)

Input shape: (1, 9)
Output shape: (1, 9, 768)


Con esto podemos armar nuestro Encoder-Decoder transformer. Notemos que el output del encoder es evaluado dentro del decoder, pero el decoder tiene su propio input con el que empieza a generar tokens.

In [47]:
class EncoderDecoder(layers.Layer):
    """Encoder-decoder transformer.

    The encoder output is used as both keys and values of the decoder's
    encoder-decoder attention; the decoder starts from its own input ids.

    Args:
        config: Model configuration shared by encoder and decoder.
        mask: Forwarded to the decoder's masked self-attention.
    """

    # Id of the [CLS] token in the "bert-base-uncased" vocabulary; used as the
    # default first decoder input.
    CLS_TOKEN_ID = 101

    def __init__(self, config, mask):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.decoder = TransformerDecoder(config, mask)

    def run(self, x, decoder_input_ids=None):
        """Encode ``x`` and decode from ``decoder_input_ids``.

        Args:
            x: Encoder input ids.
            decoder_input_ids: Optional ids the decoder starts from. Defaults to a
                single [CLS] token of shape ``(1, 1)``, as before.

        Returns:
            The decoder output.
        """
        encoder_out = self.encoder(x)
        if decoder_input_ids is None:
            decoder_input_ids = tf.constant([[self.CLS_TOKEN_ID]])
        decoder_out = self.decoder(decoder_input_ids, encoder_out, encoder_out)
        return decoder_out

    def call(self, x, decoder_input_ids=None):
        """Keras entry point; delegates to ``run``."""
        return self.run(x, decoder_input_ids)

In [50]:
# Enable masking in the decoder's self-attention.
mask = True
encoder_decoder = EncoderDecoder(config, mask)

# NOTE(review): x holds random floats, not tokenizer ids; presumably this cell is
# only meant as a shape check.
x = tf.random.uniform(shape=(1, 7))
print("input: ", x.shape)
total_output = encoder_decoder(x)
print("total output: ", total_output.shape)

input:  (1, 7)
total output:  (1, 1, 768)


In [None]:
class Unembedding(layers.Layer):
    """Dense projection with one output unit per vocabulary entry.

    Args:
        vocab_size: Number of output units. When omitted, it falls back to
            ``vocab_size`` of the notebook-level ``config``, which keeps the
            original zero-argument constructor working.
    """

    def __init__(self, vocab_size=None):
        super().__init__()
        if vocab_size is None:
            # Backward-compatible default: read the module-level config.
            vocab_size = config.vocab_size
        self.unembedding = layers.Dense(vocab_size)

    def call(self, x):
        """Project the last axis of ``x`` to ``vocab_size`` units."""
        return self.unembedding(x)

In [80]:
# Instantiate the Dense(vocab_size) output projection defined by Unembedding.
unembedder = Unembedding()

In [81]:
# Unembed the encoder-decoder output (hidden states), not the raw input `x`.
decoder_out = encoder_decoder(x)
# Index of the highest-scoring vocabulary entry at each decoder position.
predicted_ids = tf.argmax(unembedder(decoder_out), axis=-1)
# convert_ids_to_tokens expects plain Python ints; take the first sequence of the batch.
tokenizer.convert_ids_to_tokens(predicted_ids[0].numpy().tolist())

['white']

Ahora agregaremos un prediction head a nuestro model, para poder crear un Encoder-Decoder que sea auto-regresivo.