In [None]:
from datasets import load_dataset
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from transformers import AutoTokenizer
import torch
from torch import nn
import numpy as np
from torch.utils.data import DataLoader

# Transformer from scratch

In the following two sessions, we are going to code a Transformer model from scratch. In this first session, we focus on attention for text classification. The goal is to develop modular code, increasing the difficulty step by step.

# Attention

In this part, we will build an attention module, step by step. To keep the code modular, it is better to write each module as a `class`. We will mainly follow the course to get a good intuition of how attention works.

:red_circle: **WARNING** :red_circle: **You are not allowed to use any existing pre-built torch module, like `nn.Linear`.** You can only use pytorch `Tensor` and the module you create.


In [None]:
# Prefer a CUDA GPU when PyTorch can see one; otherwise run everything on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Load the pretrained "bert-base-uncased" tokenizer; it is used below to turn
# each text into a list of token ids and to provide the padding token id.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [None]:
# Human-readable names of the four classes.
# NOTE(review): presumably ordered so that index i matches the zero-based label
# produced by `preprocess_text` — confirm against the dataset card.
class_names = "World Sports Business Sci/Tech".split()

In [None]:
# Keep the first 1000 rows of the AG News training split, shuffle them, then
# hold out 30% of them as the "test" split.
dataset = (
    load_dataset("sh0416/ag_news")["train"]
    .select(range(1000))
    .shuffle(seed=42)
    .train_test_split(test_size=0.3, seed=1234)
)
# The held-out split is also exposed under the name "validation".
dataset["validation"] = dataset["test"]

print(dataset)


def preprocess_text(x):
    """Tokenize one example and shift its label down by one.

    Args:
        x: a dataset row; its "description" and "label" entries are read.

    Returns:
        A dict with "input_ids" (token ids of the description, truncated to at
        most 256 tokens and not padded) and "label" (the original label minus 1).
    """
    encoding = tokenizer(
        x["description"], truncation=True, max_length=256, padding=False
    )
    shifted_label = x["label"] - 1
    return {"input_ids": encoding["input_ids"], "label": shifted_label}


# Tokenize the dataset and shift its labels by running `preprocess_text`
# through `map`; the result replaces `dataset`.
dataset = dataset.map(preprocess_text)

DatasetDict({
    train: Dataset({
        features: ['label', 'title', 'description'],
        num_rows: 700
    })
    test: Dataset({
        features: ['label', 'title', 'description'],
        num_rows: 300
    })
    validation: Dataset({
        features: ['label', 'title', 'description'],
        num_rows: 300
    })
})


Map:   0%|          | 0/300 [00:00<?, ? examples/s]

In [None]:
class DataCollator:
    """Pad a list of tokenized examples into rectangular batch tensors.

    Args:
        tokenizer: object exposing ``pad_token_id``, the id used for padding.
        max_len: maximum sequence length of a batch. Examples longer than this
            are truncated to their first ``max_len`` tokens. ``None`` disables
            truncation.
    """

    def __init__(self, tokenizer, max_len=128):
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __call__(self, batch):
        """Collate examples into a dict of tensors.

        Args:
            batch: list of examples, each with an "input_ids" list of token ids
                and an integer "label".

        Returns:
            A dict with:
              - "input_ids": (B, T) token ids, right-padded with the pad id,
              - "pad_mask": (B, T) with 1 on real tokens and 0 on padding,
              - "labels": (B,) labels,
            where T is the longest (possibly truncated) sequence in the batch.
        """
        labels = [example["label"] for example in batch]
        token_lists = [list(example["input_ids"]) for example in batch]

        # Honor the configured cap; previously `max_len` was stored but ignored.
        if self.max_len is not None:
            token_lists = [ids[: self.max_len] for ids in token_lists]

        # Pad only up to the longest sequence of this batch, not to max_len.
        batch_len = max((len(ids) for ids in token_lists), default=0)
        pad_id = self.tokenizer.pad_token_id
        padded_texts = [ids + [pad_id] * (batch_len - len(ids)) for ids in token_lists]
        pad_mask = [
            [1] * len(ids) + [0] * (batch_len - len(ids)) for ids in token_lists
        ]
        return {
            # Explicit integer dtype so the result does not depend on inference.
            "input_ids": torch.tensor(padded_texts, dtype=torch.long),
            "pad_mask": torch.tensor(pad_mask, dtype=torch.long),
            "labels": torch.tensor(labels),
        }

