<a href="https://colab.research.google.com/github/Meddhif13/IASD/blob/main/labs/2_attention_lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:

from datasets import load_dataset
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from transformers import AutoTokenizer
import torch
from torch import nn
import numpy as np
from torch import nn
import torch
from torch.utils.data import DataLoader

# Transformer from scratch

In the two following sessions, we are going to code a Transformer model from scratch. For this first session, we focus on Attention for text classification. The goal is to develop a modular code, increasing the difficulty step by step.

# Attention

In this part, we will build an attention module, step by step. To have modular code, it is better to use `class` to code our module. We will mainly follow the course to get a good intuition of how attention works.

:red_circle: **WARNING** :red_circle: **You are not allowed to use any existing pre-built torch module, like `nn.Linear`.** You can only use pytorch `Tensor` and the module you create.


In [4]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

In [6]:
class_names = ["World", "Sports", "Business", "Sci/Tech"]

In [7]:
dataset = load_dataset("sh0416/ag_news")
dataset = dataset["train"].select(range(1000))
dataset = dataset.shuffle(seed=42)
dataset = dataset.train_test_split(test_size=0.3, seed=1234)
dataset["validation"] = dataset["test"]

print(dataset)


def preprocess_text(x):

    ids = tokenizer(x["description"], truncation=True, max_length=256, padding=False)[
        "input_ids"
    ]
    return {"input_ids": ids, "label": x["label"] - 1}


# Clean the dataset and tokenize it directly
dataset = dataset.map(preprocess_text)

README.md:   0%|          | 0.00/2.08k [00:00<?, ?B/s]

train.jsonl:   0%|          | 0.00/33.7M [00:00<?, ?B/s]

test.jsonl:   0%|          | 0.00/2.13M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/120000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/7600 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['label', 'title', 'description'],
        num_rows: 700
    })
    test: Dataset({
        features: ['label', 'title', 'description'],
        num_rows: 300
    })
    validation: Dataset({
        features: ['label', 'title', 'description'],
        num_rows: 300
    })
})


Map:   0%|          | 0/700 [00:00<?, ? examples/s]

Map:   0%|          | 0/300 [00:00<?, ? examples/s]

In [8]:
class DataCollator:
    def __init__(self, tokenizer, max_len=128):
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __call__(self, batch):
        # Tokenize the texts
        labels = [example["label"] for example in batch]
        tokenized_texts = [example["input_ids"] for example in batch]
        # Pad the tokenized texts
        max_len = max(len(text) for text in tokenized_texts)
        padded_texts = [
            text + [self.tokenizer.pad_token_id] * (max_len - len(text))
            for text in tokenized_texts
        ]
        pad_mask = [
            [1] * len(text) + [0] * (max_len - len(text)) for text in tokenized_texts
        ]
        return {
            "input_ids": torch.tensor(padded_texts),
            "pad_mask": torch.tensor(pad_mask),
            "labels": torch.tensor(labels),
        }

In [9]:
def validation_step(valid_dataloader, model, criterion):
    n_valid = len(valid_dataloader.dataset)
    model.eval()
    total_loss = 0.0
    correct = 0
    n_iter = 0
    with torch.no_grad():
        for batch in valid_dataloader:
            input_ids = batch["input_ids"].to(DEVICE)
            labels = batch["labels"].to(DEVICE)
            pad_mask = batch["pad_mask"].to(DEVICE)
            output = model(input_ids, pad_mask)
            loss = criterion(output, labels)
            total_loss += loss.item()
            correct += (output.argmax(axis=-1) == labels).sum().item()
            n_iter += 1
    return total_loss / n_iter, correct / n_valid


def train_one_epoch(train_dataloader, model, optimizer, criterion):
    model.train()
    total_loss = 0.0
    correct = 0
    n_train = len(train_dataloader.dataset)
    n_iter = 0
    for batch in train_dataloader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(DEVICE)
        labels = batch["labels"].to(DEVICE)
        pad_mask = batch["pad_mask"].to(DEVICE)
        class_scores = model(input_ids, pad_mask)  # (B, 4)

        loss = criterion(class_scores, labels)  # scalaire (1,)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (class_scores.argmax(axis=-1) == labels).sum().item()
        n_iter += 1

    return total_loss / n_iter, correct / n_train


def train(model, train_dataloader, valid_dataloader, lr=0.01, n_epochs=5):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Track training loss, training accuracy, validation loss and validation accuracy and plot in the end
    train_losses = []
    train_accuracies = []
    valid_losses = []
    valid_accuracies = []
    model.to(DEVICE)
    for epoch in tqdm(range(n_epochs)):
        train_loss, train_accuracy = train_one_epoch(
            train_dataloader, model, optimizer, criterion
        )
        valid_loss, valid_accuracy = validation_step(valid_dataloader, model, criterion)
        print(
            f"Epoch {epoch + 1}: train_loss: {train_loss:.4f}, train_accuracy: {train_accuracy:.4f}, valid_loss: {valid_loss:.4f}, valid_accuracy: {valid_accuracy:.4f}"
        )
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        valid_losses.append(valid_loss)
        valid_accuracies.append(valid_accuracy)

    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label="train loss")
    plt.plot(valid_losses, label="valid loss")
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label="train accuracy")
    plt.plot(valid_accuracies, label="valid accuracy")
    plt.legend()

