In [1]:
%pip install -q transformers huggingface_hub
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import transformers

### Build-a-transformer

In this section, you will implement a transformer language model layer by layer, then use it to generate (hopefully) coherent text.

To understand how these layers work, please check out our guide to transformers from [nlp course for you -> transformers](https://lena-voita.github.io/nlp_course/seq2seq_and_attention.html#transformer_intro).


First, we download the pre-trained weights of the [GPT-2 model by OpenAI](https://openai.com/research/better-language-models), a prominent model from 2019.



Idea & code by: Ilya Beletsky

In [2]:
from huggingface_hub import hf_hub_download
state_dict = torch.load(hf_hub_download("gpt2", filename="pytorch_model.bin"))
for key, value in tuple(state_dict.items()):
    if key.startswith('h.') and key.endswith('.weight') and value.ndim == 2:
        value.transpose_(1, 0)  # <-- GPT-2 stores these as [in, out]; nn.Linear expects [out, in]
    if key.startswith('h.') and key.endswith('.attn.bias') and value.ndim == 4:
        state_dict.pop(key)  # <-- triangular binary masks, not needed in this code

print('Weights:', repr(sorted(state_dict.keys()))[:320], '...')



Weights: ['h.0.attn.c_attn.bias', 'h.0.attn.c_attn.weight', 'h.0.attn.c_proj.bias', 'h.0.attn.c_proj.weight', 'h.0.ln_1.bias', 'h.0.ln_1.weight', 'h.0.ln_2.bias', 'h.0.ln_2.weight', 'h.0.mlp.c_fc.bias', 'h.0.mlp.c_fc.weight', 'h.0.mlp.c_proj.bias', 'h.0.mlp.c_proj.weight', 'h.1.attn.c_attn.bias', 'h.1.attn.c_attn.weight', 'h.1. ...
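
Why the transposition in the loading cell is needed: the GPT-2 checkpoint keeps its projection matrices in `[in_features, out_features]` layout and applies them as `x @ W + b`, whereas `nn.Linear` keeps its weight as `[out_features, in_features]`. The sketch below illustrates this with small random stand-in tensors (the sizes and the names `w_checkpoint`, `b_checkpoint` are made up for the example, not taken from the real checkpoint):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
in_features, out_features = 8, 24  # toy sizes, chosen for illustration only

# Stand-ins for a checkpoint matrix stored in [in_features, out_features] layout
w_checkpoint = torch.randn(in_features, out_features)
b_checkpoint = torch.randn(out_features)
x = torch.randn(2, 5, in_features)

# How a matrix in checkpoint layout is applied
reference = x @ w_checkpoint + b_checkpoint

# nn.Linear stores its weight as [out_features, in_features], hence the transpose
linear = nn.Linear(in_features, out_features)
linear.load_state_dict({'weight': w_checkpoint.T.contiguous(), 'bias': b_checkpoint})

assert torch.allclose(linear(x), reference, atol=1e-5)
```

Without the transpose, `load_state_dict` would reject the matrix with a shape mismatch here, since `in_features != out_features`.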


In the next few cells, we shall implement the model layer by layer to make use of those weights.

As you might recall, transformers contain two main layer types: attention and fully-connected layers.

The fully-connected layers are far easier to understand, so we shall begin there.

Please implement the fully-connected layer __without residual connections or layer normalization__ (we'll add those in a bit).

In [3]:
class GeLUThatWasUsedInGPT2(nn.Module):
    def forward(self, x):
        return 0.5 * x * (1.0 + torch.tanh(math.sqrt(2.0 / math.pi) * (x + 0.044715 * x ** 3)))

class FullyConnected(nn.Module):
    def __init__(self, dim: int):
        super().__init__()
        self.c_fc = nn.Linear(dim, 4 * dim)
        self.gelu = GeLUThatWasUsedInGPT2()
        self.c_proj = nn.Linear(4 * dim, dim)

    def forward(self, x):
        # x.shape = [batch_size, seq_length, dim]
        out = self.c_fc(x)
        out = self.gelu(out)
        out = self.c_proj(out)
        return out
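
As a sanity check on the activation: the formula in `GeLUThatWasUsedInGPT2` is the tanh approximation of GELU. Assuming a reasonably recent PyTorch release in which `F.gelu` accepts `approximate='tanh'`, the two should agree up to floating-point error, while the exact (erf-based) GELU differs slightly. The helper name `gelu_tanh` is only used for this illustration:

```python
import math
import torch
import torch.nn.functional as F

def gelu_tanh(x):
    # Same formula as GeLUThatWasUsedInGPT2.forward
    return 0.5 * x * (1.0 + torch.tanh(math.sqrt(2.0 / math.pi) * (x + 0.044715 * x ** 3)))

x = torch.linspace(-5, 5, steps=101)
max_diff_tanh = (gelu_tanh(x) - F.gelu(x, approximate='tanh')).abs().max().item()
max_diff_exact = (gelu_tanh(x) - F.gelu(x)).abs().max().item()

assert max_diff_tanh < 1e-5   # matches the built-in tanh approximation
assert max_diff_exact < 1e-2  # close to, but not identical to, the exact GELU
```

The small gap to the exact GELU is one reason to keep the tanh form when loading GPT-2 weights: it is the variant the layer above was written to reproduce.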


Now, let's test that it works with GPT-2 weights:

In [4]:
mlp = FullyConnected(dim=768)
mlp.load_state_dict({'c_fc.weight': state_dict['h.0.mlp.c_fc.weight'],
                     'c_fc.bias': state_dict['h.0.mlp.c_fc.bias'],
                     'c_proj.weight': state_dict['h.0.mlp.c_proj.weight'],
                     'c_proj.bias': state_dict['h.0.mlp.c_proj.bias']})

torch.manual_seed(1337)
x = torch.randn(1, 2, 768)  # [batch_size, sequence_length, dim]
checksum = torch.sum(mlp(x) * x)
assert abs(checksum.item() - 1282.3315) < 0.1, "layer outputs do not match reference"
assert torch.allclose(mlp(x[:, (1, 0), :])[:, (1, 0), :], mlp(x)), "mlp must be permutation-equivariant (each position is processed independently)"
print("Seems legit!")

Seems legit!


Now, let's get to attention layers.

Since GPT-2 needs to generate text from left to right, each generated token can only attend to tokens on the left (and itself). This kind of attention is called "masked" self-attention, because it hides tokens to the right.

As before, please implement masked self-attention __without layernorm or residual connections.__
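
To make the masking concrete, here is a minimal single-head sketch built from basic PyTorch ops, using toy sizes and random tensors as stand-ins for real queries, keys and values. It is one possible formulation, not the reference solution; the comparison against `F.scaled_dot_product_attention(..., is_causal=True)` assumes PyTorch 2.0 or later:

```python
import math
import torch
import torch.nn.functional as F

torch.manual_seed(0)
seq_length, head_size = 5, 4  # toy sizes for illustration
q = torch.randn(1, seq_length, head_size)
k = torch.randn(1, seq_length, head_size)
v = torch.randn(1, seq_length, head_size)

# scores[b, i, j]: similarity between query i and key j, scaled by sqrt(head_size)
scores = q @ k.transpose(-2, -1) / math.sqrt(head_size)

# Causal mask: position i may attend to positions j <= i (lower triangle incl. diagonal)
causal_mask = torch.tril(torch.ones(seq_length, seq_length, dtype=torch.bool))
scores = scores.masked_fill(~causal_mask, float('-inf'))

weights = torch.softmax(scores, dim=-1)  # masked positions receive exactly zero weight
manual_output = weights @ v

reference = F.scaled_dot_product_attention(q, k, v, is_causal=True)
assert torch.allclose(manual_output, reference, atol=1e-4)
```

Setting the masked scores to `-inf` before the softmax (rather than zeroing weights afterwards) keeps each row of `weights` summing to one.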

In [5]:
class MaskedSelfAttention(nn.Module):
    def __init__(self, dim: int, num_heads: int):
        super().__init__()
        self.c_attn = nn.Linear(dim, dim * 3)  # query + key + value, combined
        self.c_proj = nn.Linear(dim, dim)  # output projection
        self.dim, self.num_heads = dim, num_heads
        self.head_size = dim // num_heads

    def forward(self, x):
        q, k, v = self.c_attn(x).split(dim=-1, split_size=self.dim)
        assert q.shape == k.shape == v.shape == x.shape, "q, k and v must have the same shape as x"


        # Note: this is an inefficient implementation that uses a for-loop.
        # To get the full grade during homework, please re-implement this code:
        # 1) do not use for-loops (or other loops). Compute everything in parallel with vectorized operations
        # 2) do not use F.scaled_dot_product_attention - write your own attention code using basic PyTorch ops
        head_outputs = []
        for head_index in range(self.num_heads):
            head_selector = range(self.head_size * head_index, self.head_size * (head_index + 1))

            head_queries = q[..., head_selector]
            head_keys = k[..., head_selector]
            head_values = v[..., head_selector]

            single_head_output = F.scaled_dot_product_attention(
                head_queries, head_keys, head_values,
                is_causal=True)
            # docs: https://pytorch.org/docs/stable/generated/torch.nn.functional.scaled_dot_product_attention.html
            head_outputs.append(single_head_output)

        combined_head_outputs = torch.cat(head_outputs, dim=-1)
        return self.c_proj(combined_head_outputs)
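
One possible route to removing the for-loop (a sketch, not the reference solution): reshape the last dimension into `[num_heads, head_size]` and move the head axis next to the batch axis, so that a single batched matrix product can process all heads at once. The snippet below only checks that this reshape picks the same columns as `head_selector` does in the loop, and that merging the heads back reproduces `torch.cat(head_outputs, dim=-1)`. The sizes are toy values, and the loop is there purely for verification:

```python
import torch

torch.manual_seed(0)
batch_size, seq_length, num_heads, head_size = 2, 5, 3, 4  # toy sizes
dim = num_heads * head_size
q = torch.randn(batch_size, seq_length, dim)

# [batch, seq, dim] -> [batch, seq, heads, head_size] -> [batch, heads, seq, head_size]
q_heads = q.view(batch_size, seq_length, num_heads, head_size).transpose(1, 2)

# Head i owns columns [i * head_size, (i + 1) * head_size), exactly as in the loop version
for head_index in range(num_heads):
    columns = slice(head_size * head_index, head_size * (head_index + 1))
    assert torch.equal(q_heads[:, head_index], q[..., columns])

# After per-head attention, undo the split; this mirrors torch.cat(head_outputs, dim=-1)
merged = q_heads.transpose(1, 2).reshape(batch_size, seq_length, dim)
assert torch.equal(merged, q)
```

With `q`, `k` and `v` all in `[batch, heads, seq, head_size]` layout, the score matrix for every head comes out of one `q_heads @ k_heads.transpose(-2, -1)` call.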


Test that it works

In [6]:
attn = MaskedSelfAttention(dim=768, num_heads=12)
attn.load_state_dict({'c_attn.weight': state_dict['h.0.attn.c_attn.weight'],
                      'c_attn.bias': state_dict['h.0.attn.c_attn.bias'],
                      'c_proj.weight': state_dict['h.0.attn.c_proj.weight'],
                      'c_proj.bias': state_dict['h.0.attn.c_proj.bias']})

torch.manual_seed(1337)
x = torch.randn(1, 10, 768)  # [batch_size, sequence_length, dim]
checksum = torch.sum(attn(x) * x)
assert abs(checksum.item() - 2703.6772) < 0.1, "layer outputs do not match reference"
assert not torch.allclose(attn(x[:, (1, 0), :])[:, (1, 0), :], attn(x[:, (0, 1), :])), "masked attention must *not* be permutation-equivariant"
print("It works!")

It works!


We can now combine attention and MLP to build the full transformer layer:

![img](https://i.imgur.com/1sq2vHO.png)

In [7]:
class TransformerLayer(nn.Module):
    def __init__(self, dim: int, num_heads: int):
        super().__init__()
        self.ln_1 = nn.LayerNorm(dim)
        self.attn = MaskedSelfAttention(dim, num_heads)
        self.ln_2 = nn.LayerNorm(dim)
        self.mlp = FullyConnected(dim)

    def forward(self, x):
        # pre-LayerNorm residual blocks: normalize, transform, then add back to the input
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

In [8]:
layer = TransformerLayer(dim=768, num_heads=12)
layer.load_state_dict({k[5:]: v for k, v in state_dict.items() if k.startswith('h.10.')})
assert abs(torch.sum(layer(x) * x).item() - 9874.7383) < 0.1
print("Good job!")

Good job!


In [9]:
class GPT2(nn.Module):
    def __init__(self, vocab_size: int, dim: int, num_heads: int, num_layers: int, max_position_embeddings: int = 1024):
        super().__init__()
        self.wte = nn.Embedding(vocab_size, dim)  # token embeddings
        self.wpe = nn.Embedding(max_position_embeddings, dim)  # position embeddings
        self.ln_f = nn.LayerNorm(dim)   # final layer norm - goes after all transformer layers, but before logits

        self.h = nn.Sequential(*(TransformerLayer(dim, num_heads) for layer in range(num_layers)))

    def forward(self, input_ids):
        # input_ids.shape: [batch_size, sequence_length], int64 token ids
        position_ids = torch.arange(input_ids.shape[1], device=input_ids.device).unsqueeze(0)

        token_embeddings = self.wte(input_ids)
        position_embeddings = self.wpe(position_ids)
        full_embeddings = token_embeddings + position_embeddings

        transformer_output = self.h(full_embeddings)
        transformer_output_ln = self.ln_f(transformer_output)

        # final layer: we predict logits by re-using token embeddings as linear weights
        output_logits = transformer_output_ln @ self.wte.weight.T
        return output_logits
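
The last step of `GPT2.forward` reuses the token embedding matrix as the output projection (weight tying). A small sketch with toy sizes and a random stand-in for the `ln_f` output shows the resulting shape, and that the product is the same as a bias-free linear layer whose weight is `wte.weight`:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
vocab_size, dim = 11, 4  # toy sizes
wte = nn.Embedding(vocab_size, dim)
hidden = torch.randn(2, 3, dim)  # stand-in for the output of ln_f

logits = hidden @ wte.weight.T             # [batch_size, seq_length, vocab_size]
logits_alt = F.linear(hidden, wte.weight)  # same computation, written as a bias-free linear layer

assert logits.shape == (2, 3, vocab_size)
assert torch.allclose(logits, logits_alt, atol=1e-6)
```

Each logit is simply the dot product between a position's final hidden state and one token's embedding vector.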


In [10]:
tokenizer = transformers.AutoTokenizer.from_pretrained('gpt2', add_prefix_space=True)
model = GPT2(vocab_size=50257, dim=768, num_heads=12, num_layers=12)
model.load_state_dict(state_dict)

input_ids = tokenizer("A quick", return_tensors='pt')['input_ids']

predicted_logits = model(input_ids)
most_likely_token_id = predicted_logits[:, -1].argmax().item()

print("Prediction:", tokenizer.decode(most_likely_token_id))

Prediction:  look


In [11]:
text = "The Fermi paradox "
tokens = tokenizer.encode(text)
print(end=tokenizer.decode(tokens))
line_length = len(tokenizer.decode(tokens))

for i in range(500):
    # Predict logits with your model
    with torch.no_grad():
        logits = model(torch.as_tensor([tokens])) * 1.3

    # Sample with probabilities
    p_next = torch.softmax(logits[0, -1, :], dim=-1).data.cpu().numpy()
    next_token_index = np.random.choice(len(p_next), p=p_next)

    tokens.append(int(next_token_index))
    print(end=tokenizer.decode(tokens[-1]))
    line_length += len(tokenizer.decode(tokens[-1]))
    if line_length > 120:
        line_length = 0
        print()



 The Fermi paradox  is perhaps the most widely recognized example of the most recent fast-growing age of science.
 In 2005
, a strong new wave of paleoanthropologist Michael Mann published the following graph showing how fast the paleoanthropological
 record has evolved.  In that year, he calculated that the rate of change in age and stature was about 30% more rapid than
 that of the recent past.  The graph shows that much of the change in age, along with changes in stature, was due to changes
 in the timing of ancient changes.  As well, it shows that modern humans are already about twice as fast as they were in the
 past.
And what about the transition to higher productivity?  We are already within the previous 20 to 25 years of modern
 human growth, and by 30, 80% of the changes in the lifespan of the human population will have occurred in the last 2,000
 years.
Moving along, there are still a few ways in which the rapid decline of the human lifespan may be analogous to the
 rapid cha
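
A note on the `* 1.3` factor in the sampling loop above: multiplying logits by a constant before the softmax is the same as dividing them by a temperature of `1 / 1.3 ≈ 0.77`, which sharpens the distribution and makes sampling a little less random. A sketch with made-up logits for a four-token vocabulary (the `entropy` helper is defined here only for the illustration):

```python
import torch

logits = torch.tensor([2.0, 1.0, 0.5, -1.0])  # made-up logits, not model output

p_plain = torch.softmax(logits, dim=-1)
p_scaled = torch.softmax(logits * 1.3, dim=-1)             # what the sampling loop does
p_temperature = torch.softmax(logits / (1 / 1.3), dim=-1)  # same thing, written as a temperature

def entropy(p):
    # Shannon entropy in nats; lower means a more peaked distribution
    return -(p * p.log()).sum().item()

assert torch.allclose(p_scaled, p_temperature)
assert p_scaled.max() > p_plain.max()        # the most likely token gains probability
assert entropy(p_scaled) < entropy(p_plain)  # the distribution becomes less spread out
```

A factor below 1 would have the opposite effect, flattening the distribution and producing more varied (and usually less coherent) text.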

__Reminder:__ after class, please go to `MaskedSelfAttention.forward` above and finish the job!

### Here's how you can do the same with the transformers library

In [18]:
tokenizer = transformers.AutoTokenizer.from_pretrained('gpt2', add_prefix_space=True)
model = transformers.AutoModelForCausalLM.from_pretrained('gpt2')
print('Generated text:', tokenizer.decode(
    model.generate(
        **tokenizer("The Fermi paradox ", return_tensors='pt'),
        do_sample=True, max_new_tokens=50
    ).flatten().numpy(),
    skip_special_tokens=True
))


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Generated text:  The Fermi paradox  was the original paradox of the Fermi Paradox but the Fermi paradox was also the first one to emerge from the vacuum and was known as the Fermi Paradox , and a new paradox became known as the Fermi Paradox.


Alternatively, we can generate text by sampling directly from the model's output logits, as in the hand-written loop above.

In [13]:
text = "The Fermi paradox "
tokens = tokenizer.encode(text)
print(end=tokenizer.decode(tokens))
line_length = len(tokenizer.decode(tokens))

for i in range(500):
    # Predict logits with your model
    with torch.no_grad():
        logits = model(torch.as_tensor([tokens])).logits * 1.3

    # Sample with probabilities
    p_next = torch.softmax(logits[0, -1, :], dim=-1).data.cpu().numpy()
    next_token_index = np.random.choice(len(p_next), p=p_next)

    tokens.append(int(next_token_index))
    print(end=tokenizer.decode(tokens[-1]))
    line_length += len(tokenizer.decode(tokens[-1]))
    if line_length > 120:
        line_length = 0
        print()

 The Fermi paradox  is the most concrete example of this effect. This paradox is preserved by the nonlinearity of space. In
 other words, the time-space of an object (and sometimes the interval between objects) is always determined by the time of
 its initial step, and if it is only a few moments, then a finite amount of time will have elapsed. It can thus be observed
 that, in particular, when a given moment of the time-space of a point is discrete, the interval between a point's points
 must be finite, and there needs to be enough time between those points to achieve sufficiently large amounts of time. The
 existence of such a paradox implies that, even in conditions of absolute time equilibrium, we can be held to be infinitely
 well-informed about the time-space of our little quadrants of matter. We can simply assume that the space of our little qu
bits is finite. But this assumption is contradicted by the fact that, when a point has an interval of restricted time between
 points, t