# Задание 3

В этом задании мы напишем архитектуру transformer с нуля. Мы пройдемся по всем слоям трансформера - от эмбеддингов и аттеншена до FFN и финального выходного слоя. В конце также напишем различные техники сэмплирования для генерации текста!


В качестве весов мы будем использовать веса gpt2, однако уже в следующем задании попробуем обучить свой мини-трансформер с нуля!

# Устанавливаем зависимости

In [1]:
%pip install transformer_lens
%pip install einops
%pip install jaxtyping
%pip install git+https://github.com/callummcdougall/CircuitsVis.git#subdirectory=python

Collecting transformer_lens
  Downloading transformer_lens-2.6.0-py3-none-any.whl.metadata (12 kB)
Collecting beartype<0.15.0,>=0.14.1 (from transformer_lens)
  Downloading beartype-0.14.1-py3-none-any.whl.metadata (28 kB)
Collecting better-abc<0.0.4,>=0.0.3 (from transformer_lens)
  Downloading better_abc-0.0.3-py3-none-any.whl.metadata (1.4 kB)
Collecting datasets>=2.7.1 (from transformer_lens)
  Downloading datasets-3.0.0-py3-none-any.whl.metadata (19 kB)
Collecting fancy-einsum>=0.0.3 (from transformer_lens)
  Downloading fancy_einsum-0.0.3-py3-none-any.whl.metadata (1.2 kB)
Collecting jaxtyping>=0.2.11 (from transformer_lens)
  Downloading jaxtyping-0.2.34-py3-none-any.whl.metadata (6.4 kB)
Collecting wandb>=0.13.5 (from transformer_lens)
  Downloading wandb-0.18.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)
Collecting pyarrow>=15.0.0 (from datasets>=2.7.1->transformer_lens)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (

In [2]:
import os; os.environ['ACCELERATE_DISABLE_RICH'] = "1"
import einops
from dataclasses import dataclass  # https://habr.com/ru/articles/415829/
from transformer_lens import HookedTransformer
import torch as t
import torch
from torch import Tensor
import torch.nn as nn
import numpy as np
import math
from tqdm.notebook import tqdm
from jaxtyping import Float, Int
from transformers.models.gpt2.tokenization_gpt2_fast import GPT2TokenizerFast
from collections import defaultdict


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Загружаем веса gpt2 для проверки
reference_gpt2 = HookedTransformer.from_pretrained("gpt2-small", fold_ln=False,
                                                   center_unembed=False,
                                                   center_writing_weights=False)
reference_gpt2 = reference_gpt2.to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]



Loaded pretrained model gpt2-small into HookedTransformer
Moving model to device:  cuda


Конфиг, который хранит в себе всю информацию о размерностях модели.

In [3]:
@dataclass
class Config:
    d_model: int = 768 # он же hidden_dim - внутрення размерность модели
    debug: bool = True
    layer_norm_eps: float = 1e-5
    d_vocab: int = 50257 # он же vocab_size, размер словаря модели
    init_range: float = 0.02
    n_ctx: int = 1024 # число позиционных эмбеддингов
    d_head: int = 64 # размерность головы аттеншена
    d_mlp: int = 3072 # внутренняя размерность FFN-слоя
    n_heads: int = 12 # число голов аттеншена
    n_layers: int = 12 # число слоев трансформера

cfg = Config()
print(cfg)

Config(d_model=768, debug=True, layer_norm_eps=1e-05, d_vocab=50257, init_range=0.02, n_ctx=1024, d_head=64, d_mlp=3072, n_heads=12, n_layers=12)


Код для генерации тестов, которые мы будем использовать для проверки слоев!

In [4]:
def rand_float_test(cls, shape):
    cfg = Config(debug=True)
    layer = cls(cfg).to(device)
    random_input = torch.randn(shape).to(device)
    print("Input shape:", random_input.shape)
    output = layer(random_input)
    if isinstance(output, tuple): output = output[0]
    print("Output shape:", output.shape, "\n")

def rand_int_test(cls, shape):
    cfg = Config(debug=True)
    layer = cls(cfg).to(device)
    random_input = torch.randint(100, 1000, shape).to(device)
    print("Input shape:", random_input.shape)
    output = layer(random_input)
    if isinstance(output, tuple): output = output[0]
    print("Output shape:", output.shape, "\n")

def load_gpt2_test(cls, gpt2_layer, input):
    cfg = Config(debug=True)
    layer = cls(cfg).to(device)
    layer.load_state_dict(gpt2_layer.state_dict(), strict=False)
    print("Input shape:", input.shape)
    output = layer(input)
    if isinstance(output, tuple): output = output[0]
    print("Output shape:", output.shape)
    try: reference_output = gpt2_layer(input)
    except: reference_output = gpt2_layer(input, input, input)
    print("Reference output shape:", reference_output.shape, "\n")
    comparison = t.isclose(output, reference_output, atol=1e-4, rtol=1e-3)
    print(f"{comparison.sum()/comparison.numel():.2%} of the values are correct\n")

In [5]:
reference_text = "I am an amazing autoregressive, decoder-only, GPT-2 style transformer. One day I will exceed human level intelligence and take over the world!"
tokens = reference_gpt2.to_tokens(reference_text).to(device)
print(tokens)
print(tokens.shape)
print(reference_gpt2.to_str_tokens(tokens))

tensor([[50256,    40,   716,   281,  4998,  1960,   382, 19741,    11,   875,
         12342,    12,  8807,    11,   402, 11571,    12,    17,  3918, 47385,
            13,  1881,  1110,   314,   481,  7074,  1692,  1241,  4430,   290,
          1011,   625,   262,   995,     0]], device='cuda:0')
torch.Size([1, 35])
['<|endoftext|>', 'I', ' am', ' an', ' amazing', ' aut', 'ore', 'gressive', ',', ' dec', 'oder', '-', 'only', ',', ' G', 'PT', '-', '2', ' style', ' transformer', '.', ' One', ' day', ' I', ' will', ' exceed', ' human', ' level', ' intelligence', ' and', ' take', ' over', ' the', ' world', '!']


In [6]:
logits, cache = reference_gpt2.run_with_cache(tokens)
print(logits.shape)

print("Все работает, мы готовы к выполнению задания!")

torch.Size([1, 35, 50257])
Все работает, мы готовы к выполнению задания!


# Архитектура Transformer - 40 баллов

# Embeddings - 5 баллов

Здесь нам даются токены размерности `[batch_size, seq_len]` - индексы слов в словаре. Нужно описать слой Embed, который будет отображать каждый токен в соответствующий вектор из матрицы эмбеддингов. Таким образом каждому токену предоставляется вектор, который будет иметь размерности `[batch_size, seq_len, d_model]`

