# ISTA: Applied Deep Learning for Scientists -- Homework 2

**Topics**: Language processing, Sequential models and Transformers

This homework consists of two theoretical questions and one practical question.

For the theoretical questions, type your answers in the cells dedicated to them (Markdown supports LaTeX equations).
For the practical question, please fill in the missing blocks of code.
Please submit the homework via email to all the TAs.

Deadline: March 25, 2024, 11:55 PM.

TAs: Dingling Yao (dingling.yao@ist.ac.at), Valentino Maiorca (valentino.maiorca@ist.ac.at), Sanketh Vedula (sanketh@campus.technion.ac.il)

Name: < your name >

ID: < your ID >

# Question 1 -- Bigram

Consider the following toy example (similar to the one from Jurafsky & Martin (2015)):
Training data:

- < s > I am Sam < /s >
- < s > Sam I am < /s >
- < s > Sam I like < /s >  
- < s > Sam I do like < /s >
- < s > do I like Sam < /s >


Assume that we use a bigram language model based on the above training data.

1. Compute the bigram probabilities as a table (as shown in the lecture).


| $p(\mathrm{col} \mid \mathrm{row})$ | < s > | I | am | Sam | like | do | < /s > |
|--------|-------|---|----|-----|------|----|--------|
| < s >  |       |   |    |     |      |    |        |
| I      |       |   |    |     |      |    |        |
| am     |       |   |    |     |      |    |        |
| Sam    |       |   |    |     |      |    |        |
| like   |       |   |    |     |      |    |        |
| do     |       |   |    |     |      |    |        |
| < /s > |       |   |    |     |      |    |        |

2. Which of the following sentences is the most probable according to the model? Justify your answer.

  (1) < s > Sam I do I like < /s >

  (2) < s > Sam I am < /s >

  (3) < s > I do like Sam I am < /s >
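
As a reminder of the mechanics (not a solution to this question), a bigram model estimates $p(w_i \mid w_{i-1})$ as $\mathrm{count}(w_{i-1}, w_i) / \mathrm{count}(w_{i-1})$ and scores a sentence by multiplying these terms. The sketch below applies this to a different, made-up three-sentence corpus. The corpus and the helper names `bigram_probabilities` and `sentence_probability` are illustrative assumptions, not part of the homework material.

```python
from collections import Counter

# Hypothetical toy corpus, deliberately different from the homework data
corpus = [
    "<s> the cat sat </s>",
    "<s> the dog sat </s>",
    "<s> the cat ran </s>",
]


def bigram_probabilities(sentences):
    """Maximum-likelihood estimate of p(cur | prev) from whitespace-split sentences."""
    context_counts = Counter()
    bigram_counts = Counter()
    for sentence in sentences:
        tokens = sentence.split()
        for prev, cur in zip(tokens, tokens[1:]):
            context_counts[prev] += 1  # how often prev is followed by anything
            bigram_counts[(prev, cur)] += 1
    return {
        (prev, cur): count / context_counts[prev]
        for (prev, cur), count in bigram_counts.items()
    }


def sentence_probability(sentence, probs):
    """Product of bigram probabilities; unseen bigrams get probability 0 (no smoothing)."""
    tokens = sentence.split()
    p = 1.0
    for pair in zip(tokens, tokens[1:]):
        p *= probs.get(pair, 0.0)
    return p


probs = bigram_probabilities(corpus)
print(probs[("the", "cat")])
print(sentence_probability("<s> the cat sat </s>", probs))
```

Because no smoothing is applied, a single unseen bigram drives the whole sentence probability to zero, which is worth keeping in mind when comparing candidate sentences.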

### **TODO**: Answer for question 1

# Question 2 -- Self-attention

In a self-attention layer without trainable parameters (thus no QKV projection), an interesting behavior is observed. For an input sequence of length n, where each vector is uniformly drawn from a unit d-dimensional sphere and d is significantly larger than n (d ≫ n), the output sequence is nearly identical to the input. Why does this phenomenon occur? Please explain your reasoning.

### **TODO**: Answer for question 2

# Question 3 -- Coding

In [None]:
!pip install datasets lightning gensim transformers nltk

In [None]:
from datasets import load_dataset, DatasetDict

In [None]:
dataset = load_dataset("imdb")
dataset

In [None]:
# Split the train dataset into train and validation preserving the same distribution of labels as the original one
fit_data = dataset["train"].train_test_split(
    test_size=0.1, train_size=0.4, seed=42, stratify_by_column="label"
)
train_data, val_data = fit_data["train"], fit_data["test"]
test_data = dataset["test"].train_test_split(
    test_size=0.4, seed=42, stratify_by_column="label"
)["test"]

data = DatasetDict({"train": train_data, "val": val_data, "test": test_data})
data

In [None]:
sample = data["train"][0]
sample

In [None]:
data["train"].features["label"], data["train"].features["label"].int2str(1)

In [None]:
from torch import nn
import torch
import torch.nn.functional as F
from typing import Mapping, Sequence, List, Any
import typing as ty
from collections import Counter
import re
import string
import nltk

nltk.download("stopwords")

In [None]:
def simple_tokenize(text: str) -> List[str]:
    """A simple function to tokenize a given text."""
    text = text.lower()
    text = re.sub(f"[{re.escape(string.punctuation)}]", "", text)
    tokens = text.split()
    return tokens

In [None]:
data = data.map(lambda x: {**x, "tokenized": simple_tokenize(x["text"][:1000])})
data

In [None]:
data["train"][0]  # Now we have a new column called "tokenized" with the tokenized text

In [None]:
def token_count(tokenized_texts: Sequence[Sequence[str]]) -> Counter[str]:
    """Count the number of times each token appears in the tokenized texts."""
    tokens = [token for text in tokenized_texts for token in text]
    token2count = Counter(tokens)
    return token2count

In [None]:
token2count: ty.Counter = token_count(data["train"]["tokenized"])
f"Total number of unique tokens: {len(token2count)}"

In [None]:
token2count.most_common(10)
# they are not very informative, are they? That's because we have not removed the stop words https://en.wikipedia.org/wiki/Stop_word (yet)

In [None]:
from nltk.corpus import stopwords

en_stopwords = set(stopwords.words("english"))

In [None]:
# some special tokens we will use
PAD_TOKEN_STR = "<pad>"  # this is used to pad the sequences to the same length
UNK_TOKEN_STR = (
    "<unk>"  # this is used to represent tokens that are not in the vocabulary
)

In [None]:
token2count = Counter(
    {token: count for token, count in token2count.items() if token not in en_stopwords}
)

In [None]:
# we will restrict the vocabulary to the `vocab_size` most common tokens (the two special tokens are added on top)
vocab_size = 7_000
vocab = {
    word: index
    for index, word in enumerate(
        [PAD_TOKEN_STR, UNK_TOKEN_STR]
        + [token for token, _ in token2count.most_common(vocab_size)]
    )
}
list(vocab.items())[:10]
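
To make the index layout concrete, here is a small self-contained sketch of the same construction on made-up counts: the two special tokens take indices 0 and 1, the most frequent tokens follow, and anything outside the vocabulary falls back to the `<unk>` index. The toy counts, `toy_vocab_size`, and the helper `encode` are assumptions for illustration only.

```python
from collections import Counter

PAD, UNK = "<pad>", "<unk>"

# Hypothetical token counts standing in for the real token2count
toy_counts = Counter({"movie": 5, "great": 3, "boring": 2, "plot": 1})
toy_vocab_size = 2  # keep only the 2 most frequent tokens

toy_vocab = {
    token: index
    for index, token in enumerate(
        [PAD, UNK] + [token for token, _ in toy_counts.most_common(toy_vocab_size)]
    )
}


def encode(tokens, vocab):
    """Map tokens to indices, sending out-of-vocabulary tokens to the UNK index."""
    return [vocab.get(token, vocab[UNK]) for token in tokens]


ids = encode(["great", "movie", "boring"], toy_vocab)
print(toy_vocab)
print(ids)
```

Note that the resulting dictionary has `toy_vocab_size + 2` entries, since the special tokens are not counted in the size limit.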

In [None]:
data["train"][0]

## Encode the tokens using end-to-end trained embeddings


In [None]:
encoding_dim: int = 100

In [None]:
class TextEncoder(nn.Module):
    def __init__(
        self,
        token2index: Mapping[str, int],
        encoding_dim: int,
        oov_token: str = UNK_TOKEN_STR,
        pad_token: str = PAD_TOKEN_STR,
        aggregation: str = "mean",
    ):
        super().__init__()
        assert pad_token in token2index
        assert oov_token in token2index

        self.token2index = token2index
        self.oov_index = self.token2index[oov_token]

        # initialize the token weights randomly
        token_embeddings = torch.randn(len(token2index), encoding_dim)
        token_embeddings[token2index[oov_token]] = token_embeddings.mean(dim=0)
        token_embeddings[token2index[pad_token]] = 0
        self.aggregation = aggregation

        self.token_embeddings = nn.Parameter(token_embeddings)

    @classmethod
    def collate_fn(cls, batch):
        texts = [sample["text"] for sample in batch]

        token_ids = [
            torch.as_tensor(
                [
                    vocab.get(token, vocab[UNK_TOKEN_STR])
                    for token in sample["tokenized"]
                ]
            )
            for sample in batch
        ]

        max_len = max(len(sample) for sample in token_ids)
        token_ids = torch.stack(
            [F.pad(sample, (0, max_len - len(sample))) for sample in token_ids]
        )

        labels = [sample["label"] for sample in batch]

        return {
            "texts": texts,
            "token_ids": token_ids,
            "labels": torch.as_tensor(labels),
        }

    def forward(self, batch: Mapping[str, Any]) -> torch.Tensor:
        token_ids = batch["token_ids"]  # (batch_size, max_len)
        text_encodings = self.token_embeddings[
            token_ids
        ]  # (batch_size, max_len, encoding_dim)
        # now we could aggregate the encodings for each text, to get a single encoding for each one...
        if self.aggregation == "mean":
            # ...a way to do this is to sum the encodings of the non-padding tokens and divide by their number
            mask = token_ids != self.token2index[PAD_TOKEN_STR]  # (batch_size, max_len)
            # clamp to avoid a division by zero for empty texts
            text_lengths = mask.sum(dim=1).unsqueeze(1).clamp(min=1).float()
            # mask out padding positions so they never contribute, even if the padding embedding is not exactly zero
            text_encodings = (text_encodings * mask.unsqueeze(-1)).sum(dim=1) / text_lengths
        elif self.aggregation is None:
            # ...or we could just return the encodings for each token
            pass
        else:
            raise ValueError(
                f"Invalid aggregation: {self.aggregation}. Are you implementing a new one? :]"
            )
        return text_encodings
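
The padding and mean-pooling steps can be checked in isolation on toy tensors. The sketch below is a minimal illustration, assuming a padding index of 0 as in the vocabulary layout above; the embedding table and sequences are made up. The padding row is deliberately set to a non-zero value to show why the mask matters when averaging.

```python
import torch
import torch.nn.functional as F

PAD_INDEX = 0  # assumed padding index

# two toy sequences of token ids with different lengths
sequences = [torch.tensor([2, 3, 4]), torch.tensor([5])]
max_len = max(len(seq) for seq in sequences)
# right-pad every sequence to max_len with PAD_INDEX
token_ids = torch.stack(
    [F.pad(seq, (0, max_len - len(seq)), value=PAD_INDEX) for seq in sequences]
)  # (batch_size, max_len)

# toy embedding table: row i is filled with the value i ...
embeddings = torch.arange(6, dtype=torch.float32).unsqueeze(1).repeat(1, 2)
# ... except the padding row, which is deliberately non-zero here
embeddings[PAD_INDEX] = 9.0

token_encodings = embeddings[token_ids]  # (batch_size, max_len, 2)
mask = token_ids != PAD_INDEX  # (batch_size, max_len)
lengths = mask.sum(dim=1, keepdim=True).clamp(min=1).float()

# masked mean: padding positions are zeroed out before summing
masked_mean = (token_encodings * mask.unsqueeze(-1)).sum(dim=1) / lengths
# naive mean over all positions, padding included
naive_mean = token_encodings.mean(dim=1)

print(masked_mean)
print(naive_mean)
```

For the second sequence, the masked mean recovers the embedding of its single real token, while the naive mean is pulled towards the padding row.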

In [None]:
encoder = TextEncoder(token2index=vocab, encoding_dim=encoding_dim)

In [None]:
from torch.utils.data import DataLoader
import lightning as pl
from torchmetrics import MetricCollection, Accuracy

In [None]:
class SimpleClassifier(pl.LightningModule):
    def __init__(
        self,
        encoder: nn.Module,
        encoding_dim: int,
        n_classes: int,
        train_encoder: bool = True,
    ):
        super().__init__()
        if not train_encoder:
            for param in encoder.parameters():
                param.requires_grad = False
        self.train_encoder = train_encoder
        self.encoder = encoder
        # we will use a linear probe to classify the encoded texts
        self.classifier = nn.Linear(encoding_dim, n_classes)

        # some metrics
        self.train_metrics = MetricCollection(
            {"accuracy": Accuracy(task="multiclass", num_classes=n_classes)}
        )
        self.val_metrics = self.train_metrics.clone()
        self.test_metrics = self.train_metrics.clone()

    def on_fit_start(self):
        if not self.train_encoder:
            self.encoder.eval()

    def forward(self, batch: Mapping[str, Any]) -> torch.Tensor:
        # get a single encoding for each text in the batch (as prepared by the encoder's collate_fn)
        encoded = self.encoder(batch)
        # and then classify it with the linear probe
        return self.classifier(encoded)

    def _step(self, batch, batch_idx, stage: str):
        logits = self(batch)
        loss = F.cross_entropy(logits, batch["labels"].to(self.device))

        self.log(f"{stage}_loss", loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log_dict(
            getattr(self, f"{stage}_metrics")(logits, batch["labels"]),
            on_step=stage == "train",
            on_epoch=True,
            prog_bar=stage == "train",
        )

        return loss

    def training_step(self, batch, batch_idx):
        return self._step(batch, batch_idx, stage="train")

    def validation_step(self, batch, batch_idx):
        return self._step(batch, batch_idx, stage="val")

    def test_step(self, batch, batch_idx):
        return self._step(batch, batch_idx, stage="test")

    def configure_optimizers(self):
        # Adam-driven deep learning
        return torch.optim.Adam(self.parameters(), lr=1e-3)

In [None]:
model = SimpleClassifier(
    encoder,
    encoding_dim=encoding_dim,
    n_classes=train_data.features["label"].num_classes,
)

In [None]:
model.eval()
sentence = "This movie is great, the best I've seen!"
tokenizer_out = simple_tokenize(sentence)
with torch.no_grad():
    token_ids = torch.as_tensor(
        [vocab.get(token, vocab[UNK_TOKEN_STR]) for token in tokenizer_out]
    )
    logits = model.classifier(model.encoder({"token_ids": token_ids[None]}))
    probs = logits.softmax(dim=-1)
    print(
        f"The class is {train_data.features['label'].int2str(probs.argmax().item())} with a probability of {probs.max().item():.2%}"
    )

In [None]:
def train_model(
    model,
    collate_fn,
    max_epochs: int,
    batch_size: int,
    num_workers: int = 1,
    accelerator: str = "gpu",
):
    train_loader = DataLoader(
        data["train"],
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_fn,
        num_workers=num_workers,
        pin_memory=True,
    )
    val_loader = DataLoader(
        data["val"],
        batch_size=batch_size,
        shuffle=False,
        collate_fn=collate_fn,
        num_workers=num_workers,
        pin_memory=True,
    )
    test_loader = DataLoader(
        data["test"],
        batch_size=batch_size,
        shuffle=False,
        collate_fn=collate_fn,
        num_workers=num_workers,
        pin_memory=True,
    )

    trainer = pl.Trainer(max_epochs=max_epochs, accelerator=accelerator)
    trainer.fit(model=model, train_dataloaders=train_loader, val_dataloaders=val_loader)
    trainer.test(model=model, dataloaders=test_loader)

In [None]:
train_model(
    model=model,
    collate_fn=encoder.collate_fn,
    max_epochs=3,
    batch_size=32,
    num_workers=2,
    accelerator="auto",
)

### Some qualitative evaluation of the obtained encodings


In [None]:
model.eval()
sentence = "This movie is great, the best I've seen!"
tokenizer_out = simple_tokenize(sentence)
with torch.no_grad():
    token_ids = torch.as_tensor(
        [vocab.get(token, vocab[UNK_TOKEN_STR]) for token in tokenizer_out]
    )
    logits = model.classifier(model.encoder({"token_ids": token_ids[None]}))
    probs = logits.softmax(dim=-1)
    print(
        f"The class is {train_data.features['label'].int2str(probs.argmax().item())} with a probability of {probs.max().item():.2%}"
    )

In [None]:
# the text encoder is so simple we can directly apply the classifier on top of the word encodings
token_scores = model.classifier.cpu()(model.encoder.token_embeddings.detach().cpu())
token_scores = token_scores.softmax(dim=-1)
token_scores.shape

In [None]:
index2token = {index: token for token, index in vocab.items()}

In [None]:
# most_negative_tokens
most_negative_ids = (
    token_scores[:, 0].topk(20).indices.tolist()
)  # column 0 corresponds to the negative class ("neg")
most_negative_tokens = [index2token[index] for index in most_negative_ids]
most_negative_tokens

In [None]:
# most_positive_tokens
most_positive_ids = (
    token_scores[:, 1].topk(20).indices.tolist()
)  # column 1 corresponds to the positive class ("pos")
most_positive_tokens = [index2token[index] for index in most_positive_ids]
most_positive_tokens

## Encode the tokens using end-to-end trained embeddings, but with an RNN as pooling operation instead of a simple mean

In [None]:
class RNNEncoder(TextEncoder):
    def __init__(
        self,
        token2index: Mapping[str, int],
        encoding_dim: int,
        hidden_dim: int,
        oov_token: str = UNK_TOKEN_STR,
        pad_token: str = PAD_TOKEN_STR,
        rnn_type: str = "lstm",
    ):
        super().__init__(
            token2index, encoding_dim, oov_token, pad_token, aggregation="rnn"
        )
        rnn_type = rnn_type.lower()
        assert rnn_type in {"lstm", "gru"}
        self.rnn_type = rnn_type
        self.hidden_dim = hidden_dim
        self.rnn = getattr(nn, rnn_type.upper())(
            encoding_dim, hidden_dim, batch_first=True
        )

    def forward(self, batch: Mapping[str, Any]) -> torch.Tensor:
        token_ids = batch["token_ids"]  # (batch_size, max_len)
        mask = token_ids != self.token2index[PAD_TOKEN_STR]
        text_lengths = mask.sum(dim=1)

        token_encodings = self.token_embeddings[token_ids]
        packed_encodings = nn.utils.rnn.pack_padded_sequence(
            token_encodings, text_lengths.cpu(), batch_first=True, enforce_sorted=False
        )
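        # nn.LSTM returns (h_n, c_n) as its state, while nn.GRU returns only h_n
        packed_output, state = self.rnn(packed_encodings)
        hidden = state[0] if self.rnn_type == "lstm" else state
        return hidden[-1]  # final hidden state of the last layer: (batch_size, hidden_dim)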
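
The role of `pack_padded_sequence` and the difference between the LSTM and GRU return values can be seen on random toy inputs. This is a minimal sketch with made-up dimensions and lengths, not part of the assignment: it checks that the final hidden state of a packed, padded sequence matches the one obtained by running the RNN on the unpadded prefix alone.

```python
import torch
from torch import nn

torch.manual_seed(0)

batch_size, max_len, encoding_dim, hidden_dim = 2, 4, 3, 5
token_encodings = torch.randn(batch_size, max_len, encoding_dim)
text_lengths = torch.tensor([4, 2])  # the second sequence has 2 padding steps

packed = nn.utils.rnn.pack_padded_sequence(
    token_encodings, text_lengths, batch_first=True, enforce_sorted=False
)

lstm = nn.LSTM(encoding_dim, hidden_dim, batch_first=True)
gru = nn.GRU(encoding_dim, hidden_dim, batch_first=True)

with torch.no_grad():
    # LSTM state is a tuple (h_n, c_n); GRU state is just h_n
    _, (lstm_hidden, lstm_cell) = lstm(packed)
    _, gru_hidden = gru(packed)

    # reference: run the LSTM only on the 2 real steps of the second sequence
    _, (short_hidden, _) = lstm(token_encodings[1:2, :2])

print(lstm_hidden[-1].shape, gru_hidden[-1].shape)
print(torch.allclose(lstm_hidden[-1, 1], short_hidden[-1, 0], atol=1e-6))
```

Packing means the padded time steps are never fed to the RNN, so the hidden state returned for each sequence is the one after its last real token.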

In [None]:
model = SimpleClassifier(
    RNNEncoder(token2index=vocab, encoding_dim=encoding_dim, hidden_dim=100),
    encoding_dim=100,
    n_classes=train_data.features["label"].num_classes,
)
model

In [None]:
train_model(
    model=model,
    collate_fn=encoder.collate_fn,
    max_epochs=3,
    batch_size=32,
    num_workers=2,
    accelerator="auto",
)

## Encode the tokens using pre-trained vectors from GloVe

In [None]:
import gensim.downloader as api

# loading the pretrained word embeddings using gensim
gensim_model = api.load("glove-wiki-gigaword-100")

In [None]:
# an example of how to retrieve the embedding of a single word
gensim_model["movie"]

In [None]:
class GensimEncoder(TextEncoder):
    def __init__(
        self,
        token2index: Mapping[str, int],
        gensim_model,
        oov_token: str = UNK_TOKEN_STR,
        pad_token: str = PAD_TOKEN_STR,
        aggregation: str = "mean",
    ):
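        super().__init__(
            token2index,
            gensim_model.vector_size,  # dimensionality of the pre-trained vectors (100 for glove-wiki-gigaword-100)
            oov_token,
            pad_token,
            aggregation=aggregation,
        )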

        # OOV vector
        oov_vector = torch.as_tensor(gensim_model["unk"])

        token_embeddings = []
        token_embeddings.append(torch.zeros_like(oov_vector))
        token_embeddings.append(oov_vector)
        # we are skipping the first two since they are the special PAD and UNK tokens
        for token, index in list(token2index.items())[2:]:
            # we want to map each token to its token embedding according to the gensim_model
            # we append each embedding (remember to convert to torch.Tensor) to the token_embeddings list
            if token in gensim_model:
                ...  # your code here
            else:
                # the token is not found in the gensim_model, we treat it as an OOV token
                token_embeddings.append(oov_vector)
                self.token2index[token] = self.oov_index

        # then we convert the token_embeddings list of tensors to a single tensor
        token_embeddings = torch.stack(token_embeddings, dim=0)

        # requires_grad set to False means we are not training those parameters
        self.token_embeddings = nn.Parameter(token_embeddings, requires_grad=False)

In [None]:
encoder = GensimEncoder(token2index=vocab, gensim_model=gensim_model)
model = SimpleClassifier(
    encoder,
    encoding_dim=100,
    n_classes=train_data.features["label"].num_classes,
)
model

In [None]:
train_model(
    model=model,
    collate_fn=encoder.collate_fn,
    max_epochs=3,
    batch_size=32,
    num_workers=2,
    accelerator="auto",
)

## Encode the tokens using pre-trained transformers

In [None]:
from transformers import AutoTokenizer, AutoModel

In [None]:
# Let's see an example on how to use a pre-trained model to encode texts
# in this case, we load the tokenizer and the transformer from the Hugging Face model hub
# each model has its own tokenizer, compatible with the model's architecture/input format
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
model = AutoModel.from_pretrained("bert-base-cased")

sentence = "This movie is great, the best I've seen!"
tokenizer_out = tokenizer(sentence, return_tensors="pt")
print(f"Tokenizer output: {tokenizer_out}\n")

print(
    f"The sentence is tokenized as: {tokenizer.convert_ids_to_tokens(tokenizer_out['input_ids'][0])}"
)
print(f"With a total of {len(tokenizer_out['input_ids'][0])} tokens\n")
# Notice how the first token is the special token [CLS] and the last one is [SEP]


model_out = model(
    **tokenizer_out
)  # we can pass the tokenizer output directly to the model
sentence_encoding = model_out.last_hidden_state
print(
    f"Sentence encoding: {sentence_encoding.shape}"
)  # (batch_size, sequence_length, hidden_size)

In [None]:
class TransformerEncoder(nn.Module):
    def __init__(
        self,
        hf_model_name: str,
    ):
        super().__init__()
        self.tokenizer = ...  # fill this with the right tokenizer
        self.transformer = ...  # fill this with the right transformer

        # we don't want to train the transformer, so we freeze its parameters and put it in eval mode
        self.transformer.requires_grad_(False).eval()

    def collate_fn(self, batch):
        """You can ignore this method, it is automatically used to prepare the batch for the forward method."""

        texts = [sample["text"] for sample in batch]
        tokenizer_out = self.tokenizer(
            texts, padding=True, truncation=True, return_tensors="pt", max_length=150
        )
        labels = [sample["label"] for sample in batch]
        return {
            "texts": texts,
            "tokenizer_out": tokenizer_out,
            "labels": torch.as_tensor(labels),
        }

    def forward(self, batch: Mapping[str, Any]) -> torch.Tensor:
        """We want this function to return the encoding of each sentence in the batch.
        The encoding will be selected from the last hidden state of the model output.
        From the last hidden state, we will select the encoding corresponding to the [CLS] token for each sentence (the first token of the sequence; RoBERTa calls it `<s>`).
        Then, we will return the encodings as a tensor of shape (batch_size, hidden_size), ready to be used by the classifier."""

        # the batch here already contains the output of the tokenizer
        # it is associated with the "tokenizer_out" key in the batch dictionary
        model_out = (
            ...
        )  # call the transformer with the tokenizer output, selecting the last hidden state for each sentence
        # the shape of model_out should be (batch_size, sequence_length, hidden_size)

        sentence_encoding = (
            ...
        )  # then select the encoding corresponding to the [CLS] token for each sentence
        # the shape of the sentence_encoding should be (batch_size, hidden_size)

        return sentence_encoding

In [None]:
model_name = "roberta-base"
encoder = TransformerEncoder(hf_model_name=model_name)
encoding_dim = encoder.transformer.config.hidden_size

model = SimpleClassifier(
    encoder,  # reuse the encoder created above instead of loading the transformer a second time
    encoding_dim=encoding_dim,
    n_classes=train_data.features["label"].num_classes,
    train_encoder=False,
)
model

In [None]:
train_model(
    model=model,
    collate_fn=encoder.collate_fn,
    max_epochs=3,
    batch_size=64,
    num_workers=2,
    accelerator="auto",
)