## Attention-based classification model

The first step since we have access to text in a tokenized form is to use "static" word embeddings.

### Embeddings


🚧 **TODO** 🚧

Write a class named "Embeddings" that will take as input a list of ids and return associated vectors. The ids are integer and their range is given by the vocabulary size. This last quantity directly derives from the choice we made when building the tokenizer.

In [10]:
class Embeddings(nn.Module):
    def __init__(self, vocab_size, d):
        # TODO
        super().__init__()
        self.Embeddings = nn.Parameter(torch.randn(vocab_size, d))

    def forward(self, x):
        # TODO
        #x is a batch of IDs (B, L)
        self.Embeddings[x]


### Attention

Here we will write a 'AttentionBasedClassifier' `class` that will take as input a list of vectors and return a list of probabilities over the possible classes. The classes should be contextualized with the input vectors, using attention.

🚧 **TODO** 🚧

Write a class named "ClassAttention" that will take as input a list of word vectors, contextualize the embeddings over the classes and return a list of probabilities over classes.

**HINT** Split the architectures into modular blocks.

In [16]:
class Linear(nn.Module):
    def __init__(self, d_in, d_out, add_bias=True):
        # TODO
        super().__init__()
        self.proj_matrix = nn.parameter(torch.randn(d_out, d_in))
        if add_bias:
            self.bias = nn.parameter(torch.randn(d_out))
        else:
            self.bias = None
    def forward(self, x):
        # TODO
        x_out = self.proj_matrix @ x.transpose(1, 2)
        if self.bias is not None:
            x_out += x_out + self.bias[None, None, :]
        return x_out

class LayerNorm(nn.Module):
    def __init__(self, d, eps=1e-5):
        # TODO
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(d))
        self.eps = eps
    def forward(self, x):
        # TODO
        # x is (B, L, D)
        x_norm = x - x.mean( axis=2, keepdim=True)  #x_mean should be (B, L)
        var = x.var(axis=2, keepdim=True)
        x_norm = x_norm / self.gamma[None, None]
        return x_norm


In [17]:
x = torch.randn(2, 3, 10)

layernorm = LayerNorm(10)
out = layernorm(x)
out.mean(axis=-1)

tensor([[-1.1921e-08,  0.0000e+00, -5.9605e-09],
        [ 3.5763e-08,  1.4901e-09,  0.0000e+00]], grad_fn=<MeanBackward1>)

In [None]:
class Attention(nn.Module):
    def __init__(self, d, eps=1e-5):
        # TODO
        super().__init__()

    def forward(self, x):
        # TODO


In [19]:
# Try the model on a simple input:
model = LayerNorm(d=50)
x = torch.randint(0, 10000, (2, 10))
output = model(x)
print(output.shape)

RuntimeError: mean(): could not infer output dtype. Input dtype must be either a floating point or complex dtype. Got: Long

🚧 **TODO** 🚧

Update the code above such that it takes as argument an `attention_mask` for padding.

Use this `attention_mask` to modify the attention computation such that padding tokens do not interfere in the computation.

In [None]:
# TODO
INF = 1e10


class MaskedAttention(nn.Module):
    def __init__(self, d):
        # TODO

    def forward(self, x1, x2, pad_mask):
        # TODO


class MaskedClassAttention(ClassAttention):
    def __init__(self, d, n_classes, vocab_size):
        # TODO

    def forward(self, x, attention_mask):
        # TODO

🔴 **TEST**

In [None]:
model = MaskedClassAttention(d=50, n_classes=4, vocab_size=10000)

seq_len = 10
x_without_pad = torch.randint(0, 10000, (1, seq_len))
attention_mask = torch.ones(1, seq_len)
out_without_pad = model(x_without_pad, attention_mask)

x_with_pad = torch.randint(0, 10000, (1, 2 * seq_len))
attention_mask = torch.ones(1, 2 * seq_len)
attention_mask[:, seq_len:] = 0
out_with_pad = model(x_with_pad, attention_mask)

assert out_without_pad.shape == out_with_pad.shape

In [None]:
batch_size = 4
n_train = len(dataset["train"])
n_valid = len(dataset["test"])
data_collator = DataCollator(tokenizer)
train_dataloader = DataLoader(
    dataset["train"], batch_size=batch_size, collate_fn=data_collator, shuffle=True
)
valid_dataloader = DataLoader(
    dataset["test"], batch_size=batch_size, collate_fn=data_collator, shuffle=True
)

model = MaskedClassAttention(d=10, n_classes=4, vocab_size=len(tokenizer))
train(
    model,
    train_dataloader=train_dataloader,
    valid_dataloader=valid_dataloader,
)

# Self-attention

In this part, we will update the function to add a module of self-attention beforet the class attention.

In [None]:
class AlmostTransformer(ClassAttention):
    def __init__(self, d, n_classes, vocab_size):
        # TODO

    def forward(self, x, attention_mask):
        # TODO