[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/sensioai/blog/blob/master/041_attention/attention.ipynb)

# Attention Mechanisms

## The *dataset*

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from transformers import BertModel, BertTokenizer
from tqdm import tqdm
import numpy as np
import random

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
import unicodedata
import re

def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s

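def read_file(file, reverse=False):
    # Read the file and split it into lines
    with open(file, encoding='utf-8') as f:
        lines = f.read().strip().split('\n')

    # Keep the first two tab-separated columns of every line and normalize them
    pairs = [[normalizeString(s) for s in l.split('\t')[:2]] for l in lines]

    # Optionally swap the two columns so the second one becomes the input
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]

    return pairs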

In [None]:
SOS_token = 0
EOS_token = 1
PAD_token = 2

class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {"SOS": 0, "EOS": 1, "PAD": 2}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS", 2: "PAD"}
        self.n_words = 3  # Count SOS, EOS and PAD

    def addSentence(self, sentence):
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

    def indexesFromSentence(self, sentence):
        return [self.word2index[word] for word in sentence.split(' ')]

    def sentenceFromIndex(self, index):
        return [self.index2word[ix] for ix in index]

To apply the `attention` layer, our sentences need a defined maximum length.

In [None]:
MAX_LENGTH = 768

eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

def filterPairs(pairs, filters, lang=0):
    return [p for p in pairs if p[lang].startswith(filters)]

def trimPairs(pairs):
    return [p for p in pairs if len(p[0].split(' ')) < MAX_LENGTH and len(p[1].split(' ')) < MAX_LENGTH]
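
The reason for a fixed maximum length is that an attention layer of this kind emits one score per possible input position, so its output size has to be known in advance. The following is a minimal sketch in plain Python (no PyTorch, so it runs anywhere) of how such fixed-size scores might be restricted to the real length of a sentence before normalizing. The function `attention_weights` and the toy numbers are assumptions made for illustration; they are not part of the notebook.

```python
import math

def attention_weights(scores, seq_len):
    """Softmax over the first `seq_len` scores; later positions get weight 0."""
    valid = scores[:seq_len]
    # subtract the maximum for numerical stability
    m = max(valid)
    exps = [math.exp(s - m) for s in valid]
    total = sum(exps)
    weights = [e / total for e in exps]
    # pad back to the fixed size so the output shape never changes
    return weights + [0.0] * (len(scores) - seq_len)

# hypothetical layer output for a maximum length of 5 and a 3-token sentence
scores = [2.0, 1.0, 0.5, -3.0, 4.0]
weights = attention_weights(scores, seq_len=3)
print([round(w, 3) for w in weights])
```

Note that the large score at the last position is ignored entirely: only positions that hold real tokens take part in the normalization.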

In [None]:
def prepareData(file, filters=None, reverse=False):
    pairs = read_file(file, reverse)
    print(f"Tenemos {len(pairs)} pares de frases")

    if filters is not None:
        pairs = filterPairs(pairs, filters, int(reverse))
        print(f"Filtramos a {len(pairs)} pares de frases")

    pairs = trimPairs(pairs)
    print(f"Tenemos {len(pairs)} pares de frases con longitud menor de {MAX_LENGTH}")

    if reverse:
        input_lang = Lang('eng')
        output_lang = Lang('spa')
    else:
        input_lang = Lang('spa')
        output_lang = Lang('eng')

    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])

    print("Longitud vocabularios:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)

    return input_lang, output_lang, pairs

input_lang, output_lang, pairs = prepareData('/content/drive/MyDrive/FINAL/spa.txt')

# uncomment to use the filtered dataset
#input_lang, output_lang, pairs = prepareData('spa.txt', filters=eng_prefixes)

random.choice(pairs)

Tenemos 120614 pares de frases
Tenemos 120614 pares de frases con longitud menor de 768
Longitud vocabularios:
spa 12990
eng 24933


['i have a big problem .', 'tengo un quilombo de novela .']

In [None]:
# output_lang.indexesFromSentence('tengo mucha sed .')

In [None]:
# output_lang.sentenceFromIndex([3, 1028, 647, 5])

All sentences in a batch must have the same length, so the shorter ones need *padding*. In this version the `Dataset` returns the unpadded token tensors, and the padding is added per batch by the `collate_fn` defined further down.
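
As a rough illustration of what padding a batch involves, here is a minimal plain-Python sketch. The helper `pad_batch` and the token ids are hypothetical; the only assumption taken from the BERT tokenizer used in this notebook is that the padding id is 0.

```python
def pad_batch(sequences, pad_id=0):
    """Right-pad every sequence to the longest one and return (padded, mask)."""
    max_len = max(len(seq) for seq in sequences)
    padded = [seq + [pad_id] * (max_len - len(seq)) for seq in sequences]
    # 1 marks a real token, 0 marks padding
    mask = [[1] * len(seq) + [0] * (max_len - len(seq)) for seq in sequences]
    return padded, mask

# hypothetical token ids for a batch of three sentences
batch = [[101, 7592, 102], [101, 2129, 2024, 2017, 102], [101, 102]]
padded, mask = pad_batch(batch)
for row, m in zip(padded, mask):
    print(row, m)
```

The mask is what lets an encoder ignore the padded positions; without it, the padding tokens would be treated as part of the sentence.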

## El modelo

For the `encoder`, this version uses a pretrained BERT model. What matters for attention is that, besides a final hidden state, we need the encoder outputs at every input position so that the `decoder` can use them. With BERT these are available as `last_hidden_state`.

In [None]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')

    def forward(self, input_sentences, attention_mask):
        outputs = self.bert(input_sentences, attention_mask=attention_mask)
        encoder_outputs = outputs.last_hidden_state
        print(encoder_outputs.size())  # debug: (batch, seq_len, hidden_size)
        encoder_hidden = None  # Not used with BERT
        return encoder_outputs, encoder_hidden
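
To make concrete why the `decoder` needs every encoder output and not just the last state, here is a minimal plain-Python sketch of the core attention step: a weighted sum of the encoder outputs, often called the context vector. The function name and the toy values are assumptions for illustration only.

```python
def context_vector(weights, encoder_outputs):
    """Weighted sum of encoder outputs, with one weight per input position."""
    dim = len(encoder_outputs[0])
    context = [0.0] * dim
    for w, h in zip(weights, encoder_outputs):
        for j in range(dim):
            context[j] += w * h[j]
    return context

# toy encoder outputs: 3 input positions, feature size 2 (made-up values)
encoder_outputs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
weights = [0.5, 0.25, 0.25]  # attention weights, summing to 1
context = context_vector(weights, encoder_outputs)
print(context)
```

In the PyTorch model the same operation is done for a whole batch at once with `torch.bmm`.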

### The *decoder* with *attention*

In [None]:
from transformers import BertModel

# Load the pretrained BERT model
model = BertModel.from_pretrained('bert-base-uncased')
bert_output_dim = model.config.hidden_size

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
class AttnDecoder(torch.nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, n_layers, max_length):
        super().__init__()
        self.n_layers = n_layers  # number of stacked GRU layers
        self.hidden_size = hidden_size  # size of the GRU hidden state
        self.embedding = torch.nn.Embedding(input_size, embedding_size)
        self.rnn = torch.nn.GRU(embedding_size, hidden_size, num_layers=n_layers, batch_first=True)
        # the output layer sees the GRU output concatenated with the attention context
        self.out = torch.nn.Linear(hidden_size+bert_output_dim, input_size)

        # one attention score per encoder position, up to max_length
        self.attn = torch.nn.Linear(hidden_size + embedding_size, max_length)
        # assumes embedding_size == hidden_size == bert_output_dim, as in this notebook
        self.attn_combine = torch.nn.Linear(hidden_size * 2, hidden_size)

    def forward(self, input_words, hidden, encoder_outputs):
        # input_words: (bs, 1); hidden: (n_layers, bs, hidden_size)
        # encoder_outputs: (bs, seq_len, bert_output_dim)
        embedded = self.embedding(input_words)
        seq_len = encoder_outputs.size(1)

        # keep only the scores for positions that exist in this batch
        attn_scores = self.attn(torch.cat((embedded.squeeze(1), hidden[0]), dim=1))[:, :seq_len]
        attn_weights = torch.nn.functional.softmax(attn_scores, dim=1)
        # weighted sum of the encoder outputs: (bs, 1, bert_output_dim)
        attn_applied = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)

        output = torch.cat((embedded.squeeze(1), attn_applied.squeeze(1)), 1)
        output = self.attn_combine(output)
        output = torch.nn.functional.relu(output)

        output, hidden = self.rnn(output.unsqueeze(1), hidden)
        output = self.out(torch.cat((output.squeeze(1), attn_applied.squeeze(1)), dim=1))

        return output, hidden, attn_weights

In [None]:
from torch.nn.utils.rnn import pad_sequence

# Create a new Dataset class that uses the BERT tokenizer
class BERTDataset(torch.utils.data.Dataset):
    def __init__(self, pairs):
        self.pairs = pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, ix):
        input_sentence = self.pairs[ix][0]
        output_sentence = self.pairs[ix][1]

        # Tokenize the input and output sentences
        input_tokens = tokenizer.encode(input_sentence, add_special_tokens=True)
        output_tokens = tokenizer.encode(output_sentence, add_special_tokens=True)

        return torch.tensor(input_tokens), torch.tensor(output_tokens)

In [None]:
def collate_fn(batch):
    input_sentences, output_sentences = zip(*batch)
    input_sentences = pad_sequence(input_sentences, batch_first=True)
    output_sentences = pad_sequence(output_sentences, batch_first=True)
    return input_sentences, output_sentences

In [None]:
train_size = len(pairs) * 80 // 100
train_pairs = pairs[:train_size]
test_pairs = pairs[train_size:]

train_dataset = BERTDataset(train_pairs)
test_dataset = BERTDataset(test_pairs)

batch_size = 64
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

In [None]:
from transformers import BertModel

# Load the pretrained BERT model
model = BertModel.from_pretrained('bert-base-uncased')

# Get the size of the hidden space
hidden_size = model.config.hidden_size

# Get the size of the vocabulary
vocab_size = model.config.vocab_size



In [None]:
encoder = Encoder()
decoder = AttnDecoder(vocab_size, hidden_size, hidden_size, n_layers=2, max_length=MAX_LENGTH)



In [None]:
# Define the training device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the models to the training device
encoder.to(device)
decoder.to(device)


AttnDecoder(
  (embedding): Embedding(30522, 768)
  (rnn): GRU(768, 768, num_layers=2, batch_first=True)
  (out): Linear(in_features=1536, out_features=30522, bias=True)
  (attn): Linear(in_features=1536, out_features=768, bias=True)
  (attn_combine): Linear(in_features=1536, out_features=768, bias=True)
)

In [None]:
# Define the optimizers and the loss function
encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=1e-3)
decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=1e-3)
# Targets are padded with the tokenizer's [PAD] id, so those positions are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Training function
def fit(encoder, decoder, dataloader, epochs=10):
    for epoch in range(1, epochs+1):
        encoder.train()
        decoder.train()
        train_loss = []
        bar = tqdm(dataloader)
        for input_sentences, output_sentences in bar:
            bs = input_sentences.size(0)
            loss = 0
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()

            # Move the data to the training device
            input_sentences = input_sentences.to(device)
            output_sentences = output_sentences.to(device)

            # Build the attention mask for the encoder (False on padding)
            attention_mask = (input_sentences != tokenizer.pad_token_id)

            # Get the encoder outputs
            encoder_outputs, _ = encoder(input_sentences, attention_mask)

            # Initialize the decoder hidden state
            decoder_hidden = torch.zeros(decoder.n_layers, bs, decoder.hidden_size).to(device)

            # Teacher forcing: start from the first target token ([CLS], which the
            # tokenizer puts at the start of every sentence) and predict the rest
            decoder_input = output_sentences[:, 0].unsqueeze(1)
            for i in range(1, output_sentences.size(1)):
                output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_outputs)
                loss += criterion(output, output_sentences[:, i])
                decoder_input = output_sentences[:, i].unsqueeze(1)

            # Backpropagation and optimization
            loss.backward()
            encoder_optimizer.step()
            decoder_optimizer.step()
            train_loss.append(loss.item())
            bar.set_description(f"Epoch {epoch}/{epochs} loss {np.mean(train_loss):.5f}")

In [None]:
# Entrenar el modelo
fit(encoder, decoder, train_dataloader, epochs=10)

  0%|          | 0/1508 [00:00<?, ?it/s]

<built-in method size of Tensor object at 0x7f0be6757a10>





RuntimeError: ignored

## Training

Let's implement the training loop. First, since we now have two neural networks, we need two optimizers (one for the `encoder` and one for the `decoder`). We pass the sentence in the source language to the `encoder` and obtain its final hidden state. This hidden state is used to initialize the `decoder` which, together with the `<sos>` token, generates the first word of the translated sentence. We then repeat the process, feeding the decoder's previous output back in as input, until we obtain the `<eos>` token.

In [None]:
# from tqdm import tqdm
# import numpy as np

# def fit(encoder, decoder, dataloader, epochs=10):
#     encoder.to(device)
#     decoder.to(device)
#     encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=1e-3)
#     decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=1e-3)
#     criterion = torch.nn.CrossEntropyLoss()
#     for epoch in range(1, epochs+1):
#         encoder.train()
#         decoder.train()
#         train_loss = []
#         bar = tqdm(dataloader['train'])
#         for batch in bar:
#             input_sentences, output_sentences = batch
#             bs = input_sentences.shape[0]
#             loss = 0
#             encoder_optimizer.zero_grad()
#             decoder_optimizer.zero_grad()
#             # get the last hidden state of the encoder
#             encoder_outputs, hidden = encoder(input_sentences)
#             # compute the decoder outputs recurrently
#             decoder_input = torch.tensor([[output_lang.word2index['SOS']] for b in range(bs)], device=device)
#             for i in range(output_sentences.shape[1]):
#                 output, hidden, attn_weights = decoder(decoder_input, hidden, encoder_outputs)
#                 loss += criterion(output, output_sentences[:, i].view(bs))
#                 # the next input will be the predicted word
#                 decoder_input = torch.argmax(output, axis=1).view(bs, 1)
#             # optimization
#             loss.backward()
#             encoder_optimizer.step()
#             decoder_optimizer.step()
#             train_loss.append(loss.item())
#             bar.set_description(f"Epoch {epoch}/{epochs} loss {np.mean(train_loss):.5f}")

#         val_loss = []
#         encoder.eval()
#         decoder.eval()
#         with torch.no_grad():
#             bar = tqdm(dataloader['test'])
#             for batch in bar:
#                 input_sentences, output_sentences = batch
#                 bs = input_sentences.shape[0]
#                 loss = 0
#                 # get the last hidden state of the encoder
#                 encoder_outputs, hidden = encoder(input_sentences)
#                 # compute the decoder outputs recurrently
#                 decoder_input = torch.tensor([[output_lang.word2index['SOS']] for b in range(bs)], device=device)
#                 for i in range(output_sentences.shape[1]):
#                     output, hidden, attn_weights = decoder(decoder_input, hidden, encoder_outputs)
#                     loss += criterion(output, output_sentences[:, i].view(bs))
#                     # the next input will be the predicted word
#                     decoder_input = torch.argmax(output, axis=1).view(bs, 1)
#                 val_loss.append(loss.item())
#                 bar.set_description(f"Epoch {epoch}/{epochs} val_loss {np.mean(val_loss):.5f}")

In [None]:
# fit(encoder, decoder, dataloader, epochs=30)
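
The training loop can feed the decoder in two ways: with its own previous prediction, or with the ground-truth word from the reference translation (usually called teacher forcing). The sketch below, in plain Python, only shows which token would be fed at each step under each strategy. The helper `decoder_inputs` and the fixed list of predictions are assumptions for illustration; in a real model the predictions depend on the inputs, which this toy version deliberately ignores.

```python
def decoder_inputs(target, predictions, teacher_forcing):
    """Return the token fed to the decoder at each step.

    target: reference tokens, starting with the start-of-sequence token.
    predictions: what a (hypothetical) decoder predicted at each step.
    """
    inputs = [target[0]]  # every sequence starts from the start token
    for step in range(1, len(target) - 1):
        if teacher_forcing:
            inputs.append(target[step])           # feed the ground truth
        else:
            inputs.append(predictions[step - 1])  # feed the previous prediction
    return inputs

target = ["<sos>", "tengo", "mucha", "sed", "<eos>"]
# made-up predictions for the four decoding steps; the second one is wrong
predictions = ["tengo", "mucho", "sed", "<eos>"]

forced = decoder_inputs(target, predictions, teacher_forcing=True)
free = decoder_inputs(target, predictions, teacher_forcing=False)
print(forced)
print(free)
```

With teacher forcing a wrong prediction does not propagate to the next step, which tends to make early training more stable; feeding back predictions is closer to what happens at inference time.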

## Generating translations

Once our model is trained, we can use it to translate sentences from English to Spanish as follows.

In [None]:
# input_sentence, output_sentence = dataset['train'][10]
# input_lang.sentenceFromIndex(input_sentence.tolist()), output_lang.sentenceFromIndex(output_sentence.tolist())

In [None]:
# def predict(input_sentence):
#     # get the last hidden state of the encoder
#     encoder_outputs, hidden = encoder(input_sentence.unsqueeze(0))
#     # compute the decoder outputs recurrently
#     decoder_input = torch.tensor([[output_lang.word2index['SOS']]], device=device)
#     # iterate until the decoder returns the <eos> token
#     outputs = []
#     decoder_attentions = torch.zeros(MAX_LENGTH, MAX_LENGTH)
#     i = 0
#     while True:
#         output, hidden, attn_weights = decoder(decoder_input, hidden, encoder_outputs)
#         decoder_attentions[i] = attn_weights.data
#         i += 1
#         decoder_input = torch.argmax(output, axis=1).view(1, 1)
#         outputs.append(decoder_input.cpu().item())
#         if decoder_input.item() == output_lang.word2index['EOS']:
#             break
#     return output_lang.sentenceFromIndex(outputs), decoder_attentions

In [None]:
# output_words, attn = predict(input_sentence)
# output_words
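
Generation of this kind is a greedy loop: take the most likely token, feed it back in, and stop at the end-of-sequence token. A model that never emits that token would loop forever, so it is common to also cap the number of steps. The following plain-Python sketch shows that pattern; `greedy_decode` and the lookup table standing in for the decoder are hypothetical.

```python
def greedy_decode(step_fn, sos_token, eos_token, max_steps):
    """Feed the last predicted token back in until EOS or max_steps is reached."""
    token = sos_token
    outputs = []
    for _ in range(max_steps):
        token = step_fn(token)
        outputs.append(token)
        if token == eos_token:
            break
    return outputs

# hypothetical stand-in for the decoder: a fixed next-token table
next_token = {"SOS": "tengo", "tengo": "mucha", "mucha": "sed", "sed": ".", ".": "EOS"}

full = greedy_decode(next_token.get, "SOS", "EOS", max_steps=10)
capped = greedy_decode(next_token.get, "SOS", "EOS", max_steps=3)
print(full)
print(capped)
```

In the notebook's setting, a natural cap would be `MAX_LENGTH`, since the attention matrix is allocated with that many rows.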

## Visualizing attention

One advantage of the attention layer is that it lets us visualize which parts of the input the model focuses on when generating each word of the output, giving our model a degree of explainability (a property that is always desirable in our `Machine Learning` models).

In [None]:
# import matplotlib.pyplot as plt
# import matplotlib.ticker as ticker

# def showAttention(input_sentence, output_words, attentions):
#     lim1, lim2 = input_sentence.index('EOS')+1, output_words.index('EOS')+1
#     fig = plt.figure(dpi=100)
#     ax = fig.add_subplot(111)
#     cax = ax.matshow(attentions[:lim2, :lim1].numpy(), cmap='bone')
#     fig.colorbar(cax)
#     # Set up axes
#     ax.set_xticklabels([' '] + input_sentence[:lim1], rotation=90)
#     ax.set_yticklabels([' '] + output_words)
#     # Show label at every tick
#     ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
#     ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
#     plt.show()

In [None]:
# showAttention(input_lang.sentenceFromIndex(input_sentence.tolist()), output_words, attn)
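
A heatmap is not the only way to read an attention matrix. As a minimal, text-only sketch, the code below reports for each output word the input word that received the largest attention weight. The function `strongest_alignment` and the attention values are made up for illustration; they are not results from this model.

```python
def strongest_alignment(input_words, output_words, attentions):
    """For each output word, return the input word with the largest attention weight."""
    pairs = []
    for out_word, row in zip(output_words, attentions):
        best = max(range(len(input_words)), key=lambda j: row[j])
        pairs.append((out_word, input_words[best]))
    return pairs

input_words = ["i", "am", "thirsty", "EOS"]
output_words = ["tengo", "sed", "EOS"]
# made-up attention weights: one row per output word, one column per input word
attentions = [
    [0.6, 0.3, 0.1, 0.0],
    [0.1, 0.1, 0.7, 0.1],
    [0.0, 0.1, 0.1, 0.8],
]

alignment = strongest_alignment(input_words, output_words, attentions)
for out_word, in_word in alignment:
    print(f"{out_word} <- {in_word}")
```

This keeps only the peak of each row, so it loses the information a heatmap shows when attention is spread over several input words.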

## Summary

In this post we have seen how to introduce attention mechanisms into our `encoder-decoder` architecture, which allow the neural network to focus on specific parts of the *inputs* when generating the *outputs*. This new layer can not only improve our models but is also interpretable, giving us an idea of the reasoning behind our model's predictions. The best-performing neural networks for `NLP` tasks today, `transformers`, are based entirely on this kind of attention layer.