Внимание - здесь не нужно исользовать цикл for и проходиться по матрице. Все стандартные операции доступны в [документации](https://pytorch.org/docs/stable/nn.functional.html), в частности тут нам понадобится одна из операций в секции [sparse functions](https://pytorch.org/docs/stable/nn.functional.html#sparse-functions).

Важное замечание - на самом деле этот слой уже есть [готовый в pytorch](https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html), но мы в учебных целях переписываем его сами.


Также можно решить этот пример через индексацию или через einops.

**Вообще почти во всех примерах есть несколько возможных стилей описания операций над тензорами - через torch.nn.functional, через различные индексации и трюки pytorch, через einops - можно делать любым удобным способом!**

In [None]:
"""
class Embed(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.W_E = nn.Parameter(t.empty((cfg.d_vocab, cfg.d_model)))
        nn.init.normal_(self.W_E, std=self.cfg.init_range)

    def forward(self, input_ids: Int[Tensor, "batch seq_len"]) -> Float[Tensor, "batch seq_len d_model"]:
        pass
        # Ваш код здесь!


batch_size = 2
seq_len = 4
rand_int_test(Embed, [batch_size, seq_len])
load_gpt2_test(Embed, reference_gpt2.embed, tokens)
"""

In [9]:
from typing import Tuple
import torch
from torch import Tensor
import torch.nn as nn

class Embed(nn.Module):
    """
    Слой эмбеддингов для преобразования токенов в векторы заданной размерности.

    :param cfg: объект конфигурации, содержащий параметры модели
    """

    def __init__(self, cfg: Config) -> None:
        super().__init__()
        self.cfg = cfg
        # W_E - матрица эмбеддингов размерности (vocab_size, hidden_dim), где
        # vocab_size - размер словаря, hidden_dim - размерность эмбеддинга
        self.W_E = nn.Parameter(t.empty((cfg.d_vocab, cfg.d_model)))
        # Инициализация матрицы эмбеддингов случайными значениями по нормальному распределению
        nn.init.normal_(self.W_E, std=self.cfg.init_range)

    def forward(self, input_ids: Tensor) -> Tensor:
        """
        Преобразует входные индексы токенов в эмбеддинги.

        :param input_ids: Тензор целых чисел, представляющий индексы токенов
                          размерности (batch, seq_len).
        :return: Тензор эмбеддингов размерности (batch, seq_len, d_model).
        """
        return self.W_E[input_ids]

# Пример использования
batch_size = 2
seq_len = 4

# Тестируем слой с помощью случайных целочисленных токенов
rand_int_test(Embed, [batch_size, seq_len])

# Загружаем слой эмбеддингов GPT-2 и тестируем его с референсной моделью
load_gpt2_test(Embed, reference_gpt2.embed, tokens)


Input shape: torch.Size([2, 4])
Output shape: torch.Size([2, 4, 768]) 

Input shape: torch.Size([1, 35])
Output shape: torch.Size([1, 35, 768])
Reference output shape: torch.Size([1, 35, 768]) 

100.00% of the values are correct



# Positional Embeddings - 5 баллов

В трансформерах есть не только обычные эмбеддинги, которые отвечают за "смысл" токенов, но и позиционные эмбеддинги! Вход у них такой же, как и у обычных эмбеддингов, только они должны эмбеддить позиции токенов, а не сами токены. Т.е. в матрице W_pos хранятся не эмбеддинги токенов, а эмбеддинги позиций.

Поэтому в этом слое нужно:
1. По tokens получить тензор positions размера `[batch_size, seq_len]`
2. Заэмбеддить тензор positions, как в предыдущем слое.

Важно - как и в предыдущем случае, для этот слой обычно используется через [nn.Embedding](https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html)


Вспомним еще про то, откуда берутся позиционные эмбеддинги: в оригинальном трансформере позиционные эмбеддинги состояли из синусов и косинусов (см. пункт 3.5 из оригинальной статьи https://arxiv.org/pdf/1706.03762), однако позиционные эмбеддинги можно учить и с нуля, как и обычные эмбеддинги. **В рамках данного задания не нужно никак дополнительно инициализировать веса, только применить позиционные эмбеддинги**.


In [None]:
"""

class PosEmbed(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.W_pos = nn.Parameter(t.empty((cfg.n_ctx, cfg.d_model)))
        nn.init.normal_(self.W_pos, std=self.cfg.init_range)

    def forward(self, input_ids: Int[Tensor, "batch seq_len"]) -> Float[Tensor, "batch seq_len d_model"]:
        pass
        # Ваш код здесь!

batch_size = 2
seq_len = 4
rand_int_test(PosEmbed, [batch_size, seq_len])
load_gpt2_test(PosEmbed, reference_gpt2.pos_embed, tokens)
"""

In [10]:
def load_gpt2_test(cls, gpt2_layer, input):
    cfg = Config(debug=True)
    layer = cls(cfg).to(device)
    layer.load_state_dict(gpt2_layer.state_dict(), strict=False)

    print("Input shape:", input.shape)

    # Выводим параметры оригинального GPT-2 слоя
    print("\n=== GPT-2 Layer Parameters ===")
    for name, param in gpt2_layer.named_parameters():
        print(f"{name}: {param.shape}")
        print(param)  # Выводим сами параметры для анализа

    # Пробуем получить выходной тензор из нашей реализации
    output = layer(input)
    if isinstance(output, tuple):
        output = output[0]

    print("Output shape (our layer):", output.shape)

    # Пробуем получить выходной тензор из оригинальной модели GPT-2
    try:
        reference_output = gpt2_layer(input)
    except:
        reference_output = gpt2_layer(input, input, input)

    print("Reference output shape (GPT-2):", reference_output.shape, "\n")

    # Сравниваем выводы двух моделей
    comparison = t.isclose(output, reference_output, atol=1e-4, rtol=1e-3)
    print(f"{comparison.sum()/comparison.numel():.2%} of the values are correct\n")

    # Выводим примеры отличий между нашими результатами и результатами GPT-2
    differences = torch.abs(output - reference_output)
    print("=== Example Differences (First 10 Elements) ===")
    print(differences.view(-1)[:10])


In [11]:
class PosEmbed(nn.Module):
    """
    Реализация позиционных эмбеддингов, аналогичная GPT-2, с прединициализированной матрицей W_pos.

    :param cfg: объект конфигурации, содержащий параметры модели.
    """

    def __init__(self, cfg: Config) -> None:
        super().__init__()
        self.cfg = cfg
        # Инициализация матрицы позиционных эмбеддингов W_pos
        self.W_pos = nn.Parameter(torch.empty(cfg.n_ctx, cfg.d_model))
        nn.init.normal_(self.W_pos, std=self.cfg.init_range)  # Инициализация весов, как в GPT-2

    def forward(self, input_ids: Tensor) -> Tensor:
        """
        Возвращает прединициализированные позиционные эмбеддинги для заданной последовательности.

        :param input_ids: Тензор целых чисел, представляющий индексы токенов размерности (batch, seq_len).
        :return: Тензор позиционных эмбеддингов размерности (batch, seq_len, d_model).
        """
        batch_size, seq_len = input_ids.shape

        # Извлекаем соответствующие позиционные эмбеддинги из W_pos для текущей длины последовательности
        pos_embeds = self.W_pos[:seq_len, :]  # [seq_len, d_model]

        # Преобразуем позиционные эмбеддинги, чтобы они имели размер (batch, seq_len, d_model)
        pos_embeds = pos_embeds.unsqueeze(0).expand(batch_size, seq_len, self.cfg.d_model)

        return pos_embeds


# Пример входного тензора input_ids
input_ids = torch.tensor([
    [10, 25, 42, 7],  # Токены первой последовательности
    [56, 14, 87, 33]  # Токены второй последовательности
])

# Повторное тестирование
cfg = Config()
pos_embed_layer = PosEmbed(cfg)
output = pos_embed_layer(input_ids)

# Проверяем размерность выхода
output.shape, output


(torch.Size([2, 4, 768]),
 tensor([[[ 0.0098,  0.0175,  0.0110,  ..., -0.0124,  0.0319, -0.0493],
          [ 0.0078,  0.0336, -0.0202,  ..., -0.0176, -0.0300,  0.0198],
          [ 0.0256,  0.0279, -0.0041,  ..., -0.0036, -0.0004, -0.0361],
          [-0.0193, -0.0198,  0.0002,  ..., -0.0046,  0.0124,  0.0041]],
 
         [[ 0.0098,  0.0175,  0.0110,  ..., -0.0124,  0.0319, -0.0493],
          [ 0.0078,  0.0336, -0.0202,  ..., -0.0176, -0.0300,  0.0198],
          [ 0.0256,  0.0279, -0.0041,  ..., -0.0036, -0.0004, -0.0361],
          [-0.0193, -0.0198,  0.0002,  ..., -0.0046,  0.0124,  0.0041]]],
        grad_fn=<ExpandBackward0>))

In [12]:
# Пример использования
batch_size = 2
seq_len = 4

# Тестируем слой с помощью случайных целочисленных токенов
rand_int_test(PosEmbed, [batch_size, seq_len])

# Загружаем слой позиционных эмбеддингов GPT-2 и тестируем его с референсной моделью
load_gpt2_test(PosEmbed, reference_gpt2.pos_embed, tokens)




Input shape: torch.Size([2, 4])
Output shape: torch.Size([2, 4, 768]) 

Input shape: torch.Size([1, 35])

=== GPT-2 Layer Parameters ===
W_pos: torch.Size([1024, 768])
Parameter containing:
tensor([[-1.8821e-02, -1.9742e-01,  4.0267e-03,  ..., -4.3044e-02,
          2.8267e-02,  5.4490e-02],
        [ 2.3959e-02, -5.3792e-02, -9.4879e-02,  ...,  3.4170e-02,
          1.0172e-02, -1.5573e-04],
        [ 4.2161e-03, -8.4764e-02,  5.4515e-02,  ...,  1.9745e-02,
          1.9325e-02, -2.1424e-02],
        ...,
        [-1.7987e-03,  1.6052e-03, -5.5103e-02,  ...,  1.3617e-02,
         -7.1805e-03,  3.7552e-03],
        [ 3.2105e-03,  1.5501e-03, -4.8944e-02,  ...,  2.0725e-02,
         -1.1838e-02, -5.5683e-04],
        [ 2.6610e-04,  3.0272e-03, -1.7086e-03,  ..., -4.6506e-03,
         -2.3541e-03, -5.7855e-03]], device='cuda:0', requires_grad=True)
Output shape (our layer): torch.Size([1, 35, 768])
Reference output shape (GPT-2): torch.Size([1, 35, 768]) 

100.00% of the values are corre

# LM head - 5 баллов

Финальный слой. У нас есть выходы из трансформера размерности `[batch_size, seq_len, d_model]`. Это контекстуализированные представления каждого токена. По ним мы предсказываем следующий токен, т.е. применяем линейный слой - умножаем на матрицу `[d_model, vocab_size]`.

В этом нам поможет секция [linear functions](https://pytorch.org/docs/stable/nn.functional.html#linear-functions). Не забудьте про bias!

В pytorch этот слой тоже есть - [nn.Linear](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html)

In [None]:
# LM_head, но для совместимости с библиотекой для проверки пришлось назвать его Unembed
# по аналогии с тем, что мы из индексов в словаре получаем эмбеддинги, а тут из эмбеддингов обратно
# распределение по словарю

"""
class Unembed(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.W_U = nn.Parameter(t.empty((cfg.d_model, cfg.d_vocab)))
        nn.init.normal_(self.W_U, std=self.cfg.init_range)
        self.b_U = nn.Parameter(t.zeros((cfg.d_vocab), requires_grad=False))

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_vocab"]:
        pass
        # Ваш код здесь!


batch_size = 2
seq_len = 4
d_model = 768
rand_float_test(Unembed, [batch_size, seq_len, d_model])
load_gpt2_test(Unembed, reference_gpt2.unembed, cache["ln_final.hook_normalized"])
"""

In [13]:
class Unembed(nn.Module):
    """
    Финальный слой для предсказания следующего токена по контекстуализированным представлениям токенов.

    :param cfg: объект конфигурации, содержащий параметры модели.
    """

    def __init__(self, cfg: Config) -> None:
        super().__init__()
        self.cfg = cfg
        # W_U - матрица размером (d_model, d_vocab), для предсказания токенов
        self.W_U = nn.Parameter(torch.empty((cfg.d_model, cfg.d_vocab)))
        # Инициализация матрицы весов случайными значениями по нормальному распределению
        nn.init.normal_(self.W_U, std=self.cfg.init_range)
        # b_U - смещение (bias) для предсказания
        self.b_U = nn.Parameter(torch.zeros((cfg.d_vocab), requires_grad=True))

    def forward(self, x: Tensor) -> Tensor:
        """
        Применяет линейный слой для преобразования эмбеддингов в распределение по словарю.

        :param x: Тензор размерности (batch, seq_len, d_model) - контекстуализированные представления токенов.
        :return: Тензор размерности (batch, seq_len, d_vocab) - распределение по словарю.
        """
        # Применяем линейный слой: умножаем на матрицу W_U и добавляем смещение b_U
        return torch.matmul(x, self.W_U) + self.b_U

batch_size = 2
seq_len = 4
d_model = 768
rand_float_test(Unembed, [batch_size, seq_len, d_model])
load_gpt2_test(Unembed, reference_gpt2.unembed, cache["ln_final.hook_normalized"])

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 50257]) 

Input shape: torch.Size([1, 35, 768])

=== GPT-2 Layer Parameters ===
W_U: torch.Size([768, 50257])
Parameter containing:
tensor([[-0.1101,  0.0403, -0.1275,  ..., -0.0445,  0.1860,  0.0514],
        [-0.0393, -0.0486,  0.0479,  ..., -0.0548,  0.0167, -0.0277],
        [ 0.0331,  0.0462,  0.1841,  ...,  0.0123,  0.0461,  0.0499],
        ...,
        [-0.1364,  0.0861,  0.0899,  ...,  0.1044, -0.0963,  0.0070],
        [ 0.0151,  0.0025, -0.1297,  ...,  0.0978,  0.0785,  0.1552],
        [ 0.0453,  0.0432, -0.0879,  ..., -0.0695, -0.0225,  0.1207]],
       device='cuda:0', requires_grad=True)
b_U: torch.Size([50257])
Parameter containing:
tensor([0., 0., 0.,  ..., 0., 0., 0.], device='cuda:0', requires_grad=True)
Output shape (our layer): torch.Size([1, 35, 50257])
Reference output shape (GPT-2): torch.Size([1, 35, 50257]) 

100.00% of the values are correct

=== Example Differences (First 10 Elements) ===
t

# Attention - 5 баллов

# Attention-формулы

1. **Входные эмбеддинги**:
   $$X \in \mathbb{R}^{seq \times d} $$
2. **Маскированный мультихед-аттеншен (Masked Multi-Head Attention)**:
$$M = \begin{cases}
 &  m_{ij} = -\infty, \quad i < j \\
 &  m_{ij} = 0
\end{cases} $$

$$
M = \begin{pmatrix}
0 & -\infty & -\infty & \ldots & -\infty \\
0 & 0 & -\infty & \ldots & -\infty \\
0 & 0 & 0 & \ldots & -\infty \\
\vdots & \vdots & \vdots & \ddots & \vdots \\
0 & 0 & 0 & \ldots & 0 \\
\end{pmatrix}
$$

3. Для каждой головы $ h_i $:

    3.1 **Матрицы весов для запросов, ключей и значений**:
     - $ W_Q \in \mathbb{R}^{d \times d_h} $
     - $ W_K \in \mathbb{R}^{d \times d_h} $
     - $ W_V \in \mathbb{R}^{d \times d_h} $
     
    3.2. **Запросы, ключи и значения**:
     - $ Q = X W_Q \in \mathbb{R}^{seq \times d_h} $
     - $ K = X W_K \in \mathbb{R}^{seq \times d_h} $
     - $ V = X W_V \in \mathbb{R}^{seq \times d_h} $

    3.3. **Скалярные произведения запросов и ключей**:
     - $ \frac{Q K^T}{\sqrt{d_h}} + M \in \mathbb{R}^{seq \times seq} $

    3.4. **Веса внимания**:
     - $ \alpha = \text{softmax}\left(\frac{Q K^T}{\sqrt{d_h}} + M\right) \in \mathbb{R}^{seq \times seq} $

    3.5. **Агрегация значений**:
     - $ z = \alpha V \in \mathbb{R}^{seq \times d_h} $

4. **Конкатенация выходов всех голов**:
   - $ Z = \text{Concat}(z_1, z_2, \ldots, z_h) \in \mathbb{R}^{seq \times d} $

5. **Выходной линейный слой**:
   - Матрица весов: $ W^O \in \mathbb{R}^{d \times d} $
   - Итоговый выход: $ O = Z W^O + X \in \mathbb{R}^{seq \times d} $

# Attention - детали реализации
Самое сложное в этом домашнем задании - подсчет механизма внимания. Как и в предыдущих вариантах, считать можно через torch или с помощью einops и любыми другими удобными способами.


В данном задании нужно реализовать multihead attention с маскированием. Давайте разбираться по шагам, что нам нужно сделать.

Далее будет описан один из возможных алогритмов написания аттеншена, но повторимся - писать можно любым удобным способом (голый torch или einops).

1. Нам попадает на вход вектор x `[batch, seq_len, d_model]`. Нужно превратить его в матрицы проекций i-й головы аттеншена: Q_i, K_i, V_i. Для этого у нас есть матрицы W_Q, W_K, W_V (и их bias!). Это набор n_heads матриц размеров `[d_model, d_head]`. Зачастую число голов n_head и d_head подобраны так, что d_model == n_head * d_head, наш случай не исключение. Предлагается перевести (этот шаг сделан) матрицу `[num_heads, d_model, d_head]` в матрицу `[d_model, num_heads * d_head]` = `[d_model, d_model]`, после чего получить через матричное умножение на X размерности `[batch_size, seq_len, d_model]` получить матрицы Q, K, V размерностей `[batch_size, seq_len, d_model] = [batch_size, seq_len, num_heads * d_head]` и преобразовать их к виду `[batch_size, seq_len, num_heads, d_head]`. Не забудьте при матричном умножении транспонировать матрицы W_Q, W_K, W_V, если пойдете этим путем! В качестве шпаргалки посмотрите, как происходило умножение в lm_head!

2. После этого можно сделать первый шаг и посчитать attention_scores, т.е. домножить $Q \times K^T$. Тут нам поможет .transpose или .permute вместе с torch.matmul. Нужно переставить размерности матриц таким образом, чтобы финальное матричное умножение происходило по двум последним размерностям `[seq_len, d_head]` на `[d_head, seq_len]`, а все предыдущие размерности `[batch_size, num_heads]` совпадали


3. Не забудем нормализацию, т.е. делим attention_scores на sqrt(d_head)

4. Теперь нужно исползьовать маскирование! В данных заданиях предполагается, что у нас нет паддингов, поэтому нам нужно наложить маску с одним простым условием: i-й элемент не может смотреть на j-й элемент, если j > i. Это треугольная маска, с ней нам поможет приведение треугольной форме, которое вам предлагается найти в pytorch! Замаскированные значения нужно заполнить каким-нибудь большим по модулю отрицательным числом В классе уже опредеелно значение IGNORE, можно использовать его. Для этого реализуйте и используйте функцию `apply_causal_mask`. Заполнять значениями можно через индексацию, например через `torch.masked_fill`.

5. Теперь к замаскированным attention_scores `[batch_size, num_heads, seq_len, seq_len]` нужно применить softmax. Подумайте, по какой размерности его применять и на что это повлияет.

6. После этого остается последнее матричное умножение softmax(attention_scores) на V, к которому тоже придется применить .view, .permute и torch.matmul

7. Теперь, если вы следовали этому плану у вас остается матрица `ouput` размерностей `[batch_size, num_heads, seq_len, d_head]`. С помощью permute и view собираем (конкатенируем) ее обратно в матрицу `[batch_size, seq_len, num_heads * d_head] = [batch_size, seq_len, d_model]` и применяем к ней выходной линейный слой W_O. Всё, аттеншен готов!


In [None]:
"""
class Attention(nn.Module):
    IGNORE: Float[Tensor, ""]

    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg

        self.W_Q = nn.Parameter(t.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_Q = nn.Parameter(t.zeros((cfg.n_heads, cfg.d_head)))

        self.W_K = nn.Parameter(t.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_K = nn.Parameter(t.zeros((cfg.n_heads, cfg.d_head)))

        self.W_V = nn.Parameter(t.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_V = nn.Parameter(t.zeros((cfg.n_heads, cfg.d_head)))

        self.W_O = nn.Parameter(t.empty((cfg.n_heads, cfg.d_head, cfg.d_model)))
        self.b_O = nn.Parameter(t.zeros((cfg.d_model)))

        nn.init.normal_(self.W_Q, std=self.cfg.init_range)
        nn.init.normal_(self.W_K, std=self.cfg.init_range)
        nn.init.normal_(self.W_V, std=self.cfg.init_range)
        nn.init.normal_(self.W_O, std=self.cfg.init_range)
        self.register_buffer("IGNORE", t.tensor(float("-inf"), dtype=t.float32, device=device))

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_model"]:

        # Берем размерности
        batch_size, seq_len, d_model = x.shape
        num_heads = self.cfg.n_heads
        d_head = self.cfg.d_head

        # 1. Трансформируем матрицы проекций в формат [d_model, d_model]
        W_Q = self.W_Q.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)
        W_K = self.W_K.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)
        W_V = self.W_V.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)

        b_Q = self.b_Q.view(-1)
        b_K = self.b_K.view(-1)
        b_V = self.b_V.view(-1)

        # 1. получаем проекции  Q, K, V
        pass
        # Ваш код здесь!

        # 2. Q x K^T


        # 3. Нормализация

        # 4. Маскирование

        # 5. softmax


        # 6. Финальная проекция
        ...

    def apply_causal_mask(
        self, attn_scores: Float[Tensor, "batch n_heads seq_len seq_len"]
    ) -> Float[Tensor, "batch n_heads seq_len seq_len"]:
        '''
        Используем треугольную маску, чтобы не смотреть в будущее, паддингов нет
        В качестве масикировочного значения перед софтмаксом можно использовать self.IGNORE (-inf)
        '''
        pass
        # Ваш код здесь!

torch.manual_seed(1)
batch_size = 2
seq_len = 4
d_model = 768
rand_float_test(Attention, [batch_size, seq_len, d_model])
load_gpt2_test(Attention, reference_gpt2.blocks[0].attn, cache["normalized", 0, "ln1"])
"""

In [16]:
import torch.nn.functional as F


class Attention(nn.Module):
    IGNORE: Float[Tensor, ""]

    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg

        self.W_Q = nn.Parameter(t.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_Q = nn.Parameter(t.zeros((cfg.n_heads, cfg.d_head)))

        self.W_K = nn.Parameter(t.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_K = nn.Parameter(t.zeros((cfg.n_heads, cfg.d_head)))

        self.W_V = nn.Parameter(t.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_V = nn.Parameter(t.zeros((cfg.n_heads, cfg.d_head)))

        self.W_O = nn.Parameter(t.empty((cfg.n_heads, cfg.d_head, cfg.d_model)))
        self.b_O = nn.Parameter(t.zeros((cfg.d_model)))

        nn.init.normal_(self.W_Q, std=self.cfg.init_range)
        nn.init.normal_(self.W_K, std=self.cfg.init_range)
        nn.init.normal_(self.W_V, std=self.cfg.init_range)
        nn.init.normal_(self.W_O, std=self.cfg.init_range)
        self.register_buffer("IGNORE", t.tensor(float("-inf"), dtype=t.float32, device=device))

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_model"]:

        # Берем размерности
        batch_size, seq_len, d_model = x.shape
        num_heads = self.cfg.n_heads
        d_head = self.cfg.d_head

        # 1. Трансформируем матрицы проекций в формат [d_model, d_model]
        W_Q = self.W_Q.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)
        W_K = self.W_K.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)
        W_V = self.W_V.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)

        b_Q = self.b_Q.view(-1)
        b_K = self.b_K.view(-1)
        b_V = self.b_V.view(-1)

        # 1. получаем проекции Q, K, V
        Q = t.matmul(x, W_Q) + b_Q
        K = t.matmul(x, W_K) + b_K
        V = t.matmul(x, W_V) + b_V

        # Преобразуем их к виду [batch_size, seq_len, num_heads, d_head]
        Q = Q.view(batch_size, seq_len, num_heads, d_head)
        K = K.view(batch_size, seq_len, num_heads, d_head)
        V = V.view(batch_size, seq_len, num_heads, d_head)

        # 2. Скалярные произведения запросов и ключей (Q x K^T)
        attention_scores = t.einsum('bqnh,bknh->bnqk', Q, K)

        # 3. Нормализация (делим на sqrt(d_head))
        attention_scores = attention_scores / t.sqrt(t.tensor(d_head, dtype=t.float32))

        # 4. Маскирование
        attention_scores = self.apply_causal_mask(attention_scores)

        # 5. softmax
        attention_weights = F.softmax(attention_scores, dim=-1)

        # 6. Взвешенное суммирование по V
        z = t.einsum('bnqk,bknh->bqnh', attention_weights, V)

        # 7. Финальная проекция
        z = z.contiguous().view(batch_size, seq_len, d_model)  # собираем обратно [batch_size, seq_len, d_model]
        output = t.matmul(z, self.W_O.view(-1, d_model)) + self.b_O

        return output

    def apply_causal_mask(
        self, attn_scores: Float[Tensor, "batch n_heads seq_len seq_len"]
    ) -> Float[Tensor, "batch n_heads seq_len seq_len"]:
        '''
        Используем треугольную маску, чтобы не смотреть в будущее, паддингов нет
        В качестве масикировочного значения перед софтмаксом можно использовать self.IGNORE (-inf)
        '''
        mask = t.tril(t.ones(attn_scores.shape[-2:], device=attn_scores.device)).unsqueeze(0).unsqueeze(0)
        attn_scores = attn_scores.masked_fill(mask == 0, self.IGNORE)
        return attn_scores

torch.manual_seed(1)
batch_size = 2
seq_len = 4
d_model = 768
rand_float_test(Attention, [batch_size, seq_len, d_model])
load_gpt2_test(Attention, reference_gpt2.blocks[0].attn, cache["normalized", 0, "ln1"])

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768]) 

