# Transformer Model - Solutions

In this tutorial we explore the Transformer architecture, the fundamental building block of today's Large Language Models. We focus on the transformer encoder and its self-attention layer. We will implement a sentiment classification model which predicts the binary sentiment of given movie reviews: positive or negative.


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import torch.optim as optim
import numpy as np
import tqdm as tqdm
import matplotlib.pyplot as plt
import pickle
from typing import List, Dict

#### Data Preparation

We start off by loading the movie reviews as well as the corresponding sentiment labels. The label ```y=0``` corresponds to negative sentiment, ```y=1``` corresponds to positive sentiment.

Investigate the printed samples to familiarize yourself with the data.

In [None]:
with open("data/movie_reviews.pkl", "rb") as fp:  
    texts = pickle.load(fp)
with open("data/movie_labels.pkl", "rb") as fp:
    labels = pickle.load(fp)

for text, label in zip(texts[:5], labels):
    print("Review:", text, "Sentiment:", label.item())

##### Tokenization
In the first processing step we want to tokenize the input sequences. We choose a simple tokenization and treat each individual word (separated by a blank space) as a token. 
Implement the function ```tokenize_text``` which receives a list of movie reviews. 
Each review of type `str` is transformed into a list containing the separate tokens of the sequence (i.e. a list with elements of type `str`). 

Further, add the classification token ```<cls>``` to the beginning of each token list.
This token will later be used as an indicator that a classification based on the subsequent tokens is queried and marks the position where the corresponding output (class prediction) will be computed (see below).
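To make the expected result concrete, the following minimal sketch tokenizes two made-up reviews by whitespace and prepends `<cls>`. The sample sentences and variable names are illustrative assumptions and are not taken from the dataset.

```python
# Two made-up reviews (illustrative only, not from the dataset)
sample_reviews = ["a truly great film", "boring  plot"]

# str.split() without arguments splits on any run of whitespace,
# so the double space in the second review yields no empty token
sample_tokens = [["<cls>"] + review.split() for review in sample_reviews]

for tokens in sample_tokens:
    print(tokens)
```

Every resulting list starts with `<cls>`, so the classification token always sits at position 0 of the sequence.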

In [None]:
def tokenize_text(texts: List[str]) -> List[List[str]]:
    tokenized_texts = []

    ### BEGIN SOLUTION
    for text in texts:
        tokenized_text = text.split()
        tokenized_text.insert(0, "<cls>")
        tokenized_texts.append(tokenized_text)
    return tokenized_texts
    ### END SOLUTION

In [None]:
texts_tokenized = tokenize_text(texts)
assert len(texts_tokenized) == len(texts)

Next, we create a dictionary to encode the tokens to numeric values, which can be processed by the transformer model. 
To do so, we map each individual token to a unique index, and, at the same time, assemble the inverse mapping for decoding. 
Implement the function ```create_dictionaries``` which takes the list of tokenized input sequences and creates both the ```encoding_dict``` and the ```decoding_dict```, which will be used to convert tokens to indices and vice versa. 
The classification token ```<cls>``` is assigned the index ```0```. 
Also add a padding token (```<pad>```) and a token for unknown words (```<unk>```) to the dictionaries and assign the highest indices to them.
The padding token will be used to fill shorter sequences up to a common length, which is necessary because the model requires all input sequences to have the same length.
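The resulting index layout can be illustrated on a tiny corpus. This is only a sketch under assumptions: the corpus and the names `toy_corpus`, `toy_encoding` and `toy_decoding` are made up for this example.

```python
# Toy tokenized corpus (made up for illustration)
toy_corpus = [["<cls>", "great", "film"], ["<cls>", "bad", "film"]]

toy_encoding = {"<cls>": 0}
for tokens in toy_corpus:
    for token in tokens:
        # setdefault assigns a new index only on the first occurrence
        toy_encoding.setdefault(token, len(toy_encoding))

# the special tokens receive the two highest indices
toy_encoding["<pad>"] = len(toy_encoding)
toy_encoding["<unk>"] = len(toy_encoding)

# inverse mapping for decoding
toy_decoding = {index: token for token, index in toy_encoding.items()}
print(toy_encoding)
```

Regular tokens are numbered in order of first appearance, and `<pad>` and `<unk>` follow after the last regular token.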

In [None]:
def create_dictionaries(texts_tokenized):
    encoding_dict = {}
    decoding_dict = {}
    encoding_dict["<cls>"] = 0
    decoding_dict[0] = "<cls>"
    idx = 1

    ### BEGIN SOLUTION
    for tokenized_text in texts_tokenized:
        for token in tokenized_text:
            if token not in encoding_dict:
                encoding_dict[token] = idx
                decoding_dict[idx] = token
                idx +=1
    encoding_dict["<pad>"] = idx
    decoding_dict[idx] = "<pad>"
    encoding_dict["<unk>"] = idx+1
    decoding_dict[idx+1] = "<unk>"
    ### END SOLUTION
    return encoding_dict, decoding_dict

In [None]:
encoding_dict, decoding_dict = create_dictionaries(texts_tokenized)
assert "film" == decoding_dict[encoding_dict["film"]]
assert "music" == decoding_dict[encoding_dict["music"]]

The encoding dictionary can be used to convert the tokenized training text to numerical values, which can be used as input to our language model.

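Implement the function `encode_token_sequence(tokenized_sequence: List[str], encoding_dict: Dict[str, int], default_token = "<unk>", max_length = 8)` which encodes a sequence of tokens based on the given dictionary and always returns exactly `max_length` indices: longer sequences are truncated and shorter sequences are filled up with the padding token `<pad>`.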
Whenever the sequence contains a token which is not contained in the encoding dictionary, use the default token instead.
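The three cases (padding, truncation and unknown tokens) can be checked on a toy dictionary. The helper `encode_fixed_length` and the dictionary `toy_dict` are hypothetical names for this sketch, written more compactly than the exercise solution.

```python
# Toy dictionary (made up for illustration)
toy_dict = {"<cls>": 0, "great": 1, "film": 2, "<pad>": 3, "<unk>": 4}

def encode_fixed_length(tokens, encoding, max_length):
    # truncate to max_length, then fill up with <pad> if the sequence is shorter
    padded = tokens[:max_length] + ["<pad>"] * (max_length - len(tokens))
    # tokens missing from the dictionary fall back to the index of <unk>
    return [encoding.get(token, encoding["<unk>"]) for token in padded]

# "movie" is not in the dictionary, and two padding tokens are appended
short = encode_fixed_length(["<cls>", "great", "movie"], toy_dict, max_length=5)
# six tokens are cut down to the first five
long = encode_fixed_length(["<cls>", "great", "film", "great", "film", "great"], toy_dict, max_length=5)
print(short, long)
```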

In [None]:
def encode_token_sequence(tokenized_sequence: List[str], encoding_dict: Dict[str, int], default_token = "<unk>", max_length=8):
    # query default token encoding once
    default_token_encoded = encoding_dict[default_token]
    encoded_sequence = []

    ### BEGIN SOLUTION
    for i in range(max_length):
        if i < len(tokenized_sequence):
            token = tokenized_sequence[i]
        else:
            token = "<pad>"
        if token in encoding_dict:
            encoded_token = encoding_dict[token]
        else:
            encoded_token = default_token_encoded
        encoded_sequence.append(encoded_token)
    ### END SOLUTION
    return encoded_sequence

To convert a given sequence of indices back to tokenized text, implement the function ```decode_sequence(encoded_sequence: List[int], decoding_dict: Dict[int, str])``` which decodes a sequence of indices based on the given ```decoding_dict```.

In [None]:
def decode_sequence(encoded_sequence: List[int], decoding_dict: Dict[int, str]):
    ### BEGIN SOLUTION
    decoded_sequence = []
    for token in encoded_sequence:
        decoded_sequence.append(decoding_dict[token])
    ### END SOLUTION
    return decoded_sequence

We use ```encode_token_sequence``` to convert the tokenized input sequences ```texts_tokenized``` to sequences of indices.

In [None]:
encoded_texts = []
for tokenized_sequence in texts_tokenized:
    encoded = encode_token_sequence(tokenized_sequence, encoding_dict)
    encoded_texts.append(encoded)

#### Self-Attention Implementation
##### Scaled dot-product attention
After the essential data pre-processing we now implement the self-attention layer, which is the heart of both the transformer encoder and the transformer decoder. 
We start by defining the function ```scaled_dot_product_attention``` which receives ```query```, ```key``` and ```value``` tensors and returns the weighted sum of the ```value``` tensors together with the attention weights, according to the scaled dot-product formulation introduced in the lecture, where $d_k$ is the dimension of the key vectors:
$$\text{attn}(Q,K,V)=\text{softmax} \bigg( \dfrac{QK^T}{\sqrt{d_k}} \bigg) V$$
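A quick sanity check of the formula, as a sketch with made-up tensor sizes: if all key vectors are identical, every query assigns the same score to each position, so the softmax weights become uniform and the output is the mean of the value vectors.

```python
import math
import torch
import torch.nn.functional as F

torch.manual_seed(0)
n_tokens, d_k, d_v = 3, 4, 2  # illustrative sizes

q = torch.randn(n_tokens, d_k)
# every key is the same vector, so each query scores all positions equally
k = torch.randn(1, d_k).repeat(n_tokens, 1)
v = torch.randn(n_tokens, d_v)

# softmax(Q K^T / sqrt(d_k)) V
weights = F.softmax(q @ k.transpose(-2, -1) / math.sqrt(d_k), dim=-1)
output = weights @ v

print(weights)  # each row is a probability distribution over the tokens
```

In general each row of the weight matrix sums to 1, which is why the output can be read as a weighted average of the value vectors.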


In [None]:
def scaled_dot_product_attention(query: torch.Tensor, key: torch.Tensor, value: torch.Tensor):
    ### BEGIN SOLUTION
    scale_factor = 1 / math.sqrt(key.size(-1)) # factor for scaling the dot-product
    scores = query @ key.transpose(-2, -1)
    scores = scores * scale_factor
    scores = F.softmax(scores, dim=-1)
    weighted_values = scores @ value # weighted sum of value tensors
    ### END SOLUTION
    return weighted_values, scores

In [None]:
sequence_length = 4
dk = 128 # dimension of key tensors = dimension of query tensors
dv = 64 # dimension of value tensors
query = torch.randn(sequence_length, dk)
key = torch.randn(sequence_length, dk)
value = torch.randn(sequence_length, dv)

weighted_values, scores = scaled_dot_product_attention(query, key, value)
assert weighted_values.size(-1) == dv

##### SelfAttentionLayer
We now use the previously defined function ```scaled_dot_product_attention``` to implement a single layer of the Transformer Encoder. We define the class ```TransformerEncoderLayer``` which inherits from ```torch.nn.Module```. In the ```__init__``` method we first initialize the learnable weight matrices ```self.wq```, ```self.wk``` and ```self.wv``` for computing ```query```, ```key``` and ```value``` vectors in the ```forward``` method. Note that the residual connection adds the attention output directly to the layer input, so in this implementation ```dv``` has to equal ```dmodel```.

Note that this projection of keys, queries and values can in fact be done using PyTorch's linear layer: without an activation function, a neural network layer simply multiplies the input vector by the layer's parameter matrix (and adds a bias vector).
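The equivalence between a linear layer and a matrix multiplication can be verified directly. The sizes below are arbitrary choices for this sketch.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
x = torch.randn(5, 8)          # 5 tokens with model dimension 8
projection = nn.Linear(8, 4)   # weight has shape (4, 8), bias has shape (4,)

# the layer output is the input times the transposed weight matrix plus the bias
manual = x @ projection.weight.T + projection.bias
same = torch.allclose(projection(x), manual, atol=1e-6)
print(same)
```

Passing `bias=False` to `nn.Linear` would give a pure matrix multiplication; the layers in this notebook keep the default bias.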



<img src="data/encoder_layer.png" 
        alt="Picture" 
        style="display: block; margin: 0 auto" />

In [None]:
class TransformerEncoderLayer(torch.nn.Module):
    def __init__(self, dmodel: int, dk: int, dv: int):
        super().__init__()
        # initialize key, query and value weight matrices for self-attention layer
        ### BEGIN SOLUTION
        self.wq = nn.Linear(dmodel, dk)
        self.wk = nn.Linear(dmodel, dk)
        self.wv = nn.Linear(dmodel, dv)
        ### END SOLUTION

        # initialize layer norm and feed-forward network
        self.layer_norm1 = nn.LayerNorm(dmodel)
        self.linear1 = nn.Linear(dmodel, 1024)
        self.linear2 = nn.Linear(1024, dmodel)
        self.layer_norm2 = nn.LayerNorm(dmodel)

    def forward(self, x: torch.Tensor):
        ### BEGIN SOLUTION
        # compute self-attention
        query = self.wq(x)
        key = self.wk(x)
        value = self.wv(x)
        weighted_values, scores = scaled_dot_product_attention(query, key, value)
        
        # add and normalize
        x = x + weighted_values
        x_res = self.layer_norm1(x)

        # feed-forward network
        x = self.linear1(x_res)
        x = F.relu(x)
        x = self.linear2(x)

        # add and normalize
        x = x + x_res
        z = self.layer_norm2(x)
        ### END SOLUTION
        return z, scores

Before using the ```TransformerEncoderLayer``` to implement the final model for the sentiment prediction we have to make sure that the model is able to reason about the positions of the individual tokens in the input sequence. In the lecture we introduced the positional encodings which are added to the token embeddings before they are passed to the first encoder layer. These encodings usually follow fixed patterns (e.g. sine or cosine functions) that depend on the respective position in the input sequence. By adding them to the token embeddings the model is able to learn about the absolute or relative positions of the individual tokens in the sequence. In this exercise we implement a positional encoding which follows a sine/cosine pattern. 
We first initialize the matrix $P$ (called ```positional_encoding``` in the code below) of size ```(max_seq_length, d)``` with zeros. Next, we fill the matrix row by row, from the first row to row ```max_seq_length```:

$$P(k, 2i) = \sin(\dfrac{k}{n^{2i/d}})$$
$$P(k, 2i + 1) = \cos(\dfrac{k}{n^{2i/d}})$$

$$ d: \text{encoder model dimension}$$
$$ k: \text{position of a token in the input sequence} $$
$$ n: \text{user defined scalar}  $$
$$ i: \text{mapping to column indices} \quad (0 \leq i < d/2)$$
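The two formulas can also be evaluated without explicit loops. The following is a sketch of a vectorized variant, not the exercise solution; the function name `positional_encoding_vectorized` is hypothetical and `d` is assumed to be even.

```python
import math
import torch

def positional_encoding_vectorized(max_seq_length: int, d: int, n: float) -> torch.Tensor:
    k = torch.arange(max_seq_length, dtype=torch.float32).unsqueeze(1)  # positions, shape (max_seq_length, 1)
    i = torch.arange(d // 2, dtype=torch.float32)                       # column pair indices, shape (d/2,)
    # 1 / n^(2i/d), computed as exp(-(2i/d) * ln n)
    inv_freq = torch.exp(-(2 * i / d) * math.log(n))
    theta = k * inv_freq            # broadcasts to shape (max_seq_length, d/2)
    P = torch.zeros(max_seq_length, d)
    P[:, 0::2] = torch.sin(theta)   # even columns: P(k, 2i)
    P[:, 1::2] = torch.cos(theta)   # odd columns:  P(k, 2i + 1)
    return P

P = positional_encoding_vectorized(max_seq_length=4, d=6, n=100.0)
print(P[0])  # position 0: sin(0) in the even columns, cos(0) in the odd columns
```

At position $k=0$ the even columns are all $0$ and the odd columns are all $1$, which is a convenient check for any implementation of the formulas.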

In [None]:
torch.manual_seed(14)
torch.cuda.manual_seed(14)
dmodel = 128
dk = 128
dv = 128

In [None]:
def generate_positional_encoding(max_seq_length: int, d: int, n: int) -> torch.Tensor:
    ### BEGIN SOLUTION
    positional_encoding = np.zeros((max_seq_length, d))
    # use the function arguments (not the globals max_length and dmodel)
    for k in np.arange(max_seq_length):
        for i in np.arange(d // 2):
            theta = k / (n ** ((2 * i) / d))
            positional_encoding[k, 2 * i] = math.sin(theta)
            positional_encoding[k, 2 * i + 1] = math.cos(theta)
    ### END SOLUTION
    return torch.tensor(positional_encoding, dtype=torch.float32)


# maximum sequence length
max_length = 32
encodings = generate_positional_encoding(max_length, dmodel, 32)

In [None]:
fig, axes = plt.subplots(1, 1, figsize=(24, 24))
axes.imshow(encodings)

With the ```TransformerEncoderLayer``` and the positional encodings implemented we are now ready to set up the entire model for predicting the sentiment of given movie reviews. In the ```__init__``` method of our ```SentimentPredictionModel``` we initialize two instances ```self.encoder_layer1``` and ```self.encoder_layer2``` of the ```TransformerEncoderLayer```. In the ```forward``` method we first add the positional encodings to the token embeddings. The resulting embeddings are then passed through both encoder layers. 

For our goal of sentiment classification, we will train the model's output at the position of the classification token ```<cls>``` (which is at the first position in our input sequence) to contain the class prediction for the complete sequence. 
Through the self-attention layer, the encoding of the ```<cls>``` token is able to attend to all other tokens of the input sequence and can therefore access all the relevant information of the sequence to classify. 
The final prediction head computes the class logits for both negative and positive sentiment based on that model output for the classification indicator `<cls>`.
These logits are returned for loss computation.

(The model outputs for all other positions in the sequence are not relevant in this case.
During training, no loss is computed for these outputs and, thus, they cannot be expected to contain meaningful information.) 

In [None]:
class SentimentPredictionModel(nn.Module):
    def __init__(self, vocab_size: int, dmodel: int, dk: int, dv: int, positional_encodings: torch.Tensor):
        super().__init__()
        self.positional_encodings = positional_encodings
        self.embeddings = torch.nn.Embedding(vocab_size, dmodel)
        ### BEGIN SOLUTION
        self.encoder_layer1 = TransformerEncoderLayer(dmodel, dk, dv)
        self.encoder_layer2 = TransformerEncoderLayer(dmodel, dk, dv)
        ### END SOLUTION
        
        self.prediction_head1 = nn.Linear(dmodel, 16)
        self.prediction_head2 = nn.Linear(16, 2)

    def forward(self, x):
        layer_scores = []
        ### BEGIN SOLUTION
        x_emb = self.embeddings(x).view((x.shape[0], -1))
        positional_encodings = self.positional_encodings[:x_emb.size(0)]

        x = x_emb + positional_encodings
        z, scores = self.encoder_layer1(x)
        layer_scores.append(scores)
        z, scores = self.encoder_layer2(z)
        layer_scores.append(scores)
        ### END SOLUTION

        z_class = z[0]
        pred = F.relu(self.prediction_head1(z_class))
        class_logits = self.prediction_head2(pred)
        return class_logits, layer_scores


##### Sentiment Classification

In the final step we can now train our sentiment classification model on the movie review dataset. To this end, we split the entire dataset into a training and a validation dataset (half of the samples each) and instantiate DataLoader instances for both: ```train_loader``` and ```val_loader```.

In [None]:
model = SentimentPredictionModel(len(encoding_dict), dmodel, dk, dv, encodings)
lr = 1e-5
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
device = 'cpu'

train_idx = int(len(encoded_texts) * 0.5)
encoded_texts_train = torch.tensor(encoded_texts[:train_idx])
labels_train = labels[:train_idx]
encoded_texts_val = torch.tensor(encoded_texts[train_idx:])
labels_val = labels[train_idx:]

In [None]:
train_dataset = torch.utils.data.TensorDataset(encoded_texts_train, labels_train)
val_dataset = torch.utils.data.TensorDataset(encoded_texts_val, labels_val)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1)
val_loader = torch.utils.data.DataLoader(val_dataset)

Inside the training loop, please implement the core training functionalities:
 * extract the movie review texts and the corresponding sentiment labels
 * pass the batch of review texts through the model to obtain the prediction ```out``` and the attention scores ```scores```
 * use the prediction and the groundtruth label to compute the ```loss``` value
 * execute one step of gradient descent (remember to call `zero_grad()` before executing the loss' `backward()` pass and computing the weight updates via `step()`)
 * add the loss computed after the forward pass of the current batch to ```episode_loss_train```.
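The order of the optimizer calls listed above can be sketched on a toy problem. Everything here is an assumption for illustration: a single linear layer stands in for the sentiment model, and the sample, the learning rate and the names `toy_model`, `toy_optimizer` and `toy_criterion` are made up.

```python
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)
toy_model = nn.Linear(4, 2)                 # stand-in for the sentiment model
toy_optimizer = optim.Adam(toy_model.parameters(), lr=0.1)
toy_criterion = nn.CrossEntropyLoss()

features = torch.randn(1, 4)                # one sample with a batch dimension
target = torch.tensor([1])                  # class index of that sample

losses = []
for _ in range(20):
    logits = toy_model(features)            # forward pass
    loss = toy_criterion(logits, target)    # compare prediction and label
    toy_optimizer.zero_grad()               # clear the gradients of the previous step
    loss.backward()                         # backpropagate the loss
    toy_optimizer.step()                    # update the weights
    losses.append(loss.item())

print(losses[0], losses[-1])
```

Repeating the step on the same sample drives the loss down, which is the behaviour the training loop accumulates per episode in ```episode_loss_train```.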

In [None]:
n_episodes = 100
episode_losses_train = []
episode_losses_val = []

for episode in range(n_episodes):
    episode_loss_train = 0
    episode_loss_val = 0
    model.train()
    for batch_idx, sample in enumerate(train_loader):
        sample_x, sample_y = sample
        ### BEGIN SOLUTION
        out, scores = model(sample_x.squeeze())
        loss = criterion(out, sample_y[0])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        episode_loss_train += loss.item()
        ### END SOLUTION

    model.eval()
    # validation needs no gradients, so we disable gradient tracking
    with torch.no_grad():
        for batch_idx, sample in enumerate(val_loader):
            sample_x, sample_y = sample
            out, scores = model(sample_x.squeeze())
            loss = criterion(out, sample_y[0])
            episode_loss_val += loss.item()
    episode_losses_train.append(episode_loss_train / len(encoded_texts_train))
    episode_losses_val.append(episode_loss_val / len(encoded_texts_val))

##### Loss Visualization

Looking at the training and validation loss, what behaviour can you observe?

In [None]:
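fig, axes = plt.subplots(1, 1)
axes.plot(episode_losses_train, label="training loss")
axes.plot(episode_losses_val, label="validation loss")
axes.set_xlabel("episode")
axes.set_ylabel("mean loss per sample")
axes.legend()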

##### Score Visualization

The following code cells produce a visualization of the scores computed in the self-attention mechanism for a single training sample.

If you focus on the first row of the visualized score matrix (the row for the `<cls>` token), you can identify the tokens of the sequence which the classifier attends to most.

Visualize the attention scores for different data samples, investigate how they vary and which words seem to be important for the model's classification decision.
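Besides the plot, the most attended tokens can be read out numerically by sorting the `<cls>` row. The sketch below uses made-up tokens and weights; with the trained model, the row would presumably come from the first layer's score matrix (e.g. `scores[0][0].tolist()`) and the tokens from `decode_sequence`.

```python
# Made-up tokens and a made-up attention row for the <cls> position
tokens = ["<cls>", "a", "great", "film", "<pad>"]
cls_row = [0.10, 0.05, 0.55, 0.25, 0.05]

# pair every token with its weight and sort by weight, largest first
ranked = sorted(zip(tokens, cls_row), key=lambda pair: pair[1], reverse=True)
for token, weight in ranked[:3]:
    print(f"{token}: {weight:.2f}")
```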

In [None]:
encoded_text = list(encoded_texts_train[13].tolist())
print(encoded_text)
decoded_text = decode_sequence(encoded_text, decoding_dict)
print(decoded_text)
out, scores = model(torch.tensor(encoded_text))

In [None]:
fig, axes = plt.subplots(1, 1)
im = axes.imshow(scores[0].detach())
plt.colorbar(im)