In [None]:
# %%capture
# !pip install datasets transformers

In [None]:
from typing import List, Optional
from tqdm import tqdm

# Transformer from scratch

In the next two sessions, we are going to code a Transformer model from scratch. In this first session, we focus on attention for text classification. The goal is to develop modular code, increasing the difficulty step by step.

# Tokenizer

Let's start with the data. The goal is to build a clean interface to the dataset.

🚧 **Question** 🚧 What are the main steps ? 


## Tokenizer 

Build a tokenizer (similar to the one from the first homework).

In [None]:
class WhiteSpaceTokenizer:
    """Word-level tokenizer that splits text on whitespace.

    The vocabulary is learned with `fit`. Each kept word gets a non-negative
    id; every other word is mapped to the reserved unknown id (`UNK_ID`).

    Args:
        max_voc: if given, only the `max_voc` most frequent words are kept.
    """

    # Reserved token / id used for every word that is not in the vocabulary.
    UNK_TOKEN = "<UNK>"
    UNK_ID = -1

    def __init__(self, max_voc: Optional[int] = None):
        # The vocabulary will store the mapping between text tokens and their id.
        self.vocab = {}
        self.id_to_token = {}

        # We will keep track of the number of times a word appears in the corpus.
        self.frequencies = {}
        self.max_voc = max_voc

        # Number of real words in the vocabulary (unknown token excluded).
        self.num_words = 0

        # Register the unknown token right away so that `decode(encode(text))`
        # also works on a tokenizer that has not been fitted yet.
        self._register_unknown_token()

    def _register_unknown_token(self) -> None:
        """Adds the unknown token to both lookup tables."""
        self.vocab[self.UNK_TOKEN] = self.UNK_ID
        self.id_to_token[self.UNK_ID] = self.UNK_TOKEN

    def split_text(self, text: str) -> List[str]:
        """Converts the text to a list of tokens (substrings).

        Splits on any run of whitespace, so repeated, leading or trailing
        spaces never produce empty-string tokens and an empty text gives `[]`.
        """
        return text.split()

    def encode(self, text: str) -> List[int]:
        """Take a text as input and return its associated tokenization, as a list of ids.

        Words absent from the vocabulary are encoded as `UNK_ID`.
        """
        return [self.vocab.get(token, self.UNK_ID) for token in self.split_text(text)]

    def decode(self, ids: List[int]) -> str:
        """Converts a list of ids back to a space-separated text.

        Raises:
            KeyError: if an id was never assigned by this tokenizer.
        """
        return " ".join(self.id_to_token[i] for i in ids)

    def fit(self, corpus: List[str]):
        """Fits the tokenizer to a list of texts to construct its vocabulary.

        Counts accumulate on top of `self.frequencies`, which is cut down to
        the `max_voc` most frequent words when `max_voc` is set. Ids follow
        the order of `self.frequencies` (most frequent first when truncated,
        first-seen order otherwise).
        """
        for text in tqdm(corpus):
            for token in self.split_text(text):
                self.frequencies[token] = self.frequencies.get(token, 0) + 1

        # Keep only most common words
        if self.max_voc is not None:
            ranked = sorted(
                self.frequencies.items(), key=lambda item: item[1], reverse=True
            )
            self.frequencies = dict(ranked[: self.max_voc])

        # Rebuild both tables from scratch: when `fit` is called again, words
        # that dropped out of the kept set must not keep an outdated id that
        # now belongs to another word.
        self.vocab.clear()
        self.id_to_token.clear()
        for i, token in enumerate(self.frequencies):
            self.vocab[token] = i
            self.id_to_token[i] = token
        self.num_words = len(self.vocab)

        # Add a special token for unknown words
        self._register_unknown_token()
        print(f"Built a vocabulary of {self.num_words} words.")

## Dataset

