# Exercise 8: Transformers

## Preamble
The following code downloads and imports all necessary files and modules into the virtual machine of Colab. Please make sure to execute it before solving this exercise. This mandatory preamble will be found on all exercise sheets.

In [None]:
import sys, os
if 'google.colab' in sys.modules:
  if os.getcwd() == '/content':
    !git clone 'https://github.com/inb-luebeck/cs4405.git'
    os.chdir('cs4405')

!pip install transformers==4.47.1 accelerate
from utils import utils_8 as utils
from transformers.modeling_utils import PreTrainedModel
import torch
from torch import nn
if torch.cuda.is_available():
    torch.set_default_device('cuda')
torch.set_default_dtype(torch.bfloat16)

phi3config = utils.get_config()
tokenizer = utils.get_tokenizer()

## Note
The model implementation used in this exercise is based on [Microsoft's Phi-3](https://arxiv.org/abs/2404.14219).

The model code in this exercise includes blocks that are not essential for a fundamental understanding of the transformer architecture. Those blocks are marked with `### OPTIMIZATION` and can be ignored by the reader for a better overview of the model at large.

# Language Modeling Basics
## Tokenization
The input of a (large) language model (LLM) is a sequence of word/token IDs, which are their indices in a predefined dictionary. Likewise, a sequence can be decoded back into text by looking up the words/word fragments corresponding to the IDs and concatenating them.

The tokens in the dictionary are chosen based on how frequently each character sequence occurs in a given language or dataset.
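To make the ID lookup concrete, here is a minimal sketch in plain Python. The vocabulary `toy_vocab` and the helpers `toy_encode` and `toy_decode` are hypothetical and only illustrate the idea of mapping text fragments to indices and back. The greedy longest-match rule is an assumption chosen for simplicity and is not how the tokenizer used in this notebook splits text.

```python
# Hypothetical toy vocabulary; the position in the list is the token ID.
toy_vocab = ["A", " simpl", "ified", " example", " for", " token", "ization"]
token_to_id = {tok: i for i, tok in enumerate(toy_vocab)}

def toy_encode(text):
    """Greedy longest-match tokenization (illustrative assumption only)."""
    ids = []
    pos = 0
    while pos < len(text):
        # pick the longest vocabulary entry that matches at the current position
        candidates = [t for t in toy_vocab if text.startswith(t, pos)]
        if not candidates:
            raise ValueError(f"no token matches at position {pos}")
        match = max(candidates, key=len)
        ids.append(token_to_id[match])
        pos += len(match)
    return ids

def toy_decode(ids):
    """Look up every ID in the vocabulary and concatenate the fragments."""
    return "".join(toy_vocab[i] for i in ids)

text = "A simplified example for tokenization"
ids = toy_encode(text)
print(ids)
print(toy_decode(ids))
```

Decoding is the exact inverse of encoding here because every fragment is stored verbatim, including its leading space.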

Here is a demonstration for tokenization:

In [None]:
text = "A simplified example for tokenization"

token_ids = tokenizer(text, add_special_tokens=False).input_ids
print("Token IDs:", token_ids, "\n")

# Decoding tokens one by one to show which word fragments are encoded as individual tokens
decoded_tokens = tokenizer.convert_ids_to_tokens(token_ids)
print("List of decoded tokens:", decoded_tokens, "\n")

# Decoding the text as a whole from the token ID sequence
decoded_text = tokenizer.decode(token_ids)
print(f"Decoded text: '{decoded_text}'")

## Embeddings
For every entry in the token dictionary the model holds a trainable representation vector. These vectors are initialized with random values, just like other neural network parameters, and are trained with the rest of the model via backpropagation.

The trainable representation vectors are called embeddings and they are stored in a matrix (we will call it the embedding matrix), with each row corresponding to one token ID (one entry in the token dictionary). The embedding matrix can be interpreted as a lookup table.

For this LLM, we use:
 - a tokenizer with a dictionary size of $32064$ tokens
 - embedding vectors of dimension $3072$
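The lookup-table view can be sketched in plain Python as follows. The sizes `vocab_size = 6` and `embed_dim = 4` are toy values chosen for readability, and `embed` is a hypothetical helper. In the exercise itself the matrix would be an `nn.Embedding` with the sizes listed above.

```python
import random

random.seed(0)

# Toy sizes for illustration; the notebook's model uses 32064 and 3072.
vocab_size, embed_dim = 6, 4

# One randomly initialized row per token ID.
embedding_matrix = [
    [random.gauss(0.0, 0.02) for _ in range(embed_dim)]
    for _ in range(vocab_size)
]

def embed(token_ids):
    """Embedding lookup: pick the row that belongs to each token ID."""
    return [embedding_matrix[i] for i in token_ids]

vectors = embed([3, 1, 3])
print(len(vectors), len(vectors[0]))

# Number of trainable values in the full-size embedding matrix.
full_size_params = 32064 * 3072
print(full_size_params)
```

The same token ID always maps to the same row, so the first and third vector in this example are identical.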

## Model Output
For the language modeling task, the model is trained to predict the next token. The prediction gives every token in the dictionary a score based on its likelihood. This is treated like a classification where each token in the dictionary is interpreted as a class, and the ground truth label is the actual next token ID in the sequence.

In the case of decoder-only transformer-based language models (like the model used in this exercise) the model simultaneously produces a next-token prediction for **every** position in the input sequence. That is, the $i$-th vector in the output list contains the probability distribution for the $(i+1)$-th token, with $i\le n$ for an input sequence of length $n$.
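The shift by one position between inputs and labels can be sketched as follows. The token IDs and the helper names `softmax` and `cross_entropy` are hypothetical. Cross-entropy is used here as the usual loss for classification and is an assumption, since this notebook does not show the training code.

```python
import math

def softmax(scores):
    """Turn raw scores into probabilities that sum to 1."""
    m = max(scores)  # subtract the maximum for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy(logits, target_id):
    """Classification loss for one position: -log p(target)."""
    return -math.log(softmax(logits)[target_id])

# Hypothetical token ID sequence.
token_ids = [5, 2, 7, 1]

# The model sees position i and is trained to predict position i + 1.
inputs = token_ids[:-1]
targets = token_ids[1:]
pairs = list(zip(inputs, targets))
print(pairs)

# With uniform scores over 8 classes the loss equals log(8).
uniform_loss = cross_entropy([0.0] * 8, target_id=3)
print(uniform_loss)
```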

## Transformers
Transformer network architectures use one or more transformer blocks in sequence. Each transformer block takes a list of feature vectors $[in_0, in_1, \dots, in_n]$ as input and produces a list (with the same length) of new feature vectors as output $[out_0, out_1, \dots, out_n]$. Every vector of the output list is a new representation of the input vector at the same position in the list, but its representation is enriched with relational information between it and the other feature vectors.

The input feature vectors $[in_0, in_1, \dots, in_n]$ for the first transformer block are semantic units from the input sample, e.g. words or tokens in a text or image regions/patches in an image, or different images from an image series or video.

The new representations can be modeled via a transformation function $f$ such that $out_i = f(in_i, [in_0, in_1, \dots, in_n])$. The first argument, $in_i$, is used to produce a `query` and the second argument serves as `keys` and `values`. The intuition is that the `query` vector states which kind of information we are seeking in the other feature vectors, the `keys` encode what information can be found in a feature vector, and the `values` represent the detailed information.

For the transformer architecture the transformation function $f$ is modeled via self-attention or cross-attention (see lecture notes) to determine relevant information from other feature vectors for a feature vector $in_i$, followed by a multi-layer-perceptron (MLP) to process how the feature vectors are related and how to represent that relation in a new representation $out_i$.

### Self-Attention
For a list of feature vectors $[in_0, in_1, \dots, in_n]$, query, key and value vectors are calculated via the transformation matrices $W_Q$, $W_K$ and $W_V$ as follows:
$$q_i = W_Q\times in_i,$$

$$k_i = W_K\times in_i,$$

$$v_i = W_V\times in_i.$$

The attention calculation looks as follows:

$$y_i = \sum_j softmax_j\left(\frac{q_i\times k_0}{\sqrt{keydim}}, \frac{q_i\times k_1}{\sqrt{keydim}}, \dots, \frac{q_i\times k_n}{\sqrt{keydim}}\right)\times v_j$$

where $keydim$ is the dimensionality of the key vectors.

We call $\frac{q_i\times k_j}{\sqrt{keydim}}$ attention scores, as they determine how much input $in_j$ contributes to the new representation of input $in_i$.

We can arrange the input feature vectors as row vectors in a matrix $X$ to calculate all queries, keys and values with one matrix multiplication each, so that row $i$ of $Q$, $K$ and $V$ is $q_i$, $k_i$ and $v_i$ respectively:

$$Q = XW_Q^T,$$

$$K = XW_K^T,$$

$$V = XW_V^T.$$

The attention calculation then corresponds to:

$$softmax\left(\frac{QK^T}{\sqrt{keydim}}\right)V$$

with softmax along the rows.
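The attention calculation can be traced by hand on a tiny example. The following sketch uses plain Python lists instead of tensors, and the matrices `X`, `W_Q`, `W_K` and `W_V` are made-up toy values. Queries, keys and values are stored as rows, so row $i$ of `Q` equals $W_Q\times in_i$. Position embeddings and the causal mask used later in the exercise are left out on purpose.

```python
import math

def matmul(a, b):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]

def transpose(m):
    return [list(col) for col in zip(*m)]

def softmax(row):
    m = max(row)  # subtract the maximum for numerical stability
    exps = [math.exp(s - m) for s in row]
    total = sum(exps)
    return [e / total for e in exps]

def self_attention(X, W_Q, W_K, W_V):
    # Row i of Q, K, V is W_Q in_i, W_K in_i, W_V in_i.
    Q = matmul(X, transpose(W_Q))
    K = matmul(X, transpose(W_K))
    V = matmul(X, transpose(W_V))
    key_dim = len(K[0])
    # scores[i][j] = (q_i . k_j) / sqrt(keydim)
    scores = [[s / math.sqrt(key_dim) for s in row] for row in matmul(Q, transpose(K))]
    # softmax along the rows: every query distributes a total weight of 1
    weights = [softmax(row) for row in scores]
    # every output row is a weighted sum of the value vectors
    return matmul(weights, V), weights

# Toy input: 3 feature vectors of dimension 2 (hypothetical values).
X = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
W_Q = [[1.0, 0.0], [0.0, 1.0]]
W_K = [[1.0, 0.0], [0.0, 1.0]]
W_V = [[0.5, 0.0], [0.0, 2.0]]

outputs, weights = self_attention(X, W_Q, W_K, W_V)
for row in weights:
    print([round(w, 3) for w in row])
```

Each printed row shows how strongly one input attends to all inputs, and each row sums to 1.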

### Multi-Head Attention
In practice, the whole attention mechanism is applied 32 times simultaneously (i.e. 32 attention heads), each with its own transformation matrices $W_Q, W_K, W_V$, so that some of the attention calculations can represent different relations between the tokens than others. For example, one head might focus on grammatical relations between words while others might focus on phonetic similarities between words, if the training data included poems and lyrics.
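The dimension bookkeeping for multiple heads can be sketched as follows. The numbers 3072 and 32 are the ones used in this exercise, while `concat_heads` and the toy head outputs are hypothetical and only show how per-head results are joined per token.

```python
feature_dim, num_heads = 3072, 32

# Each head works with a smaller vector so that the concatenation
# of all heads has the original feature dimension again.
head_dim = feature_dim // num_heads
print(head_dim)

def concat_heads(head_outputs):
    """Join the per-token output vectors of all heads.

    head_outputs[h][t] is the output vector of head h for token t.
    """
    n_tokens = len(head_outputs[0])
    return [
        [x for head in head_outputs for x in head[t]]
        for t in range(n_tokens)
    ]

# Toy example: 2 heads, 3 tokens, 2 values per head and token.
head_a = [[1, 2], [3, 4], [5, 6]]
head_b = [[7, 8], [9, 10], [11, 12]]
combined = concat_heads([head_a, head_b])
print(combined)
```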

## Exercise 8.1: Transformer Implementation

### Self-Attention

Implement a self-attention layer.

In [None]:
import math

class MySingleSelfAttentionLayer(nn.Module):
    def __init__(self):
        super().__init__()
        feature_dim = 3072

        # dimensionality per attention head
        self.head_dim = 96

        # TODO: create the projection matrices for the queries, keys and values (using nn.Linear without bias) - use head_dim for the dimensionality of the resulting vectors
        self.q_proj =
        self.k_proj =
        self.v_proj =

    def forward(self, hidden_states):
        # TODO: calculate the queries, keys and values
        queries =
        keys =
        values =


        # apply position embeddings - this enriches the query and key vectors with information about their position within the input sequence
        queries, keys = utils.apply_pos_emb(hidden_states, queries, keys, values)


        # TODO: calculate all attention scores using matrix multiplication
        attn_weights =


        # applying a causal attention mask, i.e. offsetting attention scores to -inf wherever the key token comes later in the sentence/text than the query token, so that the prediction model can't predict the next word by just looking at it (instead of inferring it from the prior text)
        attention_mask = utils.get_attention_mask(phi3config, hidden_states)
        attn_weights = attn_weights + attention_mask


        # apply softmax to get a weighted sum with a total weight of 1.0
        attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(values.dtype)


        # TODO: calculate the weighted sum of value vectors based on the attention they received
        attn_output =

        return attn_output

In [None]:
# Sanity check to verify matching dimensions
def sanity_check_single():
  test_layer = MySingleSelfAttentionLayer().cpu()
  test_input = torch.rand(4, 5, 3072, device='cpu')
  test_output = test_layer(test_input)
  assert list(test_output.shape) == [test_input.shape[0], test_input.shape[1], test_input.shape[2]//32], "Tensor shape mismatch! Expected output tensor shape to match the input tensor's shape(/32)!"

sanity_check_single()

### Multi-Head Attention
Use multiple attention layers to implement a multi-head self attention layer.

In [None]:
class MySelfAttentionLayer(nn.Module):
    def __init__(self):
        super().__init__()
        feature_dim = 3072

        # multi-head attention, head count
        num_heads = 32

        # TODO: create 32 attention heads (MySingleSelfAttentionLayer) in a list, wrap it with nn.ModuleList
        self.attention_heads =


        ### OPTIMIZATION ###
        self.o_proj = nn.Linear(feature_dim, feature_dim, bias=False)
        ####################

    def forward(self, hidden_states):
        # TODO: calculate the self-attention for every attention head and concatenate their result vectors
        attn_outputs =


        ### OPTIMIZATION ###
        # combine results from different attention heads using a linear layer
        attn_outputs = self.o_proj(attn_outputs)
        ####################

        return attn_outputs

In [None]:
# Sanity check to verify matching dimensions
def sanity_check_multi():
  test_layer = MySelfAttentionLayer().cpu()
  test_input = torch.rand(4, 5, 3072, device='cpu')
  test_output = test_layer(test_input)
  assert list(test_output.shape) == list(test_input.shape), "Tensor shape mismatch! Expected output tensor shape to match the input tensor's shape!"

sanity_check_multi()

### Transformer Block

Implement a transformer block by combining a self-attention layer with an MLP.

In [None]:
# This is where we implement a decoder-only transformer block - the most common architecture among state-of-the-art LLMs
class MyTransformerBlock(nn.Module):
    def __init__(self):
        super().__init__()
        # TODO: create a self attention layer (MySelfAttentionLayer - implementation above)
        self.self_attn =
        # TODO: create an MLP (MyMLP - implementation below)
        self.mlp =

        ### OPTIMIZATION ###
        # layer normalization was used during training for better training behavior
        self.input_layernorm = utils.Phi3RMSNorm(phi3config.hidden_size, eps=phi3config.rms_norm_eps)
        self.post_attention_layernorm = utils.Phi3RMSNorm(phi3config.hidden_size, eps=phi3config.rms_norm_eps)
        ####################

    def forward(self, hidden_states):
        ### OPTIMIZATION ###
        # keep a copy of the original input vectors
        residual = hidden_states
        # applying layer normalization
        hidden_states = self.input_layernorm(hidden_states)
        ####################


        # TODO: calculate multi-head self attention
        attn_outputs =


        ### OPTIMIZATION ###
        # combine each input vector's previous representation with the most attended related vectors
        hidden_states = residual + attn_outputs
        # prepare the skip connection around the MLP
        residual = hidden_states
        # applying another layer normalization
        hidden_states = self.post_attention_layernorm(hidden_states)
        ####################


        # TODO: apply the MLP to calculate relations between each input vector's previous representation and its most attended related vectors
        hidden_states =


        ### OPTIMIZATION ###
        # apply skip connection around MLP
        hidden_states = residual + hidden_states
        ####################
        outputs = (hidden_states,)

        return outputs


# A self-gated MLP
class MyMLP(nn.Module):
    def __init__(self):
        super().__init__()

        self.gate_up_proj = nn.Linear(phi3config.hidden_size, 2 * phi3config.intermediate_size, bias=False)
        self.down_proj = nn.Linear(phi3config.intermediate_size, phi3config.hidden_size, bias=False)

    def forward(self, hidden_states):
        up_states = self.gate_up_proj(hidden_states)

        gate, up_states = up_states.chunk(2, dim=-1)
        up_states = up_states * utils.silu(gate)

        return self.down_proj(up_states)

In [None]:
# Sanity check to verify matching dimensions
def sanity_check_block():
  test_layer = MyTransformerBlock().cpu()
  test_input = torch.rand(4, 5, 3072, device='cpu')
  test_output = test_layer(test_input)[0]
  assert list(test_output.shape) == list(test_input.shape), "Tensor shape mismatch! Expected output tensor shape to match the input tensor's shape!"

sanity_check_block()

### Transformer Model

Stack 32 transformer blocks and add a final linear layer that outputs a score for every token ID in the dictionary.

For more details, see the sections about **Embeddings** and **Model Output** above.

In [None]:
class MyModel(PreTrainedModel):
    def __init__(self):
        super().__init__(phi3config)
        # TODO: create the embedding matrix (nn.Embedding(...))
        self.embed_tokens =


        # TODO: make a list of 32 transformer blocks (MyTransformerBlock, implementation above) that we will propagate the inputs through
        # NOTE: wrap the list in "nn.ModuleList(my_list)" to inform pytorch that these are neural network components (that hold model parameters) and not just a list of python variables
        self.layers =

        ### OPTIMIZATION ###
        # layer normalization was used during training for better training behavior
        self.norm = utils.Phi3RMSNorm(phi3config.hidden_size, eps=phi3config.rms_norm_eps)
        ####################

        # TODO: create a linear layer (without a bias) that maps the representations produced by the last transformer block onto the dictionary (giving every token ID a likelihood score)
        # NOTE: as we will see below, we model every transformer block to produce outputs with the same hidden dimension as its inputs - so the output dimension of the last transformer block is still the same as the dimension of the input of the first transformer block, i.e. equal to the embedding dimension
        self.lm_head =

    def forward(self, input_ids, return_dict=False, **kwargs):
        # TODO: get the embedding vectors for the input token ids
        inputs_embeds =

        # our initial hidden representations are the token embeddings
        hidden_states = inputs_embeds

        # propagate through all transformer blocks sequentially
        for transformer_block in self.layers:
            # TODO: get the new hidden representations by transforming the current hidden representations
            layer_outputs =

            hidden_states = layer_outputs[0]


        ### OPTIMIZATION ###
        # applying layer normalization
        hidden_states = self.norm(hidden_states)
        ####################


        # TODO: calculate the next token likelihood scores from the last hidden representations using the linear mapping layer
        logits =
        logits = logits.float()

        return utils.format_model_output(logits, return_dict)


    def prepare_inputs_for_generation(self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs):
        model_inputs = {"input_ids": input_ids}
        return model_inputs

In [None]:
# Sanity check to verify matching dimensions
def sanity_check_model():
  test_layer = MyModel().cuda()
  test_input = torch.randint(0, phi3config.vocab_size, (4, 5), device='cuda')
  test_output = test_layer(test_input)[0]
  assert list(test_output.shape) == [test_input.shape[0], test_input.shape[1], phi3config.vocab_size], "Tensor shape mismatch! Expected output tensor shape to match the input tensor's shape and dictionary size!"

sanity_check_model()

### Comprehension Questions
Retrieve the shape of the tensor that goes into the first transformer block.
 - What is the meaning of each of those dimensions?
 - How can we use the output of the model? Give an example with a demonstration.
 - What does the softmax function look like, mathematically? And how does an attention score of negative infinity contribute?

## Exercise 8.2: Decoding the Model Output

### Setup

This exercise can utilize GPU acceleration. If you are using Google Colab you can enable access to a cloud GPU by selecting from the menu above:

**Runtime > Change runtime type > Hardware accelerator > GPU**

If you are running this notebook on your own machine, GPU acceleration is available if you have an Nvidia GPU and a CUDA-enabled driver installed.

### Model Instantiation

Create an instance of your model class in the following code block. The code also loads pretrained weights for the model and inserts them into the model's layers, so that we can try the model out without having to train it ourselves (which would take an excessive amount of time and resources for large language models).

Once your model is confirmed to work and generates plausible outputs, the method below can also be used to instantiate a better optimized implementation of the same model for faster processing (by providing `use_optimized_implementation=True`).

In [None]:
# Helper code to create a model instance
def create_model_instance(use_optimized_implementation=False):
    # with use_optimized_implementation=False this function uses our model implementation
    # with use_optimized_implementation=True a more optimized implementation is used

    if not use_optimized_implementation:
        torch.set_default_device('cuda')
        torch.set_default_dtype(torch.bfloat16)

        # retrieving pretrained weights
        model_weights = utils.get_model_weights()

        # TODO: create an instance of your model
        model =

        # loading pretrained weights into the model
        model.load_state_dict(model_weights)
    else:
        model = utils.get_model()

    return model

### A single Forward Pass

Create an instance of your model using `create_model_instance`, tokenize an input text and feed the token sequence into the model to predict the next token. When selecting the next token, take the one with the highest score (i.e. greedy decoding).

In [None]:
def experiment1():
    # TODO: create a model instance
    model =

    # Some test input
    input_text = "Hello, nice to"

    # TODO: tokenize the input text
    token_ids =

    # make a pytorch tensor out of the token id list
    token_ids_tensor = torch.tensor(token_ids)

    # add a batch dimension (we only have one sequence so batch size is 1)
    batch = token_ids_tensor.unsqueeze(dim=0)

    # move the batch to the GPU
    batch = batch.cuda()

    with torch.no_grad():
        # TODO: feed the batch into the model
        out =
        logits = out[0] # a model could output more data, like the attention scores, to cache them for auto-regression, but we are only interested in the logits

        # TODO: the "logits" variable contains the prediction scores for every token's successor in the sequence, not just for the last token. Get the scores for the last token.
        next_token_logits =

        # TODO: determine the next token ID (i.e. the id/index of the token with the highest score)
        next_token_id =

        # TODO: decode the sequence with the new token attached at the end
        new_text =

        print(new_text)

experiment1()

### Top k

Convert the predicted token scores into token probabilities by applying softmax to them.

In [None]:
def experiment2():
    # TODO: create a model instance
    model =

    # Some test input
    input_text = "Hello, nice to"

    # TODO: tokenize the input text
    token_ids =

    # make a pytorch tensor out of the token id list
    token_ids_tensor = torch.tensor(token_ids)

    # add a batch dimension (we only have one sequence so batch size is 1)
    batch = token_ids_tensor.unsqueeze(dim=0)

    # move the batch to the GPU
    batch = batch.cuda()

    with torch.no_grad():
        # TODO: feed the batch into the model
        out =
        logits = out[0] # a model could output more data, like the attention scores, to cache them for auto-regression, but we are only interested in the logits

        # TODO: the "logits" variable contains the prediction scores for every token's successor in the sequence, not just for the last token. Get the scores for the last token.
        next_token_logits =


        # TODO: calculate the probabilities for all token IDs (i.e. their probability to be the next token in the sequence)
        next_token_probabilities =

        # TODO: sort by probability and store the id/index corresponding to each probability
        next_token_probabilities, next_token_ids =

        k = 5

        # TODO: take the top k tokens
        top_k_probs, top_k =

        # TODO: for each of the top-k entries print the decoded text and the probability
        for p, token_id in zip(top_k_probs, top_k):
            print("Probability:", p.item() * 100)
            text =
            print(text, '\n')

experiment2()

### Autoregression

Use the model to continue the text in a loop.

In [None]:
from IPython.display import display, clear_output
class printer(str):
    def __repr__(self):
       return self

def experiment3():
    # TODO: create a model instance
    model =

    # Some test input
    input_text = "(Narrator:) Hello, nice to"

    # TODO: tokenize the input text
    token_ids =

    max_new_tokens = 10

    # The token that the model uses to signify the end of the output
    end_token_id = tokenizer.convert_tokens_to_ids(['<|end|>'])[0]

    with torch.no_grad():
        for _ in range(max_new_tokens):
            # make a pytorch tensor out of the token id list
            token_ids_tensor = torch.tensor(token_ids)

            # add a batch dimension (we only have one sequence so batch size is 1)
            batch = token_ids_tensor.unsqueeze(dim=0)

            # move the batch to the GPU
            batch = batch.cuda()

            # TODO: feed the batch into the model
            out =
            logits = out[0]

            # TODO: the "logits" variable contains the prediction scores for every token's successor in the sequence, not just for the last token. Get the scores for the last token.
            next_token_logits =

            # TODO: determine the next token ID (i.e. the id/index of the token with the highest score)
            next_token_id =

            # TODO: add the new token to the input sequence
            token_ids...

            # show the generated text so far
            clear_output(wait=True)
            display(printer(tokenizer.decode(token_ids)))

            if (next_token_id == end_token_id):
                break

experiment3()

## Additional Content

The libraries used here already provide premade functions to predict and decode text in a loop.
An example is provided below.

The `temperature` parameter is simply a factor that the logits/token scores are divided by before applying softmax. It controls the variance of the generated text - the lower the temperature, the more probabilistic weight moves towards the tokens with the highest scores.
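The effect of the temperature can be checked on a handful of scores. This is a minimal sketch in plain Python. The helper `softmax_with_temperature` and the example logits are hypothetical and not part of the libraries used here.

```python
import math

def softmax_with_temperature(logits, temperature):
    """Divide the scores by the temperature, then apply softmax."""
    scaled = [l / temperature for l in logits]
    m = max(scaled)  # subtract the maximum for numerical stability
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical scores for three candidate tokens.
logits = [2.0, 1.0, 0.0]

for t in (2.0, 1.0, 0.5):
    probs = softmax_with_temperature(logits, t)
    print(t, [round(p, 3) for p in probs])

p_cold = softmax_with_temperature(logits, 0.5)
p_neutral = softmax_with_temperature(logits, 1.0)
p_hot = softmax_with_temperature(logits, 2.0)
```

Lower temperatures concentrate the probability on the highest-scoring token, while higher temperatures flatten the distribution.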

Setting `do_sample=True` makes generation pick the next token at random according to each token ID's predicted probability, rather than greedily always picking the one with the highest probability. This can be combined with the `top_p` parameter, which filters out less likely tokens so that we do not occasionally sample a very unlikely token due to bad luck, or with the `top_k` parameter, which only keeps the k most likely tokens to sample from.
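The two filters can be sketched in plain Python as follows. The helpers `top_k_filter`, `top_p_filter` and `sample` are hypothetical and the probabilities are made up. This is a simplified illustration under the assumption that `top_p` keeps the smallest set of most likely tokens whose cumulative probability reaches the threshold, and it is not the library's actual implementation.

```python
import random

def top_k_filter(probs, k):
    """Keep the k most likely token IDs and renormalize their probabilities."""
    keep = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:k]
    total = sum(probs[i] for i in keep)
    return {i: probs[i] / total for i in keep}

def top_p_filter(probs, p):
    """Keep the most likely token IDs until their cumulative probability reaches p."""
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    keep, cumulative = [], 0.0
    for i in order:
        keep.append(i)
        cumulative += probs[i]
        if cumulative >= p:
            break
    total = sum(probs[i] for i in keep)
    return {i: probs[i] / total for i in keep}

def sample(filtered, rng):
    """Draw one token ID at random, weighted by its probability."""
    ids = list(filtered)
    weights = [filtered[i] for i in ids]
    return rng.choices(ids, weights=weights, k=1)[0]

# Hypothetical next-token probabilities for a vocabulary of four tokens.
probs = [0.5, 0.3, 0.15, 0.05]

kept_k = top_k_filter(probs, k=2)
kept_p = top_p_filter(probs, p=0.9)
print(kept_k)
print(kept_p)

rng = random.Random(0)
drawn = sample(kept_p, rng)
print(drawn)
```

After filtering, the least likely token can no longer be drawn, no matter how the random number generator behaves.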

In [None]:
def generation_example():
    model = create_model_instance(True)

    input_text = "Hello, nice to"

    token_ids = tokenizer.encode(input_text)
    output_ids = model.generate(
        torch.tensor([token_ids]).to(model.device),
        do_sample=True,
        temperature=0.6,
        max_new_tokens=30,
    )
    new_token_ids = output_ids.tolist()[0][len(token_ids):]
    all_token_ids = token_ids + new_token_ids

    result_text = tokenizer.decode(all_token_ids, skip_special_tokens=True)
    print(result_text)

generation_example()

The model was also fine-tuned to continue text in a chat format. The tokenizer includes the template.
Below you can find an example of how LLMs work in a chat setting.

You can also see that the model predicts an "<|end|>" token where it predicts the response should be finished. This is what the generating loop is looking for to terminate before `max_new_tokens` is reached.
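The stopping rule can be illustrated without a real model. In this sketch `stub_next_token` is a hypothetical stand-in for the model's prediction, and `END_ID` is an assumed ID for the end token. Only the loop structure is meant to carry over.

```python
END_ID = 0  # assumed ID of the end token, for illustration only

def stub_next_token(token_ids):
    """Hypothetical stand-in for a model: counts down towards END_ID."""
    return max(token_ids[-1] - 1, END_ID)

def generate(token_ids, max_new_tokens):
    """Append predicted tokens until the end token appears or the budget is used up."""
    token_ids = list(token_ids)
    for _ in range(max_new_tokens):
        next_id = stub_next_token(token_ids)
        token_ids.append(next_id)
        if next_id == END_ID:
            break  # the end token terminates generation early
    return token_ids

stopped_early = generate([3], max_new_tokens=10)
hit_budget = generate([5], max_new_tokens=2)
print(stopped_early)
print(hit_budget)
```

The first call ends as soon as the end token is produced, while the second call runs out of its token budget before reaching it.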

In [None]:
def chat_example():
    model = create_model_instance(True)

    chat = [
        {"role": "user", "content": "Write a short sentence about drying paint."},
    ]
    prompt = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)

    print(f'Prompt: \n"\n{prompt}"\n')

    token_ids = tokenizer.encode(prompt, add_special_tokens=False, return_tensors="pt")

    output_ids = model.generate(
        token_ids,
        do_sample=True,
        temperature=0.6,
        max_new_tokens=30,
    )
    new_token_ids = output_ids.tolist()[0][token_ids.shape[1]:]

    response_text = tokenizer.decode(new_token_ids, skip_special_tokens=False)
    print(f"Model response: \n{response_text}")

chat_example()