In [None]:
def validation_step(valid_dataloader, model, criterion):
    """Evaluate `model` on one full pass over `valid_dataloader`, without gradients.

    Args:
        valid_dataloader: iterable of batches with "input_ids", "labels" and
            "pad_mask" tensors; its `.dataset` length is used as the example count.
        model: called as `model(input_ids, pad_mask)`; switched to eval mode.
        criterion: loss called as `criterion(output, labels)`.

    Returns:
        A pair (mean of the per-batch losses, number of correct predictions
        divided by the dataset size).
    """
    n_valid = len(valid_dataloader.dataset)
    model.eval()
    batch_losses = []
    n_correct = 0
    with torch.no_grad():
        for batch in valid_dataloader:
            input_ids, labels, pad_mask = (
                batch[key].to(DEVICE) for key in ("input_ids", "labels", "pad_mask")
            )
            output = model(input_ids, pad_mask)
            batch_losses.append(criterion(output, labels).item())
            # A prediction is the index of the highest score on the last axis.
            n_correct += (output.argmax(axis=-1) == labels).sum().item()
    return sum(batch_losses) / len(batch_losses), n_correct / n_valid


def train_one_epoch(train_dataloader, model, optimizer, criterion):
    """Run one optimization pass over `train_dataloader`.

    Args:
        train_dataloader: iterable of batches with "input_ids", "labels" and
            "pad_mask" tensors; its `.dataset` length is used as the example count.
        model: called as `model(input_ids, pad_mask)`; switched to train mode.
        optimizer: its gradients are zeroed before, and it is stepped after,
            every batch.
        criterion: loss called as `criterion(class_scores, labels)`.

    Returns:
        A pair (mean of the per-batch losses, number of correct predictions
        divided by the dataset size).
    """
    model.train()
    n_train = len(train_dataloader.dataset)
    batch_losses = []
    n_correct = 0
    for batch in train_dataloader:
        optimizer.zero_grad()
        input_ids, labels, pad_mask = (
            batch[key].to(DEVICE) for key in ("input_ids", "labels", "pad_mask")
        )
        class_scores = model(input_ids, pad_mask)  # one row of scores per example

        loss = criterion(class_scores, labels)  # scalar
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
        n_correct += (class_scores.argmax(axis=-1) == labels).sum().item()

    return sum(batch_losses) / len(batch_losses), n_correct / n_train


