## Exercise: Language Model with Self-Attention

This exercise is similar to the one from the previous class, but we will now train a neural network *with self-attention* to predict the next word of a text, given the preceding words as input.

The self-attention layer must implement (see slide 34):
- Position embeddings
- Linear projections (WQ, WK, WV, WO)
- Feed-forward layer (2-layer MLP)

Instructions:
- Two implementations of the self-attention layer are required: one using loops (inefficient, but easy to understand) and one using matrix operations (efficient, but harder to understand). Use slide 36 as a reference.

- Add an assert to guarantee that the results of the two implementations are exactly equal.

- For training, use only the matrix implementation.
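
The equality check described above can be sketched on a toy example. This is a minimal illustration, not the notebook's implementation: the sizes `L` and `D` and the plain weight matrices `Wq`, `Wk`, `Wv` are assumptions, and the comparison uses a small floating-point tolerance because the loop and matrix versions may accumulate rounding differently.

```python
import torch

torch.manual_seed(0)
L, D = 4, 3                      # hypothetical sequence length and embedding size
x = torch.randn(L, D)            # one sequence of L token embeddings
Wq, Wk, Wv = (torch.randn(D, D) for _ in range(3))

# Loop version: process one query at a time
rows = []
for i in range(L):
    q = x[i] @ Wq
    scores = torch.stack([torch.dot(q, x[j] @ Wk) for j in range(L)])
    probs = scores.softmax(dim=-1)
    rows.append(sum(p * (x[j] @ Wv) for j, p in enumerate(probs)))
out_loop = torch.stack(rows)

# Matrix version: process all queries at once
Q, K, V = x @ Wq, x @ Wk, x @ Wv
out_matrix = torch.softmax(Q @ K.T, dim=-1) @ V

# Raises an AssertionError describing the mismatch if the two disagree
torch.testing.assert_close(out_loop, out_matrix, rtol=1e-4, atol=1e-5)
```

A tolerance-based check such as `torch.testing.assert_close` or `torch.allclose` is usually preferable to `torch.equal` here, since bitwise equality between the two code paths is not guaranteed.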

In [None]:
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
import sklearn
import torch.nn.functional as F
import numpy as np


## Parameters

In [None]:
context_size = 8
embedding_dim = 64
hidden = 500
batch_size = 128

## Download and load the dataset

In [None]:
with open("pg67724.txt", "r") as f:
    text1 = f.read()
text1_pt = text1.split("\n\n")[35:2675]

with open("pg67725.txt", "r") as f:
    text2 = f.read()
text2_pt = text2.split("\n\n")[32:2191]

paragraphs = text1_pt + text2_pt

cleaned_paragraphs = [paragraph.replace("\n", " ") for paragraph in paragraphs if paragraph.strip()]

len(paragraphs), len(cleaned_paragraphs)

(4799, 4740)

## Dataset analysis

In [None]:
# Count the words in the dataset
from collections import Counter
import re

def count_words(texts):
    word_counts = Counter()
    for text in texts:
        word_counts.update(re.findall(r'\w+', text.lower()))
    return word_counts

word_counts = count_words(cleaned_paragraphs)

len(word_counts)

11837

## Building a vocabulary

In [None]:
vocab_size = 3000

UNK = '<UNK>'
most_frequent_words = [UNK] + [word for word, count in word_counts.most_common(vocab_size)]
vocab = {word: i for i, word in enumerate(most_frequent_words)}
vocab_size = len(vocab)
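
The vocabulary construction above can be illustrated on toy counts. This is a sketch with made-up words and a hypothetical `toy_vocab_size`; it shows why the final vocabulary has one entry more than the requested size (the extra entry is `<UNK>` at index 0).

```python
from collections import Counter

toy_counts = Counter({"de": 5, "que": 3, "casa": 2, "raro": 1})
toy_vocab_size = 2  # hypothetical small value for illustration

# Index 0 is reserved for <UNK>; the most frequent words follow
toy_words = ["<UNK>"] + [w for w, _ in toy_counts.most_common(toy_vocab_size)]
toy_vocab = {w: i for i, w in enumerate(toy_words)}
print(toy_vocab)  # {'<UNK>': 0, 'de': 1, 'que': 2}
```

By the same logic, requesting 3000 words yields a vocabulary of 3001 entries.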

In [None]:
import re

# Function to encode a sentence into a list of indices based on a vocabulary
def encode_sentence(sentence, vocab):
    # Tokenize the sentence into words and punctuation marks
    tokens = re.findall(r'\w+|[.,!?-]', sentence.lower())
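    # Encode each token using the vocabulary; tokens missing from it map to index 0 (<UNK>).
    # Note: the vocabulary above is built from r'\w+' matches only, so punctuation
    # tokens are never in it and are also encoded as 0.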
    encoded_sentence = [vocab.get(word, 0) for word in tokens]
    return encoded_sentence

# Function to decode a list of indices into a sentence using a vocabulary
def decode_sentence(encoded_sentence, vocab):
    words = []
    # Iterate through each index in the encoded sentence
    for index in encoded_sentence:
        # Find the corresponding word in the vocabulary for the index
        # If the index is not found in the vocabulary, replace it with "<UNK>"
        word = next((word for word, code in vocab.items() if code == index), "<UNK>")
        words.append(word)
    return words
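
`decode_sentence` scans the whole vocabulary for every index. A possible alternative, sketched below with hypothetical helper names (`build_inverse_vocab`, `decode_with_inverse`) and a toy vocabulary, is to build the reverse mapping once and use a dictionary lookup per index.

```python
def build_inverse_vocab(vocab):
    # Hypothetical helper: map each index back to its word
    return {index: word for word, index in vocab.items()}

def decode_with_inverse(encoded_sentence, inverse_vocab, unk="<UNK>"):
    # One dictionary lookup per index instead of scanning the whole vocabulary
    return [inverse_vocab.get(index, unk) for index in encoded_sentence]

toy_vocab = {"<UNK>": 0, "a": 1, "casa": 2}
inverse = build_inverse_vocab(toy_vocab)
decoded = decode_with_inverse([2, 1, 99], inverse)
print(decoded)  # ['casa', 'a', '<UNK>']
```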

## Dataset class

In [None]:
def gera_input_target(text, context_size):
    # Initialize lists to store contexts and targets.
    contexts = []
    targets = []

    # Iterate over the text to generate contexts and corresponding targets.
    for i in range(len(text) - context_size):
        # Extract the context of size 'context_size' starting from index 'i'.
        context = text[i: i + context_size]
        # Retrieve the target element immediately following the context.
        target = text[i + context_size]
        # Append the context and target to their respective lists.
        contexts.append(context)
        targets.append(target)

    # Return the lists of contexts and targets.
    return contexts, targets
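
To make the windowing concrete, here is a small sketch on a toy token list. `sliding_windows` is a hypothetical, compact rewrite of the same logic as `gera_input_target`, used only so the example is self-contained.

```python
def sliding_windows(tokens, context_size):
    # Same windowing logic as gera_input_target, written compactly
    n = len(tokens) - context_size
    contexts = [tokens[i:i + context_size] for i in range(n)]
    targets = [tokens[i + context_size] for i in range(n)]
    return contexts, targets

contexts, targets = sliding_windows([10, 11, 12, 13, 14], context_size=3)
print(contexts)  # [[10, 11, 12], [11, 12, 13]]
print(targets)   # [13, 14]
```

A sequence of `n` tokens yields `n - context_size` examples, so a sentence with exactly `context_size` tokens produces none.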


In [None]:
class MyDataset(Dataset):
    def __init__(self, text, vocab, context_size):
        # Initialize the dataset with the provided text, vocabulary, and context size.
        # Encode each sentence in the text using the provided vocabulary.
        self.vocab = vocab
        self.context_size = context_size
        self.data = [encode_sentence(sentence, self.vocab) for sentence in text]

        # Initialize lists to store contexts and targets.
        contexts_list = []
        targets_list = []

        # Iterate over the encoded data to generate inputs and targets.
        for coded in self.data:
            # Skip sentences shorter than the context size.
            if len(coded) < self.context_size:
                continue

            # Check if any token in the encoded sentence is unknown (UNK).
            if any(token == self.vocab[UNK] for token in coded):
                continue  # Skip this example if it contains unknown tokens.

            # Generate inputs and targets using the current sentence's encoded representation.
            inputs, targets = gera_input_target(coded, context_size)
            contexts_list.extend(inputs)
            targets_list.extend(targets)

        # Convert the lists of contexts and targets into tensors.
        self.contexts_tensor = torch.tensor(contexts_list)
        self.targets_tensor = torch.tensor(targets_list)

    def __len__(self):
        # Return the total number of samples in the dataset.
        return len(self.targets_tensor)

    def __getitem__(self, idx):
        # Retrieve and return the context and target tensors for the given index.
        return self.contexts_tensor[idx], self.targets_tensor[idx]


In [None]:
from sklearn.model_selection import train_test_split

train_text, test_text = train_test_split(cleaned_paragraphs, test_size=0.2, random_state=18)

In [None]:
# Build the training and test datasets
train_dataset = MyDataset(train_text, vocab, context_size)
test_dataset = MyDataset(test_text, vocab, context_size)

In [None]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
sample = next(iter(train_loader))

## Model

In [None]:
# Attention layer, loop-based implementation

def attention_loop(seq, WQ, WK, WV, WO):
    E = []
    for q in seq:
        q = WQ(q)  # Apply WQ to the query vector
        scores = []
        for k in seq:
            k = WK(k)  # Apply WK to the key vector
            score = torch.dot(q, k)  # Dot product of the query and key vectors
            scores.append(score)
        scores_tensor = torch.stack(scores)  # Stack scores into a 1-D tensor (keeps the autograd graph)
        probs = scores_tensor.softmax(dim=-1)  # Normalize scores using softmax

        new_embedding = 0
        for v, p in zip(seq, probs):
            v = WV(v)  # Apply WV to the value vector
            new_embedding += v * p  # Weighted sum of values using probabilities

        new_embedding = WO(new_embedding)  # Apply WO to the final embedding
        E.append(new_embedding)

    return torch.stack(E)  # Stack all embeddings into a tensor


In [None]:
# Attention layer, matrix-based implementation

def attention_matrix(Q, K, V, linearWO):
    scores = torch.matmul(Q, K.transpose(-2, -1))  # shape = B,L,L
    probs = F.softmax(scores, dim=-1)  # B,L,L
    E = torch.matmul(probs, V)  # shape = B,L,D
    return linearWO(E)
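
The shape comments in the matrix version can be checked with a short sketch. The sizes `B`, `L`, `D` and the freshly initialized `nn.Linear` layers below are assumptions chosen for illustration; the operations mirror those in `attention_matrix`.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

B, L, D = 2, 8, 64               # hypothetical batch, sequence and embedding sizes
x = torch.randn(B, L, D)
WQ, WK, WV, WO = (nn.Linear(D, D) for _ in range(4))

Q, K, V = WQ(x), WK(x), WV(x)
scores = torch.matmul(Q, K.transpose(-2, -1))   # (B, L, L): one score per query/key pair
probs = F.softmax(scores, dim=-1)               # each row is a distribution over the keys
out = WO(torch.matmul(probs, V))                # (B, L, D)

print(scores.shape, out.shape)
```

Because `transpose(-2, -1)` and `softmax(dim=-1)` act on the trailing dimensions, the same code also works for a single unbatched sequence of shape `(L, D)`.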

In [None]:
# Testing the functions

# Define the dimensions for testing
test_embedding_dim = 5
test_vocab_size = 5

# Create an embedding layer
embedding = nn.Embedding(test_vocab_size, test_embedding_dim)

# Initialize linear transformations for query, key, value, and output
WQ = nn.Linear(test_embedding_dim, test_embedding_dim)
WK = nn.Linear(test_embedding_dim, test_embedding_dim)
WV = nn.Linear(test_embedding_dim, test_embedding_dim)
WO = nn.Linear(test_embedding_dim, test_vocab_size)

# Generate test input
test_input = torch.tensor([0, 1, 2, 3, 4])
test_embedded = embedding(test_input)

# Compute attention using loop-based function
print("\nLoop function")
output_loop = attention_loop(test_embedded, WQ, WK, WV, WO)
print(output_loop)

# Compute attention using matrix-based function
print("\nMatrix function")
Q = WQ(test_embedded)
K = WK(test_embedded)
V = WV(test_embedded)
output_matricial = attention_matrix(Q, K, V, WO)
print(output_matricial)

# Compare the results of the two functions (up to floating-point tolerance)
are_equal = torch.allclose(output_matricial, output_loop)
assert are_equal, "Loop and matrix implementations disagree"
print("\nThe outputs of the two functions are equal:", are_equal)



Loop function
tensor([[ 0.1430,  0.1908,  0.5903, -0.2245, -0.0678],
        [ 0.1358,  0.1837,  0.5331, -0.2078, -0.0862],
        [ 0.1371,  0.2015,  0.5825, -0.2111, -0.0718],
        [ 0.1236,  0.2443,  0.5987, -0.1784, -0.0689],
        [ 0.1186,  0.1965,  0.5186, -0.1874, -0.1002]],
       grad_fn=<StackBackward0>)

Matrix function
tensor([[ 0.1430,  0.1908,  0.5903, -0.2245, -0.0678],
        [ 0.1358,  0.1837,  0.5331, -0.2078, -0.0862],
        [ 0.1371,  0.2015,  0.5825, -0.2111, -0.0718],
        [ 0.1236,  0.2443,  0.5987, -0.1784, -0.0689],
        [ 0.1186,  0.1965,  0.5186, -0.1874, -0.1002]],
       grad_fn=<AddmmBackward0>)

The outputs of the two functions are equal: True


In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, context_size, embedding_dim):
        super().__init__()
        # Build the sinusoidal positional encoding matrix (context_size x embedding_dim)
        pe = torch.zeros(context_size, embedding_dim)
        position = torch.arange(0, context_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-np.log(10000.0) / embedding_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Register as a buffer (not a parameter) so it moves with the module across devices
        self.register_buffer("pe", pe.unsqueeze(0))  # Add batch dimension

    def forward(self, x):
        # x has shape (batch, seq_len, embedding_dim) or (seq_len, embedding_dim)
        seq_len = x.size(-2)
        # Return input embeddings with positional encodings added
        return x + self.pe[:, :seq_len, :]


class AttentionModel_Matrix(nn.Module):
    def __init__(
        self,
        vocab_size,      # Vocabulary size (number of distinct token indices)
        context_size,    # Context window size
        embedding_dim,   # Dimensionality of word embeddings
        hidden           # Dimensionality of hidden layer in MLP
    ):
        super().__init__()  # Call the constructor of the parent class

        # Define embedding layer and positional encoding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_encoding = PositionalEncoding(context_size, embedding_dim)

        # Linear transformations for Q, K, V, and output
        self.linearWQ = nn.Linear(embedding_dim, embedding_dim)
        self.linearWK = nn.Linear(embedding_dim, embedding_dim)
        self.linearWV = nn.Linear(embedding_dim, embedding_dim)
        self.linearWO = nn.Linear(embedding_dim, embedding_dim)

        # Fully connected layers for classification
        self.fc1 = nn.Linear(context_size * embedding_dim, hidden)
        self.fc2 = nn.Linear(hidden, vocab_size)

    def self_attention_layer(self, q, k, v, wo):
        # Compute self-attention scores
        scores = torch.matmul(q, k.transpose(-2, -1)) # shape = B,L,L
        probs = F.softmax(scores, dim=-1)  # Apply softmax to obtain attention weights
        e = torch.matmul(probs, v)  # Compute weighted sum of values (V)
        return wo(e)  # Apply linear transformation to the weighted sum

    def forward(self, x):
        # Embed input tokens and add positional encodings
        embedding_input = self.embedding(x)
        embedding_input = self.pos_encoding(embedding_input)

        # Linear transformations for Q, K, and V
        Q = self.linearWQ(embedding_input)
        K = self.linearWK(embedding_input)
        V = self.linearWV(embedding_input)

        # Compute self-attention
        E = self.self_attention_layer(Q, K, V, self.linearWO)

        # Flatten (context_size, embedding_dim) into one vector per example
        # for the fully connected layers; works with or without a batch dimension
        E = E.flatten(start_dim=-2)

        # Apply fully connected layers
        o = self.fc1(E)
        o = F.relu(o)
        logits = self.fc2(o)

        return logits

# Create an instance of the AttentionModel_Matrix class
model_matrix = AttentionModel_Matrix(vocab_size, context_size, embedding_dim, hidden)
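
The sinusoidal table used by `PositionalEncoding` can be inspected in isolation. The function `sinusoidal_pe` below is a hypothetical standalone copy of that computation, written only so the sketch runs on its own.

```python
import math
import torch

def sinusoidal_pe(context_size, embedding_dim):
    # Hypothetical standalone version of the table built in PositionalEncoding
    pe = torch.zeros(context_size, embedding_dim)
    position = torch.arange(context_size, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim)
    )
    pe[:, 0::2] = torch.sin(position * div_term)  # even columns
    pe[:, 1::2] = torch.cos(position * div_term)  # odd columns
    return pe

pe = sinusoidal_pe(8, 64)
print(pe.shape)    # torch.Size([8, 64])
print(pe[0, :4])   # position 0: sin(0) = 0 and cos(0) = 1 alternate
```

The table is fixed rather than learned, which is why it adds no entries to the list of trainable parameters.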


In [None]:
# Inspect the trainable parameters
parametros_treinaveis = list(model_matrix.parameters())

# Print the number of trainable parameter tensors and their shapes
print(f'Number of trainable parameter tensors: {len(parametros_treinaveis)}')
print("Trainable parameters:")
for parametro in parametros_treinaveis:
    print(parametro.shape)

# Sum the number of elements over all parameter tensors (weights and biases)
total_parametros = sum(parametro.numel() for parametro in parametros_treinaveis)

# Print the total number of parameters
print(f'Total number of parameters: {total_parametros}')

Number of trainable parameter tensors: 13
Trainable parameters:
torch.Size([3001, 64])
torch.Size([64, 64])
torch.Size([64])
torch.Size([64, 64])
torch.Size([64])
torch.Size([64, 64])
torch.Size([64])
torch.Size([64, 64])
torch.Size([64])
torch.Size([500, 512])
torch.Size([500])
torch.Size([3001, 500])
torch.Size([3001])
Total number of parameters: 1968705
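
The reported total can be reproduced by hand from the layer shapes listed above. The sketch below only redoes that arithmetic; the variable names are chosen for illustration.

```python
vocab_size, embedding_dim, context_size, hidden = 3001, 64, 8, 500

embedding = vocab_size * embedding_dim                              # 192064
projections = 4 * (embedding_dim * embedding_dim + embedding_dim)   # 16640 (WQ, WK, WV, WO with biases)
fc1 = (context_size * embedding_dim) * hidden + hidden              # 256500
fc2 = hidden * vocab_size + vocab_size                              # 1503501

total = embedding + projections + fc1 + fc2
print(total)  # 1968705
```

Most of the parameters sit in the output layer `fc2`, whose size grows with the vocabulary.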


In [None]:
# Network layers
print(model_matrix)

AttentionModel_Matrix(
  (embedding): Embedding(3001, 64)
  (pos_encoding): PositionalEncoding()
  (linearWQ): Linear(in_features=64, out_features=64, bias=True)
  (linearWK): Linear(in_features=64, out_features=64, bias=True)
  (linearWV): Linear(in_features=64, out_features=64, bias=True)
  (linearWO): Linear(in_features=64, out_features=64, bias=True)
  (fc1): Linear(in_features=512, out_features=500, bias=True)
  (fc2): Linear(in_features=500, out_features=3001, bias=True)
)


In [None]:
sample = next(iter(train_loader))
input = sample[0]
target = sample[1]

In [None]:
output = model_matrix(input)

In [None]:
output.argmax(dim=1)

tensor([2844,  288,  419,  419, 1962, 1032,  288, 1962,  419,  218,  626, 1947,
         662,  419])

In [None]:
target.shape

torch.Size([14])

## Training

In [None]:
# Check whether a GPU is available and use it if so; otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_matrix.parameters(), lr=0.1)
model_matrix.to(device)

# Calculate loss before training
model_matrix.eval()  # Set the model to evaluation mode
initial_loss = 0
with torch.no_grad():
    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass
        logits = model_matrix(inputs)
        initial_loss += criterion(logits, targets)

    avg_loss = initial_loss / len(train_loader)

initial_PPL = torch.exp(avg_loss)
print(f'Initial Loss: {avg_loss:.4f}, Initial Perplexity: {initial_PPL}')

# Training loop
num_epochs = 15
for epoch in range(num_epochs):
    model_matrix.train()
    running_loss = 0
    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass
        logits = model_matrix(inputs)
        loss_train = criterion(logits, targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss_train.backward()
        optimizer.step()

        running_loss += loss_train.item()

    # Average training loss over all batches of the epoch
    train_loss = running_loss / len(train_loader)
    ppl_train = torch.exp(torch.tensor(train_loss))

    # Evaluate on the whole test set (average of the per-batch losses)
    model_matrix.eval()
    running_test_loss = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model_matrix(inputs)
            running_test_loss += criterion(logits, targets).item()

    test_loss = running_test_loss / len(test_loader)
    ppl_test = torch.exp(torch.tensor(test_loss))

    print(f'Epoch [{epoch+1}/{num_epochs}], '
          f'Train Loss: {train_loss:.4f}, '
          f'Train PPL: {ppl_train.item():.4f}, '
          f'Test Loss: {test_loss:.4f}, '
          f'Test PPL: {ppl_test.item():.4f}')

Initial Loss: 8.0027, Initial Perplexity: 2989.056884765625
Epoch [1/15], Train Loss: 8.0027, Train PPL: 2989.0513, Test Loss: 7.8811, Test PPL: 2646.8284
Epoch [2/15], Train Loss: 7.7705, Train PPL: 2369.6553, Test Loss: 7.8080, Test PPL: 2460.2358
Epoch [3/15], Train Loss: 7.5043, Train PPL: 1815.8245, Test Loss: 7.6810, Test PPL: 2166.7964
Epoch [4/15], Train Loss: 7.0945, Train PPL: 1205.3552, Test Loss: 7.3799, Test PPL: 1603.4224
Epoch [5/15], Train Loss: 6.2855, Train PPL: 536.7534, Test Loss: 6.5350, Test PPL: 688.8222
Epoch [6/15], Train Loss: 4.2119, Train PPL: 67.4825, Test Loss: 6.7213, Test PPL: 829
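
As a sanity check on the initial numbers: perplexity is the exponential of the cross-entropy loss, and a model that predicts all 3001 vocabulary entries with equal probability has loss ln(3001). The sketch below computes that baseline; the reported initial loss of 8.0027 is close to it, which is what one would expect from an untrained model.

```python
import math

vocab_size = 3001                      # size reported for the embedding layer above
uniform_loss = math.log(vocab_size)    # cross-entropy of a uniform prediction
uniform_ppl = math.exp(uniform_loss)   # perplexity is exp(loss)

print(f"{uniform_loss:.4f}")  # 8.0067
print(f"{uniform_ppl:.1f}")   # 3001.0
```

A perplexity well below the vocabulary size therefore indicates that the model has learned something beyond uniform guessing.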

## Usage example

In [None]:
import random

def generate_text(model: AttentionModel_Matrix, length: int, vocab: dict, context_size: int):

    # Ensure that the requested length is greater than the context size
    assert length > context_size

    # Initialize the sentence with random words from the vocabulary
    sentence = random.sample(range(0, vocab_size), context_size)

    with torch.no_grad():
        while len(sentence) < length:
            x = torch.unsqueeze(torch.asarray(sentence[-context_size:]), dim=0).to(device)
            y = model(x).squeeze()
            y = nn.Softmax(dim=0)(y)

            # Choose the next word according to the predicted probabilities
            y = random.choices(range(0, vocab_size), weights=y.tolist())
            sentence.append(y[0])

    return ' '.join(decode_sentence(sentence, vocab))


max_length= 10
generate_text(model_matrix, max_length, vocab, context_size)

'dahi hirtos velhos perdia pequeno resignação viii outr murmurio amigo'
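
Sampling the next word in proportion to its predicted probability can also be done directly in PyTorch. The helper `sample_next_token` and the toy logits below are hypothetical; this is a sketch of one alternative to `random.choices`, not the notebook's method.

```python
import torch

def sample_next_token(logits):
    # Hypothetical helper: logits is a 1-D tensor of unnormalized scores
    probs = torch.softmax(logits, dim=0)
    # torch.multinomial draws indices in proportion to the given probabilities
    return torch.multinomial(probs, num_samples=1).item()

torch.manual_seed(0)
toy_logits = torch.tensor([0.0, 0.0, 10.0])   # index 2 is by far the most likely
samples = [sample_next_token(toy_logits) for _ in range(5)]
print(samples)
```

Keeping the sampling on the tensor avoids converting the full probability vector to a Python list at every step.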

## References

César Bastos da Silva

Ramon Simões Abilio