Input shape: torch.Size([1, 35, 768])

=== GPT-2 Layer Parameters ===
W_Q: torch.Size([12, 768, 64])
Parameter containing:
tensor([[[-0.4738, -0.2614, -0.0978,  ...,  0.0908,  0.2785,  0.2262],
         [ 0.0874,  0.1473,  0.2387,  ..., -0.3679,  0.3194, -0.0895],
         [ 0.0039,  0.0695,  0.3668,  ..., -0.2476,  0.1122,  0.2564],
         ...,
         [-0.2592, -0.0164,  0.1991,  ...,  0.0801, -0.0739,  0.0586],
         [ 0.1517,  0.2170,  0.1043,  ..., -0.0959, -0.5090, -0.2666],
         [-0.4100, -0.1924, -0.2400,  ..., -0.3557, -0.1824, -0.2051]],

        [[-0.0604,  0.0430, -0.1627,  ..., -0.1296, -0.1096, -0.1044],
         [-0.0756,  0.3676,  0.5057,  ...,  0.0877,  0.1071, -0.2444],
         [ 0.1617,  0.0960, -0.3865,  ..., -0.0784,  0.2427,  0.0147],
         ...,
         [ 0.2824,  0.1018, -0.2002,  ..., -0.3588, -0.1922,  0.5668],
         [-0.0596,  0.0149, -0.5999,  ...,  0.0243, -0.3363,

Если вы справились с этим, то поздравляю - ничего сложнее мы сегодня уже не будем делать)

# MLP (или FFN в других терминологиях) - 5 баллов

Реализуем MLP слой - это 2 матричных умножения с нелинейностью GELU.

- $$ \text{MLP}(X) = (\text{GeLU}(X W_1 + b_1)) W_2 + b_2 \in \mathbb{R}^{\text{seq} \times d}$$
-    $$W_1 \in \mathbb{R}^{d \times d_{mlp}}, \quad b_1 \in \mathbb{R}^{d_{mlp}} \\
W_2 \in \mathbb{R}^{d_{mlp} \times d}, \quad b_2 \in \mathbb{R}^{d} \\ $$


$$GELU(X) = 0.5 * x * (1 + tanh(\sqrt {\frac {2} {\pi}} * (x + 0.44715 * x^3)))$$

если будете использовать gelu из pytorch, то **обязательно** проставьте approximate="tanh"!

In [None]:
"""
class MLP(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.W_in = nn.Parameter(t.empty((cfg.d_model, cfg.d_mlp)))
        self.W_out = nn.Parameter(t.empty((cfg.d_mlp, cfg.d_model)))
        self.b_in = nn.Parameter(t.zeros((cfg.d_mlp)))
        self.b_out = nn.Parameter(t.zeros((cfg.d_model)))
        nn.init.normal_(self.W_in, std=self.cfg.init_range)
        nn.init.normal_(self.W_out, std=self.cfg.init_range)

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_model"]:
        pass
        # Ваш код здесь!

torch.manual_seed(1)

rand_float_test(MLP, [batch_size, seq_len, d_model])
load_gpt2_test(MLP, reference_gpt2.blocks[0].mlp, cache["normalized", 0, "ln2"])

""

In [17]:
class MLP(nn.Module):
    """
    MLP (или FFN) слой с нелинейностью GELU.

    :param cfg: объект конфигурации, содержащий параметры модели.
    """

    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        # Инициализация матриц весов и смещений
        self.W_in = nn.Parameter(torch.empty((cfg.d_model, cfg.d_mlp)))
        self.W_out = nn.Parameter(torch.empty((cfg.d_mlp, cfg.d_model)))
        self.b_in = nn.Parameter(torch.zeros((cfg.d_mlp)))
        self.b_out = nn.Parameter(torch.zeros((cfg.d_model)))

        # Инициализация весов по нормальному распределению
        nn.init.normal_(self.W_in, std=self.cfg.init_range)
        nn.init.normal_(self.W_out, std=self.cfg.init_range)

    def forward(self, x: Tensor) -> Tensor:
        """
        Применение двух линейных преобразований с нелинейностью GELU между ними.

        :param x: Входной тензор размером [batch, seq_len, d_model].
        :return: Выходной тензор размером [batch, seq_len, d_model].
        """
        # Первое линейное преобразование с нелинейностью GELU
        x = torch.matmul(x, self.W_in) + self.b_in
        x = torch.nn.functional.gelu(x, approximate="tanh")

        # Второе линейное преобразование
        x = torch.matmul(x, self.W_out) + self.b_out
        return x

# Пример использования
torch.manual_seed(1)
batch_size = 2
seq_len = 4
d_model = 768

# Тестирование MLP слоя
rand_float_test(MLP, [batch_size, seq_len, d_model])
load_gpt2_test(MLP, reference_gpt2.blocks[0].mlp, cache["normalized", 0, "ln2"])


Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768]) 

Input shape: torch.Size([1, 35, 768])

=== GPT-2 Layer Parameters ===
W_in: torch.Size([768, 3072])
Parameter containing:
tensor([[ 0.0942,  0.0982, -0.0321,  ..., -0.1783,  0.1474,  0.0706],
        [-0.1265, -0.0671,  0.0305,  ...,  0.1966, -0.1203, -0.0628],
        [ 0.0496, -0.0373, -0.0483,  ...,  0.0655, -0.0714,  0.0826],
        ...,
        [ 0.0480,  0.1575,  0.0014,  ..., -0.3987,  0.0889,  0.0240],
        [ 0.0324,  0.1249, -0.0426,  ..., -0.1934,  0.1272, -0.0405],
        [-0.0316,  0.0010, -0.0491,  ..., -0.0406,  0.0536,  0.1896]],
       device='cuda:0', requires_grad=True)
b_in: torch.Size([3072])
Parameter containing:
tensor([ 0.0396, -0.0881, -0.1402,  ..., -0.2490, -0.0768,  0.0143],
       device='cuda:0', requires_grad=True)
W_out: torch.Size([3072, 768])
Parameter containing:
tensor([[-0.1066,  0.1528,  0.0331,  ...,  0.1644, -0.0400,  0.1218],
        [ 0.0364, -0.0594,  0.0848,  ...

# Normalization - 5 баллов

**Layer Normalization**:
   - $ \text{LayerNorm}(X) = \frac{X - \mu}{\sigma} \cdot \gamma + \beta $
   - $\mu = \text{mean}(X, \text{dim}=-1) \in \mathbb{R}^{d}$
   - $\sigma = \sqrt{\text{var}(X, \text{dim}=-1) + \epsilon} \in \mathbb{R}^{d}$
   - $\gamma \in \mathbb{R}^{d}$
   - $\beta \in \mathbb{R}^{d}$
   
   
1. Не забудьте про эпсилон, который хранится в cfg!
2. В [подсчете дисперсии](https://pytorch.org/docs/stable/generated/torch.var.html) не используете коррекцию Бесселя! Для этого в зависимости от версии pytorch поставьте `unbiased=False` или `correction=0`

In [None]:
"""
class LayerNorm(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.w = nn.Parameter(t.ones(cfg.d_model)) # gamma
        self.b = nn.Parameter(t.zeros(cfg.d_model)) # beta

    def forward(self, x: Float[Tensor, "batch seq_len d_model"]) -> Float[Tensor, "batch seq_len d_model"]:
        pass
        # Ваш код здесь!



rand_float_test(LayerNorm, [2, 4, 768])
load_gpt2_test(LayerNorm, reference_gpt2.ln_final, cache["resid_post", 11])
"""

In [18]:
class LayerNorm(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.w = nn.Parameter(t.ones(cfg.d_model))  # gamma
        self.b = nn.Parameter(t.zeros(cfg.d_model))  # beta
        self.eps = 1.e-8 # Можно добаввить конфигурацию cfg.eps для стабильности при делении на малые значения

    def forward(self, x: Float[Tensor, "batch seq_len d_model"]) -> Float[Tensor, "batch seq_len d_model"]:
        # Вычисляем среднее и дисперсию по последнему измерению
        mean = x.mean(dim=-1, keepdim=True)
        variance = x.var(dim=-1, unbiased=False, keepdim=True)

        # Нормализуем данные
        x_normalized = (x - mean) / t.sqrt(variance + self.eps)

        # Применяем gamma и beta
        return x_normalized * self.w + self.b

rand_float_test(LayerNorm, [2, 4, 768])
load_gpt2_test(LayerNorm, reference_gpt2.ln_final, cache["resid_post", 11])

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768]) 

Input shape: torch.Size([1, 35, 768])

=== GPT-2 Layer Parameters ===
w: torch.Size([768])
Parameter containing:
tensor([1.3971e+00, 1.3750e+00, 1.8870e+00, 1.1688e+00, 1.2724e+00, 1.2508e+00,
        9.4198e+00, 1.4371e+00, 1.4527e+00, 1.1856e+00, 1.3945e+00, 1.2796e+00,
        1.2071e+00, 1.2951e+00, 1.2776e+00, 1.3480e+00, 1.5088e+00, 1.3729e+00,
        1.3427e+00, 2.3761e+00, 1.1377e+00, 1.2909e+00, 1.3477e+00, 1.4775e+00,
        1.2540e+00, 1.1999e+00, 1.4932e+00, 1.1637e+00, 1.2590e+00, 1.2305e+00,
        1.1833e+00, 1.1914e+00, 1.2228e+00, 1.2792e+00, 1.3294e+00, 1.6213e+00,
        1.3804e+01, 1.1871e+00, 1.2235e+00, 1.4578e+00, 1.1687e+00, 1.3164e+00,
        1.1444e+00, 1.2628e+00, 1.4781e+00, 1.2426e+00, 1.1744e+00, 1.1602e+00,
        1.3637e+00, 2.1280e+00, 1.2371e+00, 1.2336e+00, 1.7410e+00, 1.1568e+00,
        1.3303e+00, 1.8593e+00, 1.2932e+00, 1.3320e+00, 1.2148e+00, 1.5415e+00,
        1.

# Transformer Block - 5 баллов

Это блок трансформера, который получает на вход тензор x `[batch_size, seq_len, d_model]` и выдает тензор таких же размерностей. Блок GPT2 немного отличается от классического трансформера, который мы изучали на лекции.


![image.png](https://camo.githubusercontent.com/ebd052b635f156d5d24224f25fa078d804156be51125cd6626b92d9f8b406bbb/68747470733a2f2f6c6f6e6570617469656e742d313235373934353937382e636f732e61702d6368656e6764752e6d7971636c6f75642e636f6d2f53656c656374696f6e5f3030312e706e67)

GPT2 следует схеме PreLN, а "классический" трансформер схеме PostLN. **Реализовать нужно PreLN схему!**

В PostLN схеме нормализация происходит после слоев attention и MLP, а в PreLN до них согласно иллюстрации.

In [None]:
"""
class TransformerBlock(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.ln1 = LayerNorm(cfg)
        self.attn = Attention(cfg)
        self.ln2 = LayerNorm(cfg)
        self.mlp = MLP(cfg)

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_model"]:
        pass
        # Ваш код здесь!


rand_float_test(TransformerBlock, [2, 4, 768])
load_gpt2_test(TransformerBlock, reference_gpt2.blocks[0], cache["resid_pre", 0])
"""

In [19]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.ln1 = LayerNorm(cfg)  # Первый слой нормализации
        self.attn = Attention(cfg)  # Механизм внимания
        self.ln2 = LayerNorm(cfg)  # Второй слой нормализации
        self.mlp = MLP(cfg)  # Feed-forward сеть

    def forward(self, x: Float[Tensor, "batch seq_len d_model"]) -> Float[Tensor, "batch seq_len d_model"]:
        # Применяем нормализацию перед механизмом внимания
        norm_x1 = self.ln1(x)

        # Механизм внимания с остаточной связью
        attn_out = self.attn(norm_x1)
        x = x + attn_out

        # Нормализация перед MLP
        norm_x2 = self.ln2(x)

        # Feed-forward сеть с остаточной связью
        mlp_out = self.mlp(norm_x2)
        x = x + mlp_out

        return x

rand_float_test(TransformerBlock, [2, 4, 768])
load_gpt2_test(TransformerBlock, reference_gpt2.blocks[0], cache["resid_pre", 0])

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768]) 

Input shape: torch.Size([1, 35, 768])

=== GPT-2 Layer Parameters ===
ln1.w: torch.Size([768])
Parameter containing:
tensor([0.2232, 0.1820, 0.1534, 0.1917, 0.2036, 0.1948, 0.1467, 0.1865, 0.2143,
        0.1956, 0.2118, 0.2153, 0.1882, 0.2074, 0.1871, 0.2040, 0.2044, 0.1900,
        0.1952, 0.0475, 0.1909, 0.2115, 0.1971, 0.2202, 0.1998, 0.2108, 0.2303,
        0.1879, 0.1939, 0.2018, 0.1891, 0.1861, 0.1958, 0.1832, 0.1978, 0.2243,
        0.0706, 0.1958, 0.1943, 0.1939, 0.1978, 0.1951, 0.1995, 0.1912, 0.2083,
        0.2037, 0.1849, 0.1945, 0.2189, 0.0419, 0.1977, 0.1979, 0.0608, 0.1824,
        0.2055, 0.0476, 0.1892, 0.2079, 0.2047, 0.2233, 0.2097, 0.2075, 0.2076,
        0.1793, 0.1312, 0.1841, 0.1939, 0.1561, 0.0577, 0.1948, 0.2048, 0.1717,
        0.1942, 0.1708, 0.1989, 0.1993, 0.2082, 0.1071, 0.1968, 0.1770, 0.2164,
        0.1864, 0.1938, 0.2184, 0.1343, 0.1707, 0.0683, 0.1401, 0.1823, 0.2045,
      

# Transformer - 5 баллов

Собираем все в один большой трансформер.
1. Применяем эмбеддинги и позиционные эмбеддинги, складываем результаты
2. Прогоняем в цикле через все блоки трансформера
3. Применяем финальную нормализацию и lm_head

In [None]:
"""
class DemoTransformer(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.embed = Embed(cfg)
        self.pos_embed = PosEmbed(cfg)
        self.blocks = nn.ModuleList([TransformerBlock(cfg) for _ in range(cfg.n_layers)])
        self.ln_final = LayerNorm(cfg)
        self.unembed = Unembed(cfg)

    def forward(self, input_ids: Int[Tensor, "batch seq_len"]) -> Float[Tensor, "batch seq_len d_vocab"]:
        pass
        # Ваш код здесь!



rand_int_test(DemoTransformer, [2, 4])
load_gpt2_test(DemoTransformer, reference_gpt2, tokens)
"""

In [20]:
class DemoTransformer(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.embed = Embed(cfg)
        self.pos_embed = PosEmbed(cfg)
        self.blocks = nn.ModuleList([TransformerBlock(cfg) for _ in range(cfg.n_layers)])
        self.ln_final = LayerNorm(cfg)
        self.unembed = Unembed(cfg)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Устанавливаем устройство

    def forward(self, input_ids: Int[Tensor, "batch seq_len"]) -> Float[Tensor, "batch seq_len d_vocab"]:
        # Пример выполнения форвард-прохода
        x = self.embed(input_ids) + self.pos_embed(input_ids)
        for block in self.blocks:
            x = block(x)
        x = self.ln_final(x)
        logits = self.unembed(x)
        return logits




rand_int_test(DemoTransformer, [2, 4])
load_gpt2_test(DemoTransformer, reference_gpt2, tokens)

[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
         [-0.0124, -0.0680,  0.2146,  ..., -0.2448,  0.0900, -0.1793]],

        [[ 0.2130, -0.2916,  0.0795,  ..., -0.1168, -0.0097, -0.0261],
         [-0.0096,  0.2111, -0.0177,  ...,  0.0671,  0.0581, -0.0806],
         [ 0.2191,  0.0818,  0.1152,  ...,  0.1655, -0.1392, -0.0276],
         ...,
         [ 0.1431, -0.1600, -0.1279,  ...,  0.0715, -0.1801, -0.2008],
         [-0.2247,  0.1478, -0.1812,  ..., -0.1859, -0.2295, -0.0560],
         [-0.1702,  0.1952, -0.1292,  ...,  0.0438, -0.0234,  0.1074]],

        [[ 0.1193,  0.0102,  0.1173,  ...,  0.0137, -0.1608,  0.1247],
         [-0.1662,  0.0240, -0.1149,  ...,  0.1681, -0.0810, -0.0362],
         [-0.1547,  0.3312,  0.0170,  ..., -0.1551, -0.0597,  0.0701],
         ...,
         [ 0.0528, -0.0318, -0.0741,  ..., -0.0271, -0.1402,  0.1883],
         [ 0.0039, -0.1590,  0.0322,  ..., -0.0552,  0.2296, -0.1538],
         [ 0.0476, -0.2814, -0.131

In [21]:
demo_gpt2 = DemoTransformer(Config(debug=False)).to(device)
demo_gpt2.load_state_dict(reference_gpt2.state_dict(), strict=False)

demo_logits = demo_gpt2(tokens)

In [22]:
demo_gpt2

DemoTransformer(
  (embed): Embed()
  (pos_embed): PosEmbed()
  (blocks): ModuleList(
    (0-11): 12 x TransformerBlock(
      (ln1): LayerNorm()
      (attn): Attention()
      (ln2): LayerNorm()
      (mlp): MLP()
    )
  )
  (ln_final): LayerNorm()
  (unembed): Unembed()
)

In [23]:
reference_gpt2

HookedTransformer(
  (embed): Embed()
  (hook_embed): HookPoint()
  (pos_embed): PosEmbed()
  (hook_pos_embed): HookPoint()
  (blocks): ModuleList(
    (0-11): 12 x TransformerBlock(
      (ln1): LayerNorm(
        (hook_scale): HookPoint()
        (hook_normalized): HookPoint()
      )
      (ln2): LayerNorm(
        (hook_scale): HookPoint()
        (hook_normalized): HookPoint()
      )
      (attn): Attention(
        (hook_k): HookPoint()
        (hook_q): HookPoint()
        (hook_v): HookPoint()
        (hook_z): HookPoint()
        (hook_attn_scores): HookPoint()
        (hook_pattern): HookPoint()
        (hook_result): HookPoint()
      )
      (mlp): MLP(
        (hook_pre): HookPoint()
        (hook_post): HookPoint()
      )
      (hook_attn_in): HookPoint()
      (hook_q_input): HookPoint()
      (hook_k_input): HookPoint()
      (hook_v_input): HookPoint()
      (hook_mlp_in): HookPoint()
      (hook_attn_out): HookPoint()
      (hook_mlp_out): HookPoint()
      (hook_re

In [24]:
def get_log_probs(
    logits: Float[Tensor, "batch posn d_vocab"],
    tokens: Int[Tensor, "batch posn"]
) -> Float[Tensor, "batch posn-1"]:

    log_probs = logits.log_softmax(dim=-1)
    # Get logprobs the first seq_len-1 predictions (so we can compare them with the actual next tokens)
    log_probs_for_tokens = log_probs[:, :-1].gather(dim=-1, index=tokens[:, 1:].unsqueeze(-1)).squeeze(-1)

    return log_probs_for_tokens


pred_log_probs = get_log_probs(demo_logits, tokens)
print(f"Avg cross entropy loss: {-pred_log_probs.mean():.4f}")
print(f"Avg cross entropy loss for uniform distribution: {math.log(demo_gpt2.cfg.d_vocab):4f}")
print(f"Avg probability assigned to correct token: {pred_log_probs.exp().mean():4f}")

Avg cross entropy loss: 4.5647
Avg cross entropy loss for uniform distribution: 10.824905
Avg probability assigned to correct token: 0.087902


In [25]:
test_string = '''The Total Perspective Vortex derives its picture of the whole Universe on the principle of'''
for i in tqdm(range(100)):
    test_tokens = reference_gpt2.to_tokens(test_string).to(device)
    demo_logits = demo_gpt2(test_tokens)
    test_string += reference_gpt2.tokenizer.decode(demo_logits[-1, -1].argmax())

print(test_string)

  0%|          | 0/100 [00:00<?, ?it/s]

The Total Perspective Vortex derives its picture of the whole Universe on the principle of the total perspective. The total perspective is the view of the whole Universe from the point of view of the observer. The total perspective is the view of the whole Universe from the point of view of the observer. The total perspective is the view of the whole Universe from the point of view of the observer. The total perspective is the view of the whole Universe from the point of view of the observer. The total perspective is the view of the whole Universe from the point of view of the observer. The


# Сэмплирование - 10 баллов
Теперь разберем различные техники сэмплирования. За каждую из функций `apply_temperature`, `apply_frequency_penalty`, `sample_basic`, `sample_top_k`, `sample_top_p` по 2 балла.


1. **Temperature Sampling**:
   - Применяется первым, поскольку изменение температуры изменяет масштабы логитов перед дальнейшими операциями.

2. **Frequency Penalty**:
   - Применяется следующим, чтобы учесть частоты токенов до того, как логиты будут обрезаны методами top-k или top-p.

3. **Top-k Sampling**:
   - Применяется после temperature sampling и frequency penalty, так как он отбирает фиксированное количество наиболее вероятных токенов.

4. **Top-p (Nucleus Sampling)**:
   - Применяется после top-k sampling, чтобы отфильтровать токены на основе совокупной вероятности.

Обозначим размер словаря для удобства $\Sigma = vocab\_size$

Пусть $ \text{logits} \in \mathbb{R}^{\text{seq} \times \Sigma} $:

1. **Temperature Sampling**:
   $$
   \text{logits}'_{i,j} = \frac{\text{logits}_{i,j}}{T} \quad \forall \ i \in [1, \text{seq}], \ j \in [1, |vocab_size|]
   $$

2. **Frequency Penalty**:
   $$
   \text{penalty}(t_j) = 1 + \alpha \cdot f(t_j) \\
   \text{logits}''_{i,j} = \frac{\text{logits}'_{i,j}}{\text{penalty}(t_j)} \quad \forall \ i \in [1, \text{seq}], \ j \in [1, \Sigma]
   $$

3. **Top-k Sampling**:
   $$
   top\_k\_indices_i = \text{argtop-k}(\text{logits}''_i, k) \quad \forall \ i \in [1, \text{seq}] \\
   \text{mask}_{i,j} =
   \begin{cases}
   1 & \text{если} \ j \in top\_k\_indices_i \\
   0 & \text{иначе}
   \end{cases} \\
   \text{logits}'''_{i,j} = \text{logits}''_{i,j} \cdot \text{mask}_{i,j} \quad \forall \ i \in [1, \text{seq}], \ j \in [1, \Sigma]
   $$

4. **Top-p (Nucleus Sampling)**:
   $$
   sorted\_logits_i, sorted\_indices_i = \text{sort}(\text{logits}'''_i, \text{descending=True}) \quad ∀ \ i \in [1, \text{seq}] \\
   probs_i = softmax(sorted\_logits_i) \quad \\
    cumulative\_probs_{i,j} = \sum_{k=1}^{j} \text{probs}_{i,k} \quad \forall \ i \in [1, \text{seq}], \ j \in [1, \Sigma
    \quad \forall \ i \in [1, \text{seq}] \\
   top\_p\_mask_{i,j} =
   \begin{cases}
   1, & cumulative\_probs_{i,j} \leq p \\
   0 &
   \end{cases} \\
   \text{logits}^{\text{final}}_{i,j} = sorted\_logits_{i,j} \cdot top\_p\_mask_{i,j} \quad \forall \ i \in [1, \text{seq}], \ j \in [1, \Sigma]
   $$

5. **Softmax**:
   $$
   \mathbf{probs}_{i,j} = \text{softmax}(\text{logits}^{\text{final}}_{i,j}) \quad \forall \ i \in [1, \text{seq}], \ j \in [1, |\Sigma|]
\\
   \mathbf{probs}_{i,j} = \frac{e^{\text{logits}^{\text{final}}_{i,j}}}{\sum_{k=1}^{|\Sigma|} e^{\text{logits}^{\text{final}}_{i,k}}}
   $$

In [26]:
model_cfg = Config()
model = DemoTransformer(model_cfg).to(device)
model.load_state_dict(reference_gpt2.state_dict(), strict=False) # загружаем веса gpt2

tokenizer = reference_gpt2.tokenizer

In [None]:
"""
class TransformerSampler:

    def __init__(self, model: DemoTransformer, tokenizer: GPT2TokenizerFast):
        self.model = model
        self.cfg = model.cfg
        self.tokenizer = tokenizer

    @t.inference_mode()
    def sample(self, prompt: str, max_tokens_generated=100, verbose=False, **kwargs):
        '''
        Возвращаем сгенерированную строку, включая промпт.
        Генерация заканчивается после max_tokens_generated токенов или по генерации EOS.

        kwargs передаются в sample_next_token
        '''
        pass



    @staticmethod
    def sample_next_token(
        input_ids: Int[Tensor, "seq_len"],
        logits: Float[Tensor, "d_vocab"],
        temperature=1.0,
        top_k=0,
        top_p=0.0,
        frequency_penalty=0.0,
        seed=None
    ):
        assert input_ids.ndim == 1, "input_ids should be a 1D sequence of token ids"
        assert temperature >= 0, "Temperature should be non-negative"
        assert 0 <= top_p <= 1.0, "Top-p must be a probability"
        assert 0 <= top_k, "Top-k must be non-negative"
        assert not (top_p != 0 and top_k != 0), "At most one of top-p and top-k supported"

        # Set random seeds for reproducibility
        if seed is not None:
            t.manual_seed(seed)
            np.random.seed(seed)

        # Apply all the specialized sampling methods
        if temperature == 0:
            return TransformerSampler.greedy_search(logits)
        elif temperature != 1.0:
            logits = TransformerSampler.apply_temperature(logits, temperature)
        if frequency_penalty != 0.0:
            logits = TransformerSampler.apply_frequency_penalty(input_ids, logits, frequency_penalty)
        if top_k > 0:
            return TransformerSampler.sample_top_k(logits, top_k)
        if top_p > 0.0:
            return TransformerSampler.sample_top_p(logits, top_p)
        return TransformerSampler.sample_basic(logits)


    @staticmethod
    def greedy_search(logits: Float[Tensor, "d_vocab"]) -> int:
        '''
        Возвращаем самый вероятный токен жадно
        '''
        out = logits.argmax().item()
        return out


    @staticmethod
    def apply_temperature(logits: Float[Tensor, "d_vocab"], temperature: float) -> Float[Tensor, "d_vocab"]:
        '''
        Применяем температуру к логитам
        '''
        pass


    @staticmethod
    def apply_frequency_penalty(input_ids: Int[Tensor, "seq_len"], logits: Float[Tensor, "d_vocab"], freq_penalty: float) -> Float[Tensor, "d_vocab"]:
        '''
        Применяем frequency penalty к логитам
        '''
        pass


    @staticmethod
    def sample_basic(logits: Float[Tensor, "d_vocab"]) -> int:
        '''
        Простое сэмплирование! Тут нам поможет torch.multinomial
        '''
        pass


    @staticmethod
    def sample_top_k(logits: Float[Tensor, "d_vocab"], k: int) -> int:
        '''
        top-k сэмплирование
        '''
        pass


    @staticmethod
    def sample_top_p(logits: Float[Tensor, "d_vocab"], top_p: float, min_tokens_to_keep: int = 1) -> int:
        '''
        top_p сэмплирование
        '''
        pass

"""

In [27]:
class TransformerSampler:

    def __init__(self, model: DemoTransformer, tokenizer: GPT2TokenizerFast):
        self.model = model
        self.cfg = model.cfg
        self.tokenizer = tokenizer
        self.model = model.to(model.device)  # Переносим модель на нужное устройство

    @t.inference_mode()
    def sample(self, prompt: str, max_tokens_generated=100, verbose=False, **kwargs):
        '''
        Возвращаем сгенерированную строку, включая промпт.
        Генерация заканчивается после max_tokens_generated токенов или по генерации EOS.

        kwargs передаются в sample_next_token
        '''
        input_ids = self.tokenizer(prompt, return_tensors='pt').input_ids.to(self.model.device)

        for _ in range(max_tokens_generated):
            # Прогоняем через модель, получаем логиты
            logits = self.model(input_ids)[0, -1, :]

            # Выбираем следующий токен с помощью sample_next_token
            next_token_id = self.sample_next_token(input_ids[0], logits, **kwargs)

            # Преобразуем токен в тензор
            next_token_tensor = torch.tensor([next_token_id], device=input_ids.device)

            # Если сгенерирован токен конца предложения (EOS), останавливаем генерацию
            if next_token_id == self.tokenizer.eos_token_id:
                break

            # Добавляем новый токен к последовательности
            input_ids = torch.cat([input_ids, next_token_tensor.unsqueeze(0)], dim=1)

        # Преобразуем в текст
        return self.tokenizer.decode(input_ids[0], skip_special_tokens=True)


    @staticmethod
    def sample_next_token(
        input_ids: Int[Tensor, "seq_len"],
        logits: Float[Tensor, "d_vocab"],
        temperature=1.0,
        top_k=0,
        top_p=0.0,
        frequency_penalty=0.0,
        seed=None
    ):
        assert input_ids.ndim == 1, "input_ids should be a 1D sequence of token ids"
        assert temperature >= 0, "Temperature should be non-negative"
        assert 0 <= top_p <= 1.0, "Top-p must be a probability"
        assert 0 <= top_k, "Top-k must be non-negative"
        assert not (top_p != 0 and top_k != 0), "At most one of top-p and top-k supported"

        # Set random seeds for reproducibility
        if seed is not None:
            t.manual_seed(seed)
            np.random.seed(seed)

        # Apply all the specialized sampling methods
        if temperature == 0:
            return TransformerSampler.greedy_search(logits)
        elif temperature != 1.0:
            logits = TransformerSampler.apply_temperature(logits, temperature)
        if frequency_penalty != 0.0:
            logits = TransformerSampler.apply_frequency_penalty(input_ids, logits, frequency_penalty)
        if top_k > 0:
            return TransformerSampler.sample_top_k(logits, top_k)
        if top_p > 0.0:
            return TransformerSampler.sample_top_p(logits, top_p)
        return TransformerSampler.sample_basic(logits)

    @staticmethod
    def greedy_search(logits: Float[Tensor, "d_vocab"]) -> int:
        '''
        Возвращаем самый вероятный токен жадно
        '''
        out = logits.argmax().item()
        return out

    @staticmethod
    def apply_temperature(logits: Float[Tensor, "d_vocab"], temperature: float) -> Float[Tensor, "d_vocab"]:
        """
        Применяет температуру к логитам, масштабируя их.
        Чем ниже температура, тем острее распределение вероятностей,
        что делает выбор токенов более уверенным.

        :param logits: Тензор логитов с вероятностями для каждого токена
        :param temperature: Параметр температуры (должен быть больше 0)
        :return: Логиты после применения температуры
        """
        if temperature == 0:
            raise ValueError("Temperature should be greater than zero.")
        return logits / temperature

    @staticmethod
    def sample_next_token(
        input_ids: Int[Tensor, "seq_len"],
        logits: Float[Tensor, "d_vocab"],
        temperature=1.0,
        top_k=0,
        top_p=0.0,
        frequency_penalty=0.0,
        seed=None
    ):
        assert input_ids.ndim == 1, "input_ids should be a 1D sequence of token ids"
        assert temperature >= 0, "Temperature should be non-negative"
        assert 0 <= top_p <= 1.0, "Top-p must be a probability"
        assert 0 <= top_k, "Top-k must be non-negative"
        assert not (top_p != 0 and top_k != 0), "At most one of top-p and top-k supported"

        # Set random seeds for reproducibility
        if seed is not None:
            t.manual_seed(seed)
            np.random.seed(seed)

        # Apply all the specialized sampling methods
        if temperature == 0:
            return TransformerSampler.greedy_search(logits)  # Применяем жадный поиск
        elif temperature != 1.0:
            logits = TransformerSampler.apply_temperature(logits, temperature)
        if frequency_penalty != 0.0:
            logits = TransformerSampler.apply_frequency_penalty(input_ids, logits, frequency_penalty)
        if top_k > 0:
            return TransformerSampler.sample_top_k(logits, top_k)
        if top_p > 0.0:
            return TransformerSampler.sample_top_p(logits, top_p)
        return TransformerSampler.sample_basic(logits)


    @staticmethod
    def apply_frequency_penalty(input_ids: Int[Tensor, "seq_len"], logits: Float[Tensor, "d_vocab"], freq_penalty: float) -> Float[Tensor, "d_vocab"]:
        '''
        Применяем frequency penalty к логитам.
        Часто встречающиеся токены получают штраф, уменьшая вероятность их повторного выбора.
        '''
        # Создаем тензор для хранения частоты встречаемости каждого токена в последовательности
        token_counts = torch.bincount(input_ids, minlength=logits.size(-1)).float()

        # Применяем штраф к логитам на основе частоты токенов
        logits = logits - freq_penalty * token_counts

        return logits

    @staticmethod
    def sample_basic(logits: Float[Tensor, "d_vocab"]) -> int:
        '''
        Простое сэмплирование! Тут нам поможет torch.multinomial.
        '''
        # Применяем softmax к логитам для получения распределения вероятностей
        probs = torch.softmax(logits, dim=-1)

        # Используем multinomial для сэмплирования токена
        sampled_token = torch.multinomial(probs, num_samples=1).item()

        return sampled_token

    @staticmethod
    def sample_top_k(logits: Float[Tensor, "d_vocab"], k: int) -> int:
        '''
        top-k сэмплирование
        '''
        # Обрезаем логиты, оставляя только k наибольших значений
        top_k_logits, _ = torch.topk(logits, k)

        # Создаем маску для обнуления всех логитов, кроме top-k
        mask = logits < top_k_logits[..., -1, None]
        logits[mask] = -float('inf')

        # Применяем softmax к обрезанным логитам
        probs = torch.softmax(logits, dim=-1)

        # Сэмплируем токен с помощью multinomial
        sampled_token = torch.multinomial(probs, num_samples=1).item()

        return sampled_token


    @staticmethod
    def sample_top_p(logits: Float[Tensor, "d_vocab"], top_p: float, min_tokens_to_keep: int = 1) -> int:
        '''
        top-p сэмплирование
        '''
        # Применяем softmax для получения вероятностей
        probs = torch.softmax(logits, dim=-1)

        # Сортируем вероятности и логиты по убыванию вероятности
        sorted_probs, sorted_indices = torch.sort(probs, descending=True)

        # Вычисляем кумулятивную сумму вероятностей
        cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

        # Находим индекс, где cumulative_probs превышает top_p
        cutoff_idx = torch.searchsorted(cumulative_probs, top_p)

        # Обрезаем все логиты, которые идут после порогового значения
        cutoff_idx = max(cutoff_idx.item(), min_tokens_to_keep)
        logits[sorted_indices[cutoff_idx:]] = -float('inf')

        # Применяем softmax к обрезанным логитам
        probs = torch.softmax(logits, dim=-1)

        # Сэмплируем токен
        sampled_token = torch.multinomial(probs, num_samples=1).item()

        return sampled_token


In [28]:
sampler = TransformerSampler(model, tokenizer)

prompt = "Jingle bells, jingle bells, jingle all the way"
print(f"Greedy decoding with prompt: {prompt!r}\n")

output = sampler.sample(prompt, max_tokens_generated=8, temperature=0.0)
print(f"Your model said: {output!r}\n")

expected = "Jingle bells, jingle bells, jingle all the way up to the top of the mountain."
assert output == expected

print("Tests passed!")

Greedy decoding with prompt: 'Jingle bells, jingle bells, jingle all the way'

Your model said: 'Jingle bells, jingle bells, jingle all the way up to the top of the mountain.'

Tests passed!


In [29]:
logits = t.tensor([1, 2]).log()

cold_logits = TransformerSampler.apply_temperature(logits, temperature=0.001)
print('A low temperature "sharpens" or "peaks" the distribution: ', cold_logits)
t.testing.assert_close(cold_logits, 1000.0 * logits)

hot_logits = TransformerSampler.apply_temperature(logits, temperature=1000.0)
print("A high temperature flattens the distribution: ", hot_logits)
t.testing.assert_close(hot_logits, 0.001 * logits)

print("Tests passed!")

A low temperature "sharpens" or "peaks" the distribution:  tensor([  0.0000, 693.1472])
A high temperature flattens the distribution:  tensor([0.0000, 0.0007])
Tests passed!


In [30]:
bieber_prompt = "And I was like Baby, baby, baby, oh Like, Baby, baby, baby, no Like, Baby, baby, baby, oh I thought you'd always be mine, mine"
input_ids = tokenizer.encode(bieber_prompt, return_tensors="pt")
logits = t.ones(tokenizer.vocab_size)
penalized_logits = TransformerSampler.apply_frequency_penalty(input_ids.squeeze(), logits, 2.0)

assert penalized_logits[5156].item() == -11, "Expected 6 occurrences of ' baby' with leading space, 1-2*6=-11"
assert penalized_logits[14801].item() == -5, "Expected 3 occurrences of ' Baby' with leading space, 1-2*3=-5"

print("Tests passed!")

Tests passed!


In [41]:
prompt = "John and Mary went to the"
input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
logits = model(input_ids)[0, -1]

expected_top_10pct = {
    " church": 0.0648,
    " house": 0.0367, # These are the two most likely tokens, and add up to >10%
}
top_10pct_sum = sum(expected_top_10pct.values())

observed_freqs = defaultdict(int)

N = 10000
for _ in tqdm(range(N)):
    token = TransformerSampler.sample_next_token(input_ids.squeeze(), logits, top_p=0.1)
    observed_freqs[tokenizer.decode(token)] += 1

for word in expected_top_10pct:
    expected_freq = expected_top_10pct[word] / top_10pct_sum
    observed_freq = observed_freqs[word] / N
    print(f"Word: {word!r:<9}. Expected freq {expected_freq:.4f}, observed freq {observed_freq:.4f}")
    assert abs(observed_freq - expected_freq) < 0.4, "Try increasing N if this fails by a small amount."

  0%|          | 0/10000 [00:00<?, ?it/s]

Word: ' church'. Expected freq 0.6384, observed freq 1.0000
Word: ' house' . Expected freq 0.3616, observed freq 0.0000


In [39]:
prompt = "John and Mary went to the"
input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
logits = model(input_ids)[0, -1]

expected_top_10pct = {
    " church": 0.0648,
    " house": 0.0367, # These are the two most likely tokens, and add up to >10%
}
top_10pct_sum = sum(expected_top_10pct.values())

observed_freqs = defaultdict(int)

N = 10000
for _ in tqdm(range(N)):
    token = TransformerSampler.sample_next_token(input_ids.squeeze(), logits, top_p=0.1)
    observed_freqs[tokenizer.decode(token)] += 1

for word in expected_top_10pct:
    expected_freq = expected_top_10pct[word] / top_10pct_sum
    observed_freq = observed_freqs[word] / N
    print(f"Word: {word!r:<9}. Expected freq {expected_freq:.4f}, observed freq {observed_freq:.4f}")
    assert abs(observed_freq - expected_freq) < 0.4, "Try increasing N if this fails by a small amount."

  0%|          | 0/10000 [00:00<?, ?it/s]

Word: ' church'. Expected freq 0.6384, observed freq 1.0000
Word: ' house' . Expected freq 0.3616, observed freq 0.0000