def train(model, train_dataloader, valid_dataloader, lr=0.01, n_epochs=5):
    """Train `model` with Adam and cross-entropy, then plot the learning curves.

    Args:
        model: module to optimize; it is moved to `DEVICE`.
        train_dataloader: batches used by `train_one_epoch`.
        valid_dataloader: batches used by `validation_step`.
        lr: learning rate passed to Adam.
        n_epochs: number of passes over the training data.

    Prints one metrics line per epoch and draws a two-panel figure (losses on
    the left, accuracies on the right). Returns nothing.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Per-epoch metrics, collected for the plots drawn after training.
    history = {
        "train loss": [],
        "valid loss": [],
        "train accuracy": [],
        "valid accuracy": [],
    }
    model.to(DEVICE)
    for epoch in tqdm(range(n_epochs)):
        train_loss, train_accuracy = train_one_epoch(
            train_dataloader, model, optimizer, criterion
        )
        valid_loss, valid_accuracy = validation_step(valid_dataloader, model, criterion)
        print(
            f"Epoch {epoch + 1}: train_loss: {train_loss:.4f}, train_accuracy: {train_accuracy:.4f}, valid_loss: {valid_loss:.4f}, valid_accuracy: {valid_accuracy:.4f}"
        )
        history["train loss"].append(train_loss)
        history["train accuracy"].append(train_accuracy)
        history["valid loss"].append(valid_loss)
        history["valid accuracy"].append(valid_accuracy)

    plt.figure(figsize=(12, 6))
    panels = (
        ("train loss", "valid loss"),
        ("train accuracy", "valid accuracy"),
    )
    for position, curve_names in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for curve_name in curve_names:
            plt.plot(history[curve_name], label=curve_name)
        plt.legend()

## Attention-based classification model

The first step since we have access to text in a tokenized form is to use "static" word embeddings.

### Embeddings


🚧 **TODO** 🚧

Write a class named "Embeddings" that takes as input a list of ids and returns the associated vectors. The ids are integers and their range is given by the vocabulary size. This last quantity derives directly from the choice we made when building the tokenizer.

In [None]:
# Indexing demo: `a` holds 4 embeddings of dimension 10; indexing it with a
# (2, 3) tensor of ids gathers one embedding per id.
a = torch.randn(4, 10)
indices = torch.tensor([[0, 2, 1], [1, 1, 3]])
indices.shape
a[indices].shape

torch.Size([2, 3, 10])

In [None]:
class Embeddings(nn.Module):
    """Learnable lookup table mapping token ids to vectors.

    Args:
        vocab_size: number of rows of the table (one per id).
        d: dimension of each embedding vector.
    """

    def __init__(self, vocab_size, d):
        super().__init__()
        # One row per id, drawn from a standard normal distribution.
        table = torch.randn(vocab_size, d)
        self.embeddings = nn.Parameter(table)

    def forward(self, x):
        """Return the embedding of every id in `x`.

        `x` is a batch of ids of shape (B, L); indexing the table with it
        yields one d-dimensional row per id.
        """
        lookup_table = self.embeddings
        return lookup_table[x]

In [None]:
# Sanity check: embed the `indices` tensor with a 1000-row, 10-dimensional table.
embedding_model = Embeddings(vocab_size=1000, d=10)
embedding_model(indices).shape

torch.Size([2, 3, 10])

### Attention

Here we will write an 'AttentionBasedClassifier' `class` that takes as input a list of vectors and returns a list of probabilities over the possible classes. The classes should be contextualized with the input vectors, using attention.

🚧 **TODO** 🚧

Write a class named "ClassAttention" that will take as input a list of word vectors, contextualize the embeddings over the classes and return a list of probabilities over classes.

**HINT** Split the architectures into modular blocks.

In [None]:
class Linear(nn.Module):
    """Linear map ``y = x W^T (+ b)`` applied to the last dimension of the input.

    Args:
        d_in: size of the last dimension of the input.
        d_out: size of the last dimension of the output.
        add_bias: when true, a learnable bias of shape (d_out,) is added.
    """

    def __init__(self, d_in, d_out, add_bias=True):
        super().__init__()
        self.proj_matrix = nn.Parameter(torch.randn(d_out, d_in))
        self.add_bias = bool(add_bias)
        if self.add_bias:
            self.bias = nn.Parameter(torch.randn(d_out))

    def forward(self, x):
        """Project `x` of shape (..., d_in) to shape (..., d_out)."""
        # Multiplying on the right by W^T keeps every leading dimension
        # (batch, sequence, ...) untouched, so no transposes are needed.
        x_out = x @ self.proj_matrix.T
        if self.add_bias:
            # The bias has shape (d_out,), so it broadcasts along the feature
            # (last) axis. The previous version added it while the tensor was
            # laid out as (B, d_out, L), i.e. along the sequence axis.
            x_out = x_out + self.bias
        return x_out


class LayerNorm(nn.Module):
    """Scale-only layer normalization over the last dimension.

    Each vector along the last axis is centered, divided by its standard
    deviation, and multiplied by a learnable per-feature gain `gamma`. There is
    no learnable shift.

    Args:
        d: size of the normalized (last) dimension.
        eps: small constant added to the variance for numerical stability.
    """

    def __init__(self, d, eps=1e-5):
        super().__init__()
        # Start as a pure normalization: a gain of one on every feature.
        self.gamma = nn.Parameter(torch.ones(d))
        self.eps = eps

    def forward(self, x):
        """Normalize `x` of shape (..., d); the output has the same shape."""
        mean = x.mean(dim=-1, keepdim=True)  # (..., 1)
        # Biased (population) variance, as in standard layer normalization. The
        # unbiased estimator divides by d - 1 and is NaN when d == 1.
        var = x.var(dim=-1, keepdim=True, unbiased=False)  # (..., 1)
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        # gamma has shape (d,), so it broadcasts over all leading dimensions.
        return x_norm * self.gamma

Pour construire l'attention

**Init**

- définir les matrices de projection (paramètres), $Q, K, V \in \mathbb{R}^{d \times d}$ (avec la class `Linear`).

**Forward**
- input vectors $x \in \mathbb{R}^{B\times L\times D}$
- les 3 projections, on a $q, k, v\in \mathbb{R}^{B \times L \times D}$.
- calcule la "similarité", $\forall i \in [1, L], \forall j \in [1, L], \ s_{ij} = q_i ^T k_j$.
- normaliser les scores (avec le softmax) : $\alpha_{ij} = \dfrac{e^{s_{ij}}}{\sum_{l=1}^L e^{s_{il}}}$
- on calcule les vecteurs de sortie: $y_i = \sum_{j=1}^L \alpha_{ij} v_j$.

In [None]:
# Batched similarity scores: for each of the 2 batch items, the dot products
# between its 3 queries and its 3 keys.
q = torch.randn(2, 3, 5)
k = torch.randn(2, 3, 5)
s = torch.matmul(q, k.transpose(-2, -1))

In [None]:
# Same similarity computation without a batch dimension.
q = torch.randn(3, 5)
k = torch.randn(3, 5)
s = torch.matmul(q, k.t())
print(s.shape)  # one score per (query, key) pair: (3, 3)

torch.Size([3, 3])

In [None]:
import math
class Attention(nn.Module):
    """Single-head scaled dot-product self-attention, without masking.

    Args:
        d: dimension of the input vectors; queries, keys, values and outputs
            all share it.
    """

    def __init__(self, d):
        super().__init__()
        # Three bias-free d x d projections.
        self.q_proj = Linear(d, d, add_bias=False)
        self.v_proj = Linear(d, d, add_bias=False)
        self.k_proj = Linear(d, d, add_bias=False)

    def forward(self, x):
        """Contextualize `x` of shape (B, L, D); the output is also (B, L, D)."""
        queries = self.q_proj(x)  # (B, L, D)
        keys = self.k_proj(x)  # (B, L, D)
        values = self.v_proj(x)  # (B, L, D)

        # Similarity of every query with every key, scaled by sqrt(D).
        scale = math.sqrt(x.shape[2])
        scores = (queries @ keys.transpose(1, 2)) / scale  # (B, L, L)

        # Each row of weights sums to one over the keys.
        weights = torch.nn.functional.softmax(scores, dim=2)  # (B, L, L)

        # Every output vector is a weighted average of the value vectors.
        return weights @ values





In [None]:
# Shape check for the weighted sum y = a @ v, with random stand-ins for the
# attention weights `a` and the value vectors `v`.
B, L, d = 4, 10, 8
a = torch.randn(B, L, L)
v = torch.randn(B, L, d)
y = torch.matmul(a, v)
print(y.shape)
# Row i of y[0] is the sum over j of a[0, i, j] * v[0, j], i.e.
# (v[0] * a[0, i][:, None]).sum(axis=0), a vector of size d.

torch.Size([4, 10, 8])


In [None]:
# Smoke test: run the attention module on a random (B, L, d) input and print
# the shape of what comes out.
model = Attention(d=d)
x = torch.randn((B, L, d))
output = model(x)
print(output.shape)

torch.Size([4, 10, 8])


🚧 **TODO** 🚧

Update the code above such that it takes as argument an `attention_mask` for padding.

Use this `attention_mask` to modify the attention computation such that padding tokens do not interfere in the computation.

In [None]:
# Large finite value used as "minus infinity" when masking attention scores.
# Because it is finite, a row whose keys are all masked still goes through the
# softmax without producing NaN.
INF = 1e10


class MaskedAttention(nn.Module):
    """Single-head scaled dot-product self-attention that ignores padded keys.

    Args:
        d: dimension of the input vectors; queries, keys, values and outputs
            all share it.
    """

    def __init__(self, d):
        super().__init__()
        # Three bias-free d x d projections, as in the unmasked attention.
        self.q_proj = Linear(d, d, add_bias=False)
        self.k_proj = Linear(d, d, add_bias=False)
        self.v_proj = Linear(d, d, add_bias=False)

    def forward(self, x, pad_mask):
        """Contextualize `x` while preventing attention to padding positions.

        Args:
            x: input vectors of shape (B, L, D).
            pad_mask: tensor of shape (B, L), non-zero on real tokens and 0 on
                padding; it must live on the same device as `x`.

        Returns:
            A tensor of shape (B, L, D). Rows at padded positions are still
            computed (from the real tokens) and should be ignored downstream.
        """
        q = self.q_proj(x)  # (B, L, D)
        k = self.k_proj(x)  # (B, L, D)
        v = self.v_proj(x)  # (B, L, D)

        scores = (q @ k.transpose(1, 2)) / (x.shape[2] ** 0.5)  # (B, L, L)

        # Mask along the key axis: every query sees the same set of valid keys.
        key_is_pad = (pad_mask == 0)[:, None, :]  # (B, 1, L)
        scores = scores.masked_fill(key_is_pad, -INF)

        # After the softmax, padded keys get a weight of (numerically) zero.
        weights = torch.nn.functional.softmax(scores, dim=2)  # (B, L, L)
        return weights @ v

🔴 **TEST**

In [None]:
# Shape test: the model output must have the same shape whether or not the
# input contains padding.
# NOTE(review): `MaskedClassAttention` is not defined in the visible part of
# this notebook — confirm it is defined before this cell runs.
model = MaskedClassAttention(d=50, n_classes=4, vocab_size=10000)

seq_len = 10
# Unpadded input: every position is marked as a real token.
x_without_pad = torch.randint(0, 10000, (1, seq_len))
attention_mask = torch.ones(1, seq_len)
out_without_pad = model(x_without_pad, attention_mask)

# Input twice as long whose second half is marked as padding in the mask.
x_with_pad = torch.randint(0, 10000, (1, 2 * seq_len))
attention_mask = torch.ones(1, 2 * seq_len)
attention_mask[:, seq_len:] = 0
out_with_pad = model(x_with_pad, attention_mask)

# Only the shapes are compared; the two inputs hold different random ids.
assert out_without_pad.shape == out_with_pad.shape

In [None]:
# Build the data loaders and train the classifier with the default settings of
# `train`.
batch_size = 4
n_train, n_valid = len(dataset["train"]), len(dataset["test"])
data_collator = DataCollator(tokenizer)
train_dataloader = DataLoader(
    dataset["train"],
    shuffle=True,
    batch_size=batch_size,
    collate_fn=data_collator,
)
valid_dataloader = DataLoader(
    dataset["test"],
    shuffle=True,
    batch_size=batch_size,
    collate_fn=data_collator,
)

# NOTE(review): `MaskedClassAttention` is not defined in the visible part of
# this notebook — confirm it is defined before this cell runs.
model = MaskedClassAttention(d=10, n_classes=4, vocab_size=len(tokenizer))
train(model, train_dataloader, valid_dataloader)