We will use the AGNews dataset. It is a dataset with 4 classes: World, Sports, Business, Sci/Tech. We download it from HuggingFace Hub [here](https://huggingface.co/datasets/fancyzhx/ag_news).

In [None]:
from datasets import load_dataset
import unicodedata
import re

# Load the AG News dataset from the HuggingFace Hub and print the loaded object.
dataset = load_dataset("fancyzhx/ag_news")
print(dataset)


def preprocess_text(text: str) -> str:
    """Normalizes a raw text before tokenization.

    Steps, in order: lower-casing, accent stripping (NFD decomposition then
    dropping every non-ASCII character), replacing each character outside
    `a-z0-9` by a space, replacing each run of digits by `<NUM>`, and finally
    trimming the text and collapsing repeated spaces.

    Args:
        text: the raw text.

    Returns:
        The cleaned text.
    """
    lowered = text.lower()

    # Decompose accented characters so that the ASCII encoding keeps the base letter.
    decomposed = unicodedata.normalize("NFD", lowered)
    ascii_only = decomposed.encode("ascii", "ignore").decode()

    # Punctuation and symbols become separators.
    alnum_only = re.sub(r"[^a-z0-9]", " ", ascii_only)

    # All numbers share a single placeholder token.
    with_num_token = re.sub(r"\d+", "<NUM>", alnum_only)

    return re.sub(" +", " ", with_num_token.strip())


# Clean the dataset
# NOTE: this rebinds `dataset` to its cleaned version. `preprocess_text` is not
# idempotent (a second pass lower-cases "<NUM>" and strips its angle brackets),
# so re-running this cell without reloading the data alters the number tokens.
dataset = dataset.map(lambda x: {"text": preprocess_text(x["text"])})

🚧 **TODO** 🚧

Train the tokenizer on the AGNews train set.

In [None]:
# Fit the tokenizer on the cleaned training texts, keeping the 10000 most frequent words.
tokenizer = WhiteSpaceTokenizer(max_voc=10000)
tokenizer.fit(dataset["train"]["text"])
# Show only a preview: printing the whole mapping would flood the cell output.
print(list(tokenizer.vocab.items())[:20])

# Attention

In this part, we will build an attention module. We will begin with class attention, as seen during the course, to get a good intuition of how attention works.

🔴 **WARNING** 🔴 **You are not allowed to use any existing pre-built torch module, like `nn.Linear`.**

🚧 **TODO** 🚧

Propose a chain of operations that should be applied to the input text, from the input to the output.
When it applies, write the dimension of the expected tensors.



## Attention model

The roadmap is to write our own modules for each part of the transformer block. For this we can inherit from the torch class `nn.Module`. Since we don't want to use already existing layers, we will rely on tensors and more importantly on `nn.Parameter`. 

🚧 **Question** 🚧
Why use `nn.Parameter`, and how does it differ from a plain `Tensor`?


### Embeddings


🚧 **TODO** 🚧

Write a class named "Embeddings" that will take as input a list of ids and return associated vectors.

In [None]:
from torch import nn
import torch


class Embeddings(nn.Module):
    # Exercise stub (to be completed): per the instructions above, this module
    # takes token ids as input and returns the associated vectors. Pre-built
    # torch layers are not allowed in this notebook, so the lookup table is
    # expected to be built on `nn.Parameter`.
    # NOTE(review): WhiteSpaceTokenizer encodes unknown words as id -1 — confirm
    # how the lookup should handle that id.
    def __init__(self, vocab_size, d):
        # TODO

### Attention

Here we will write a 'ClassAttention' class that will take as input a list of vectors and return a list of probabilities over classes. The classes should be contextualized with the input vectors, using attention.

🚧 **TODO** 🚧

Write a class named "ClassAttention" that will take as input a list of vectors and return a list of probabilities over classes.

💡 HINT: Split the architecture into modular blocks.

In [None]:
class Linear(nn.Module):
    # Exercise stub (to be completed): hand-written replacement for `nn.Linear`,
    # which this notebook forbids using.
    # Presumably maps the last dimension from `d_in` to `d_out`, with an optional
    # bias controlled by `add_bias` — TODO confirm once implemented.
    def __init__(self, d_in, d_out, add_bias=True):
        # TODO



class Attention(nn.Module):
    # Exercise stub (to be completed): attention block for vectors of size `d`.
    # NOTE(review): the masked variant defined later in this notebook is called
    # with (class_vectors, word_vectors, attention_mask); presumably this one
    # takes the first two arguments only — confirm once implemented.
    def __init__(self, d):
        # TODO


class ClassAttention(nn.Module):
    # Exercise stub (to be completed): full classifier, from token ids to one
    # output per class.
    # The `MaskedClassAttention` subclass defined later in this notebook reads
    # the attributes `word_embeddings`, `class_embeddings` (on which it calls
    # `.unsqueeze(0)`), `attention` and `class_projection`, so `__init__` has to
    # create them under these exact names.
    def __init__(self, d, n_classes, vocab_size):
        # TODO

In [None]:
# Try the model on a simple input:
# a batch of 2 sequences of 10 random token ids drawn from [0, 10000).
model = ClassAttention(d=50, n_classes=4, vocab_size=10000)
x = torch.randint(0, 10000, (2, 10))
output = model(x)
# presumably (2, 4), i.e. one value per class for each sequence — TODO confirm
print(output.shape)

## Training

For training we need to iterate over the dataset.

### Data preparation

Here, we will assume that all texts have the same length, using truncation and discarding examples.

🚧 **Question** 🚧

Why do we need to have inputs of the same length?

**Answer**



In [None]:
def tokenize_truncate_and_discard(texts_list, labels_list, tokenizer, length=50):
    """Encodes texts and brings them all to exactly `length` tokens.

    Texts with fewer than `length` tokens are dropped together with their
    label; the others are cut to their first `length` tokens.

    Args:
        texts_list: iterable of raw texts.
        labels_list: iterable of labels, aligned with `texts_list`.
        tokenizer: object exposing `encode(text)`, returning a list of ids.
        length: number of tokens kept per text.

    Returns:
        A pair `(token_id_lists, labels)` containing only the kept examples.
    """
    kept_texts, kept_labels = [], []
    for raw_text, label in zip(texts_list, labels_list):
        token_ids = tokenizer.encode(raw_text)
        if len(token_ids) >= length:
            kept_texts.append(token_ids[:length])
            kept_labels.append(label)

    return kept_texts, kept_labels


# Encode both splits, keeping only texts that reach the default length.
train_texts, train_labels = tokenize_truncate_and_discard(
    dataset["train"]["text"], dataset["train"]["label"], tokenizer
)
test_texts, test_labels = tokenize_truncate_and_discard(
    dataset["test"]["text"], dataset["test"]["label"], tokenizer
)

# Each example is a (token_ids, label) tuple.
train_dataset = list(zip(train_texts, train_labels))
valid_dataset = list(zip(test_texts, test_labels))

print("Size before truncating:", len(dataset["train"]["text"]))
print("Size after truncating:", len(train_texts))

Now we need to make batches of examples. We will use the DataLoader class from PyTorch.

🚧 **TODO** 🚧

Load the data into batches. One batch should be a dictionary with the following keys:
- `"input_ids"`: tensor of size (batch_size, L)
- `"labels"`: tensor of size (batch_size,)

In [None]:
from torch.utils.data import DataLoader


class DataCollator:
    # Exercise stub (to be completed): callable that turns a list of dataset
    # examples into a single batch.
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def __call__(self, batch):
        # `batch` is a list of tuples (text, label)
        # In `train_dataset` / `valid_dataset`, `text` is already a list of token
        # ids truncated to a fixed length (see `tokenize_truncate_and_discard`).
        # Expected result, per the instructions above: a dict with the keys
        # "input_ids" (tensor of size (batch_size, L)) and "labels" (tensor of
        # size (batch_size,)).
        # TODO


data_collator = DataCollator(tokenizer)

batch_size = 32

# To be completed: the test cell below expects `train_dataloader` to yield dict
# batches whose tensors have `batch_size` rows.
train_dataloader = # TODO
valid_dataloader = # TODO
# Number of examples in each split.
n_valid = len(valid_dataset)
n_train = len(train_dataset)

🔴 **TEST**

In [None]:
# Fetch one batch and check its structure.
batch = next(iter(train_dataloader))
print(batch)
assert isinstance(batch, dict)
# Both keys must be present ...
assert all(key in batch for key in ("input_ids", "labels"))
# ... hold tensors ...
assert all(isinstance(batch[key], torch.Tensor) for key in ("input_ids", "labels"))
# ... with one row per example of the batch.
assert all(batch[key].shape[0] == batch_size for key in ("input_ids", "labels"))

🚧 **TODO** 🚧

Is something wrong with the data preparation? If yes, what is it?

**Answer**



### Training loop

Now write the training loop. Validate the model on the validation set every epoch.

In [None]:
import matplotlib.pyplot as plt
from torch import optim



def validation_step(valid_dataloader, model, criterion):
    # Exercise stub (to be completed).
    # `train` unpacks the result as `(valid_loss, valid_accuracy)` and formats
    # both values with `:.4f`, so two numbers must be returned.
    # TODO


def train_one_epoch(train_dataloader, model, optimizer, criterion):
    # Exercise stub (to be completed).
    # `train` unpacks the result as `(train_loss, train_accuracy)` and formats
    # both values with `:.4f`, so two numbers must be returned.
    # TODO


def train(model, train_dataloader, valid_dataloader, lr=0.01, n_epochs=5):
    """Trains `model` and plots the loss and accuracy curves.

    After each epoch the model is evaluated with `validation_step`, and the
    metrics of the epoch are printed. Once training is over, a figure with two
    panels is drawn: losses on the left, accuracies on the right.

    Args:
        model: the module to train; its parameters are optimized with Adam.
        train_dataloader: passed to `train_one_epoch`.
        valid_dataloader: passed to `validation_step`.
        lr: learning rate of the Adam optimizer.
        n_epochs: number of passes over the training data.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # NOTE: `n_epochs` comes from the argument; it used to be reset to 5 here,
    # which silently ignored the value chosen by the caller.

    # Track training loss, training accuracy, validation loss and validation accuracy and plot in the end
    train_losses = []
    train_accuracies = []
    valid_losses = []
    valid_accuracies = []

    for epoch in range(n_epochs):
        train_loss, train_accuracy = train_one_epoch(
            train_dataloader, model, optimizer, criterion
        )
        valid_loss, valid_accuracy = validation_step(valid_dataloader, model, criterion)
        print(
            f"Epoch {epoch + 1}: train_loss: {train_loss:.4f}, train_accuracy: {train_accuracy:.4f}, valid_loss: {valid_loss:.4f}, valid_accuracy: {valid_accuracy:.4f}"
        )
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        valid_losses.append(valid_loss)
        valid_accuracies.append(valid_accuracy)

    # Left panel: losses. Right panel: accuracies.
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label="train loss")
    plt.plot(valid_losses, label="valid loss")
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label="train accuracy")
    plt.plot(valid_accuracies, label="valid accuracy")
    plt.legend()


# Fresh model for the fixed-length experiment; `vocab_size` matches the
# `max_voc=10000` used to fit the tokenizer.
model = ClassAttention(d=50, n_classes=4, vocab_size=10000)

# No `lr` / `n_epochs` given: the defaults of `train` apply.
train(model, train_dataloader, valid_dataloader)

## Padding and masking

Up to now, we assumed that our texts have the same length. To achieve that, we truncated the texts. However, in practice we want to keep the full texts.

We will investigate masking and padding to handle texts of different lengths.

🚧 **Question** 🚧

What will padding achieve?

What should we be careful about when using padding in the attention model?

**Answer**



In [None]:
class DataCollator:
    # Collate function for texts of different lengths: tokenizes the examples of
    # a batch and returns a dict of tensors with the keys "input_ids",
    # "attention_mask" and "labels". The padding and the mask are left as an
    # exercise (TODO lines below).
    def __init__(self, tokenizer, pad_token_id):
        # NOTE(review): despite the "label_" prefix, this attribute stores the
        # `pad_token_id` argument; presumably it is the id to write in padded
        # positions — confirm when completing `__call__`.
        self.label_pad_token_id = pad_token_id
        self.tokenizer = tokenizer

    def __call__(self, batch):
        # Each example of `batch` is indexed with the "text" and "label" keys.
        # Tokenize the texts
        texts = [example["text"] for example in batch]
        labels = [example["label"] for example in batch]
        tokenized_texts = [self.tokenizer.encode(text) for text in texts]
        # Pad the tokenized texts
        # Length of the longest tokenized text in this batch.
        max_len = max(len(text) for text in tokenized_texts)
        padded_texts = # TODO
        attention_mask = # TODO
        return {
            "input_ids": torch.tensor(padded_texts),
            "attention_mask": torch.tensor(attention_mask),
            "labels": torch.tensor(labels),
        }


batch_size = 32
n_train = len(dataset["train"])
n_valid = len(dataset["test"])
# The id handed to the collator as padding id is the tokenizer's "<UNK>" id.
data_collator = DataCollator(tokenizer, tokenizer.vocab["<UNK>"])
train_dataloader = DataLoader(
    dataset["train"], batch_size=batch_size, collate_fn=data_collator, shuffle=True
)
# Evaluation gains nothing from shuffling; a fixed order keeps the validation
# batches (and their padding lengths) identical from one epoch to the next.
valid_dataloader = DataLoader(
    dataset["test"], batch_size=batch_size, collate_fn=data_collator, shuffle=False
)

In [None]:
# Inspect one batch produced by the padding collator.
batch = next(iter(train_dataloader))
print(batch)

🚧 **TODO** 🚧

Update the code above such that it takes as argument an `attention_mask` for padding.

Use this `attention_mask` to modify the attention computation such that padding tokens do not interfere in the computation.

In [None]:
# TODO
# Large constant; presumably used to push the attention scores of padded
# positions towards minus infinity before the softmax — TODO confirm in
# `MaskedAttention`.
INF = 1e10


class MaskedAttention(nn.Module):
    # Exercise stub (to be completed): attention that ignores padded positions.
    # `MaskedClassAttention.forward` calls this module as
    # `self.attention(class_vectors, word_vectors, attention_mask)`.
    def __init__(self, d):
        # TODO
 


class MaskedClassAttention(ClassAttention):
    """`ClassAttention` variant whose attention block receives a padding mask.

    Everything built by `ClassAttention.__init__` is kept, except the
    `attention` attribute, which is replaced by a `MaskedAttention`.
    """

    def __init__(self, d, n_classes, vocab_size):
        super().__init__(d, n_classes, vocab_size)
        # Swap in the mask-aware attention block.
        self.attention = MaskedAttention(d)

    def forward(self, x, attention_mask):
        """Computes the model output for a batch of token ids.

        Args:
            x: token ids, passed to `self.word_embeddings`.
            attention_mask: forwarded unchanged to the attention block. In the
                test cell of this notebook it holds 1 for real tokens and 0 for
                padding.

        Returns:
            The output of `self.class_projection` with its last dimension
            squeezed.
        """
        token_vectors = self.word_embeddings(x)
        # Extra leading dimension on the class embeddings
        # (presumably the batch dimension — TODO confirm).
        class_queries = self.class_embeddings.unsqueeze(0)
        attended = self.attention(class_queries, token_vectors, attention_mask)
        return self.class_projection(attended).squeeze(-1)

🔴 **TEST**

In [None]:
model = MaskedClassAttention(d=50, n_classes=4, vocab_size=10000)

seq_len = 10
x_without_pad = torch.randint(0, 10000, (1, seq_len))
attention_mask = torch.ones(1, seq_len)
out_without_pad = model(x_without_pad, attention_mask)

# Same real tokens, followed by `seq_len` masked positions filled with random
# ids: if the mask is honoured, these extra positions cannot change the output.
x_with_pad = torch.randint(0, 10000, (1, 2 * seq_len))
x_with_pad[:, :seq_len] = x_without_pad
attention_mask = torch.ones(1, 2 * seq_len)
attention_mask[:, seq_len:] = 0
out_with_pad = model(x_with_pad, attention_mask)

assert out_without_pad.shape == out_with_pad.shape
# A shape check alone would also pass with a model that ignores the mask.
assert torch.allclose(
    out_without_pad, out_with_pad, atol=1e-5
), "Masked (padding) positions changed the model output."

In [None]:
def validation_step(valid_dataloader, model, criterion):
    # Exercise stub (to be completed) for the padded setting: batches now also
    # carry an "attention_mask" entry, and `MaskedClassAttention.forward` takes
    # `(x, attention_mask)`.
    # `train` unpacks the result as `(valid_loss, valid_accuracy)` and formats
    # both values with `:.4f`, so two numbers must be returned.
    # TODO


def train_one_epoch(train_dataloader, model, optimizer, criterion):
    # Exercise stub (to be completed) for the padded setting: batches now also
    # carry an "attention_mask" entry, and `MaskedClassAttention.forward` takes
    # `(x, attention_mask)`.
    # `train` unpacks the result as `(train_loss, train_accuracy)` and formats
    # both values with `:.4f`, so two numbers must be returned.
    # TODO


def train(model, train_dataloader, valid_dataloader, lr=0.01, n_epochs=5):
    """Runs the training loop, then plots the loss and accuracy curves.

    Every epoch calls `train_one_epoch` followed by `validation_step`, prints
    the four resulting metrics and stores them. The final figure has two
    panels: losses on the left, accuracies on the right.

    Args:
        model: the module to train; its parameters are optimized with Adam.
        train_dataloader: passed to `train_one_epoch`.
        valid_dataloader: passed to `validation_step`.
        lr: learning rate of the Adam optimizer.
        n_epochs: number of epochs.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Per-epoch history of each metric, plotted once training is over.
    train_losses, train_accuracies = [], []
    valid_losses, valid_accuracies = [], []

    for epoch in range(n_epochs):
        train_loss, train_accuracy = train_one_epoch(
            train_dataloader, model, optimizer, criterion
        )
        valid_loss, valid_accuracy = validation_step(valid_dataloader, model, criterion)
        print(
            f"Epoch {epoch + 1}: train_loss: {train_loss:.4f}, train_accuracy: {train_accuracy:.4f}, valid_loss: {valid_loss:.4f}, valid_accuracy: {valid_accuracy:.4f}"
        )
        for history, value in (
            (train_losses, train_loss),
            (train_accuracies, train_accuracy),
            (valid_losses, valid_loss),
            (valid_accuracies, valid_accuracy),
        ):
            history.append(value)

    # One panel per metric: (position, train curve, train label, valid curve, valid label).
    panels = (
        (1, train_losses, "train loss", valid_losses, "valid loss"),
        (2, train_accuracies, "train accuracy", valid_accuracies, "valid accuracy"),
    )
    plt.figure(figsize=(12, 6))
    for position, train_curve, train_label, valid_curve, valid_label in panels:
        plt.subplot(1, 2, position)
        plt.plot(train_curve, label=train_label)
        plt.plot(valid_curve, label=valid_label)
        plt.legend()

In [None]:
# Model for the padded setting. Note the smaller embedding size (d=10) compared
# with the d=50 used in the fixed-length experiment.
model = MaskedClassAttention(
    d=10,
    n_classes=4,
    vocab_size=10000,
)
# No `lr` / `n_epochs` given: the defaults of `train` apply.
train(
    model,
    train_dataloader=train_dataloader,
    valid_dataloader=valid_dataloader,
)