# Домашние задание: введение в LLM 2

В этом домашнем задании мы разберем более современные архитектурные модификации LLM такие как RoPE, RMSNorm и обучим свою мини-LLM с нуля

## Скачиваем данные

In [1]:
! wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2025-03-22 18:01:08--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2025-03-22 18:01:10 (2.13 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [2]:
! pip install jaxtyping==0.2.34 transformers==4.48.2

Collecting jaxtyping==0.2.34
  Downloading jaxtyping-0.2.34-py3-none-any.whl.metadata (6.4 kB)
Collecting transformers==4.48.2
  Downloading transformers-4.48.2-py3-none-any.whl.metadata (44 kB)
Collecting typeguard==2.13.3 (from jaxtyping==0.2.34)
  Downloading typeguard-2.13.3-py3-none-any.whl.metadata (3.6 kB)
Downloading jaxtyping-0.2.34-py3-none-any.whl (42 kB)
Downloading transformers-4.48.2-py3-none-any.whl (9.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, jaxtyping, transformers
  Attempting uninstall: typeguard
    Found existing installation: typeguard 4.4.2
    Uninstalling typeguard-4.4.2:
      Successfully uninstalled typeguard-4.4.2
  Attempting uninstall: jaxtyping
    Found existing installation: jaxtyping 0.2.38
    Uninstalling jaxtyping-0.2.38:
      Successfully uninstalled

In [1]:
import sys
import torch
from torch import Tensor
import torch.nn as nn
import numpy as np
import math
from tqdm.notebook import tqdm
from typing import Tuple, List, Optional, Dict, Callable
from jaxtyping import Float, Int
from transformers import AutoTokenizer

In [2]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

  return torch._C._cuda_getDeviceCount() > 0


# Подготовка данных - 15 баллов

У нас есть текст пьесы Шекспира

In [137]:
with open("input.txt") as fin:
    text = fin.read()

print(text[:200])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you


Создаем токенайзер, обратите внимание, что у токена there должен быть вначале спецсимвол, обозначающий, что это новое слово, а не часть предыдущего!

In [None]:
tokenizer = AutoTokenizer.from_pretrained("openai-community/gpt2")
print(tokenizer.tokenize("Hello there sometrashtoken"))
print(tokenizer.eos_token)

['Hello', 'Ġthere', 'Ġsomet', 'r', 'ash', 'token']
<|endoftext|>


In [143]:
tokenizer.decode(tokenizer.encode("Hello there sometrashtoken", add_special_tokens=False))

'Hello there sometrashtoken'

In [93]:
text = "The unhappiness is real or not."
tokens = tokenizer.tokenize(text)
ids = tokenizer(tokens)["input_ids"]

In [121]:
tokenizer.decode(tokenizer(''.join(tokens))["input_ids"])

'TheĠunhappinessĠisĠrealĠorĠnot.'

In [129]:
tokenizer.decode(tokenizer(tokens)['input_ids'][3])

'Ġis'

In [97]:
res = []
for id in ids:
    res.append(tokenizer.decode(id))
res

['The', 'Ġunh', 'appiness', 'Ġis', 'Ġreal', 'Ġor', 'Ġnot', '.']

In [101]:
tokenizer.decode(id)

'.'

В токенайзере нет спецтокена под паддинг, поэтому выставим PAD_TOKEN = EOS_TOKEN

In [192]:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id

In [193]:
tokenizer.eos_token_id

50256

In [194]:
tokenizer.pad_token_id

50256

## Датасет - 5 баллов

Нам нужен Dataset - что-то, что будет держать данные.
Почитать подробнее можно в [документации](https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset) или на [примерах](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html).


Если кратко:
* Dataset должен реализовывать 2 метода: `__getitem__` для получения сэмплов и `__len__` для получения длины датасета
* Нужна функция collate_fn - она будет собирать несколько сэмплов из датасета в один батч
* Нужен DataLoader - объект, который будет брать объекты из датасета и с помощью collate_fn возвращать батчи
* Нужен Sampler - объект, который помогает DataLoader выбирать батчи. В нашем случае это будет просто рандом, но можно собирать сэмплы по одинаковой длине или упорядочить в зависимости от задачи.


Начнем с Dataset. В нем нужно дописать 3 функции, самая важная конструктор `__init__`:
1. Принимает корпус текста
2. Токенизирует его весь
2. Бьем текст на непересекающиеся окна размером 200-300 токенов (длину определяем с помощью random.randint)
3. Кладет токены в self.texts полученный List\[int\], то есть уже векторизованные тексты

In [None]:
texts = list(range(30))
right = 0
sub_texts = []
while right <= (len(texts)):
    current_len = np.random.randint(low=3, high=7)
    current_text = texts[right : right + current_len]
    if len(current_text)>0:
        sub_texts.append(current_text)
    right += current_len

In [225]:
from typing import List
import random
from torch.utils.data import Dataset, DataLoader


class MyDataset(Dataset):

    def __init__(self, tokenizer: AutoTokenizer, text: str):
        self.tokenizer = tokenizer
        self.texts = self._break_text(text)
        random.seed(1)



    def __getitem__(self, index) -> List[int]:
        return self.texts[index]


    def __len__(self) -> int:
        return len(self.texts)

    def _break_text(self, text: str) -> list[list[str]]:
        '''Breaking down text into chunks.'''
        tokenized = self.tokenizer.encode(text, add_special_tokens=False)
        right = 0
        sub_texts = []
        while right <= (len(tokenized)):
            current_len = np.random.randint(low=200, high=300)
            current_text = tokenized[right : right + current_len]
            if len(current_text)>0:
                sub_texts.append(current_text)
            right += current_len
        return sub_texts
    


dataset = MyDataset(tokenizer, text)

sample_0 = dataset.tokenizer.decode(dataset[0])

assert sample_0.startswith(text[:100])

## Collate FN - 5 баллов
Функция сборки, она же collate_fn. Она принимает батч сэмплов, т.е. список объектов, которые нам возвращает датасет!
Она должна принимать `List[List[int]]` батч объектов и возвращать 2 тензора:

* input_ids - `[batch, seq_len]` - батч токенов, в котором добавлены паддинги до максимальной длины в **текущем батче**.
* mask - `[batch, seq_len]` - батч масок. На позиции `[i, j]` стоит 0, если токен является паддингом, иначе 1.

В качестве значения паддинга для input_ids используйте `tokenizer.pad_token_id`

In [224]:
def collate_fn(batch: List[List[int]]) -> Tuple[torch.LongTensor, torch.LongTensor]:
    max_len = max([len(el) for el in batch])
    all_vecs = []
    for vec in batch:
        cur_item = torch.tensor(vec)
        pad_num = max_len - cur_item.shape[0]
        padded_vec = torch.nn.functional.pad(torch.tensor(vec), pad=[0, pad_num], value=tokenizer.pad_token_id)
        all_vecs.append(padded_vec)

    input_ids = torch.vstack(all_vecs)
    mask = (input_ids != tokenizer.pad_token_id).long()

    return input_ids, mask



batch = [
    [1, 2, 3, 4],
    [1, 2],
    [1, 2, 3, 4, 5, 6, 7],
]
input_ids_ref = torch.LongTensor([
    [1, 2, 3, 4, 50256, 50256, 50256],
    [1, 2, 50256, 50256, 50256, 50256, 50256],
    [1, 2, 3, 4, 5, 6, 7],
])


mask_ref = torch.LongTensor([
    [1, 1, 1, 1, 0, 0, 0],
    [1, 1, 0, 0, 0, 0, 0],
    [1, 1, 1, 1, 1, 1, 1],
])

input_ids, mask = collate_fn(batch)

assert (input_ids == input_ids_ref).all()
assert (mask == mask_ref).all()
print("All good")

All good


## Соберем DataLoader - 5 баллов

Нужно заполнить пропущенные поля и убедиться, что в датасете есть замаскированные токены!

In [None]:
from torch.utils.data.sampler import RandomSampler
sampler = RandomSampler(data_source=dataset)

In [246]:
from torch.utils.data.sampler import RandomSampler

BATCH_SIZE = 16

# ---- Ваш код здесь ----
sampler = RandomSampler(data_source=dataset, num_samples=16)
train_loader = DataLoader(
    dataset=dataset,
    sampler=sampler,
    collate_fn=collate_fn,
    batch_size=BATCH_SIZE
    )

# ---- Конец кода ----


for input_ids, mask in train_loader:
    break

assert (mask.sum(dim=1) < mask.size(1)).sum() < mask.size(0)
assert input_ids.size(0) == 16
print("all good")

all good


# Transformer - 20 баллов

Немного модфицированный блок трансформера, который мы скопируем с предыдущего занятия!

In [248]:
import torch
import torch.nn as nn
from dataclasses import dataclass


@dataclass
class Config:
    d_model: int = 768 # он же hidden_dim - внутрення размерность модели
    debug: bool = True
    layer_norm_eps: float = 1e-5
    d_vocab: int = 50257 # он же vocab_size, размер словаря модели
    init_range: float = 0.02
    n_ctx: int = 1024 # число позиционных эмбеддингов
    d_head: int = 64 # размерность головы аттеншена
    d_mlp: int = 3072 # внутренняя размерность FFN-слоя
    n_heads: int = 12 # число голов аттеншена
    n_layers: int = 12 # число слоев трансформера

cfg = Config()
print(cfg)

Config(d_model=768, debug=True, layer_norm_eps=1e-05, d_vocab=50257, init_range=0.02, n_ctx=1024, d_head=64, d_mlp=3072, n_heads=12, n_layers=12)


Эти модули остаются без изменений!
Скопируйте их из предыдущего домашнего задания.

In [249]:
class Embed(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.W_E = nn.Parameter(torch.empty((cfg.d_vocab, cfg.d_model)))
        nn.init.normal_(self.W_E, std=self.cfg.init_range)

    def forward(self, input_ids: Int[Tensor, "batch seq_len"]) -> Float[Tensor, "batch seq_len d_model"]:
        return torch.nn.functional.embedding(input=input_ids, weight=self.W_E)



class Unembed(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.W_U = nn.Parameter(torch.empty((cfg.d_model, cfg.d_vocab)))
        nn.init.normal_(self.W_U, std=self.cfg.init_range)
        self.b_U = nn.Parameter(torch.zeros((cfg.d_vocab)))

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_vocab"]:
    
        # ---- Ваш код здесь ----
        return  torch.nn.functional.linear(input=x, weight=self.W_U.T, bias=self.b_U)
        # ---- Конец кода ----

class MLP(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.W_in = nn.Parameter(torch.empty((cfg.d_model, cfg.d_mlp)))
        self.W_out = nn.Parameter(torch.empty((cfg.d_mlp, cfg.d_model)))
        self.b_in = nn.Parameter(torch.zeros((cfg.d_mlp)))
        self.b_out = nn.Parameter(torch.zeros((cfg.d_model)))
        nn.init.normal_(self.W_in, std=self.cfg.init_range)
        nn.init.normal_(self.W_out, std=self.cfg.init_range)

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Float[Tensor, "batch seq_len d_model"]:
        
        # ---- Ваш код здесь ----
        dense_res = torch.nn.functional.linear(input=x, weight=self.W_in.T, bias=self.b_in)
        gelu_res = torch.nn.functional.gelu(input=dense_res, approximate='tanh')
        return torch.nn.functional.linear(input=gelu_res, weight=self.W_out.T, bias=self.b_out)
        # ---- Конец кода ----



## RMSNorm - 5 баллов

Здесь нужно написать RMSNorm. В качестве формулы стоит ориентироваться на формулу 4 из [статьи RMSNorm](https://arxiv.org/pdf/1910.07467)


$$\bar{x}_i = \frac{x_i}{\text{RMS}(\mathbf{x})} w_i, \quad \text{where} \quad \text{RMS}(\mathbf{x}) = \sqrt{\frac{1}{n} \sum_{i=1}^{n} x_i^2}$$



In [262]:
class RMSNorm(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.w = nn.Parameter(torch.ones(cfg.d_model)) # gamma

    def forward(self, x: Float[Tensor, "batch seq_len d_model"]) -> Float[Tensor, "batch seq_len d_model"]:
        # ---- Ваш код здесь ----
        rms = torch.sqrt(torch.square(x).sum(dim=-1) / x.shape[-1])
        return (x/rms) * self.w
        # ---- Конец кода ----



cfg_rmsnorm = Config(d_model=5)
x = torch.Tensor([[[0.1, 0.2, 0.3, 0.4, 0.5]]]).to(device)
layer = RMSNorm(cfg_rmsnorm).to(device)
y = torch.Tensor([[[0.3015, 0.6030, 0.9045, 1.2060, 1.5076]]]).to(device)
assert torch.allclose(y, layer(x), atol=1e-4, rtol=1e-3)
print("OK")

OK


## Rotary Embeddings - 5 баллов

Нужно написать роторные эмбеддинги из [статьи](https://arxiv.org/pdf/2104.09864). В качестве формулы нужно взять пункт 3.4.2!


In [None]:
class RotaryPositionalEmbeddings(nn.Module):

    def __init__(self, cfg: Config, theta: int = 10_000):
        super().__init__()
        self.cfg = cfg
        self.max_seq_len = cfg.n_ctx
        self.theta = theta
        self.d = cfg.d_head
        
        # ---- Ваш код здесь ----
        # Углы theta_i. Смотрите секуцию 2.2 статьи для формулы!
        freqs = ...
        position_id = torch.arange(0, self.max_seq_len).float()
        
        # нужно получить матрицу m theta_i размера [max_seq_len, self.d] вида m theta_i
        # где m берется из position_id, а theta из freqs
        
        idx_theta = ...
        
        # max_seq_len, d_head
        cos = idx_theta.cos()
        sin = idx_theta.sin()
        
        # нужно продублировать размерности для формулы 34. theta_i встерчается два раза подряд в синусах и косинуса
        # тут нам поможет torch.repeat_interleave
        cos = ...
        sin = ...
        # ---- Конец кода ----
        
        

        # 1, max_seq_len, 1, d_head
        self.register_buffer("sin", sin.view(1, self.max_seq_len, 1, self.d))
        self.register_buffer("cos", cos.view(1, self.max_seq_len, 1, self.d))

    @staticmethod
    def rotate_neg_vector(x: Float[torch.Tensor, "batch seq_len num_heads d_head"]):
        # На входе x = [x1, x2, x3, x4, ... x_{n-1}, x_n]
        # На выходе x' = [-x2, x1, -x4, x3, ..., -x_n, x_{n-1}]
        x_new = torch.empty_like(x)
        
        
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----
        
        return x_new

    def forward(self, x: Float[torch.Tensor, "batch seq_len num_heads d_head"]):
        seq_len = x.size(1)
        x_rot = self.rotate_neg_vector(x)
        
        # ---- Ваш код здесь ----
        x_rope = ...
        # ---- Конец кода ----
        
        return x_rope




batch_size = 1
seq_len = 3
num_heads = 1
d_head = 16

torch.manual_seed(1)
x = torch.rand(batch_size, seq_len, num_heads, d_head)

rope_config = Config(
    n_heads=2,
    d_head=16,
)

rope_layer = RotaryPositionalEmbeddings(rope_config)
y = rope_layer(x)


from math import sin, cos


thetas = [10_000 ** (-2 * (i - 1) / rope_config.d_head) for i in range(1, rope_config.d_head // 2 + 1)]
all_good = True
for batch_idx in range(batch_size):
    for m in range(seq_len):
        if not all_good:
            break
        for head_idx in range(num_heads):
            if not all_good:
                break
            for d_idx in range(d_head):
                # 0, 2, 4
                if d_idx % 2 == 0:
                    val = x[batch_idx, m, head_idx, d_idx] * cos(m * thetas[d_idx // 2]) - x[batch_idx, m, head_idx, d_idx + 1] * sin(m * thetas[d_idx // 2])
                else:
                    val = x[batch_idx, m, head_idx, d_idx] * cos(m * thetas[d_idx // 2]) + x[batch_idx, m, head_idx, d_idx - 1] * sin(m * thetas[d_idx // 2])
                if abs(y[batch_idx, m, head_idx, d_idx] - val) > 1e-3:
                    print(f"Ошибка на позиции {m} и размерности {d_idx} в голове {head_idx}")
                    print(f"Полученное значение {y[batch_idx, m, head_idx, d_idx]}, референс {val}")
                    all_good = False
                    break


if all_good:
    print("Тесты прошли успешно!")


##  Attention masking - 3 балла

Копируем имлементацию из предыдущего домашнего задания, но теперь нужно учесть и маски с паддингами.
Для этого в `forward` и `apply_causal_mask` подана mask.

В оригинальном задании 3 мы считали, что паддингов нет, поэтому делали маску нижней треугольной, чтобы токен i смотрел на токен j только тогда, когда `i >= j`, т.е. токен i мог смотреть все токены до него.

Теперь же нужно сверх этого добавить еще и паддинг, т.е:

1. Нам дается маска `[batch_size, seq_len]` из `collate_fn`. Напомню, что на позиции `[batch_idx, m]` стоит 1, если токен настоящий или 0, если это паддинг
2. Мы должны модифицировать нашу нижнюю треугольную маску таким образом, чтобы не только не смотреть в будущее, но и не смотреть на паддинг.


## Attention Rotary Embedding - 2 балла
Также нужно вставить в attention слой роторные эмбеддинги:
1. Нужно добавить их в init метод модели, в качестве theta можно оставить 10000
2. Нужно применять их к матрицам Q, K перед матричным умножением $Q K^T$ в функции _get_qkv

In [None]:
class Attention(nn.Module):
    IGNORE: Float[Tensor, ""]

    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg

        self.W_Q = nn.Parameter(torch.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_Q = nn.Parameter(torch.zeros((cfg.n_heads, cfg.d_head)))

        self.W_K = nn.Parameter(torch.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_K = nn.Parameter(torch.zeros((cfg.n_heads, cfg.d_head)))

        self.W_V = nn.Parameter(torch.empty((cfg.n_heads, cfg.d_model, cfg.d_head)))
        self.b_V = nn.Parameter(torch.zeros((cfg.n_heads, cfg.d_head)))

        self.W_O = nn.Parameter(torch.empty((cfg.n_heads, cfg.d_head, cfg.d_model)))
        self.b_O = nn.Parameter(torch.zeros((cfg.d_model)))
        
        
        
        # ---- Ваш код здесь ----
        self.rope = ...
        # ---- Конец кода ----
        

        nn.init.normal_(self.W_Q, std=self.cfg.init_range)
        nn.init.normal_(self.W_K, std=self.cfg.init_range)
        nn.init.normal_(self.W_V, std=self.cfg.init_range)
        nn.init.normal_(self.W_O, std=self.cfg.init_range)
        self.register_buffer("IGNORE", torch.tensor(float("-inf"), dtype=torch.float32, device=device))

    def _get_qkv(
        self, x: Float[Tensor, "batch seq_len d_model"]
    ) -> Tuple[Float[Tensor, "batch seq_len num_heads d_head"]]:
        """1. Трансформируем матрицы проекций в формат [d_model, d_model] и получаем проекции  Q, K, V"""
        # Берем размерности
        batch_size, seq_len, d_model = x.shape
        num_heads = self.cfg.n_heads
        d_head = self.cfg.d_head

        W_Q = self.W_Q.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)
        W_K = self.W_K.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)
        W_V = self.W_V.permute(1, 0, 2).reshape(self.cfg.d_model, self.cfg.d_model)

        b_Q = self.b_Q.view(-1)
        b_K = self.b_K.view(-1)
        b_V = self.b_V.view(-1)
        
        
        # ---- Ваш код здесь ----
        Q = ...
        K = ...
        V = ...
        # не забудьте применить self.rotary после проекций!
        # ---- Конец кода ----


        return Q, K, V

    def _get_attention_dotprod(
        self,
        Q: Float[Tensor, "batch seq_len num_heads d_head"],
        K: Float[Tensor, "batch seq_len num_heads d_head"]
    ) -> Float[Tensor, "batch num_heads seq_len seq_len"]:
        """Q x K^T"""
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----

    def _get_attention_scores(
        self,
        attention_scores: Float[Tensor, "batch num_heads seq_len seq_len"],
        mask: Int[Tensor, "batch seq_len"]
    ) -> Float[Tensor, "batch num_heads seq_len seq_len"]:
        """Нормализация, маскирование и softmax"""
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----

    def _get_final_projection(
        self,
        V: Float[Tensor, "batch seq_len num_heads d_head"],
        attn_probs: Float[Tensor, "batch num_heads seq_len seq_len"]
    ) -> Float[Tensor, "batch seq_len d_model"]:
        """Финальная проекция
        permute [ batch, num_heads, seq_len, d_head]"""
        batch_size, seq_len = V.shape[0], V.shape[1]
        d_model = self.cfg.d_model
        num_heads = self.cfg.n_heads
        d_head = self.cfg.d_head
        
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"],  mask: Int[Tensor, "batch seq_len"]
    ) -> Float[Tensor, "batch seq_len d_model"]:
        # 1. получаем проекции  Q, K, V
        Q, K, V = self._get_qkv(x)
        # 2. Q x K^T
        attention_scores = self._get_attention_dotprod(Q, K)

        # 3. Нормализация, маскирование и softmax
        attn_probs = self._get_attention_scores(attention_scores, mask)

        # 6. Финальная проекция
        # permute [ batch, num_heads, seq_len, d_head]
        res = self._get_final_projection(V, attn_probs)
        return res

    def apply_causal_mask(
        self, attn_scores: Float[Tensor, "batch n_heads seq_len seq_len"], mask: Int[Tensor, "batch seq_len"]
    ) -> Float[Tensor, "batch n_heads seq_len seq_len"]:
        '''
        Applies a causal mask to attention scores, and returns masked scores.
        Используем треугольную маску, чтобы не смотреть в будущее!
        В качестве масикировочного значения перед софтмаксом можно использовать self.IGNORE (-inf)

        В дополнение к предыдущему заданию используйте аргумент mask, чтобы не смотреть не только на будущие токены,
        но и на паддинги.
        Сами паддинги могут смотреть на любые токены.
        '''
        seq_len = mask.size(1)
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----


mask_padding = torch.LongTensor([
    [1, 1, 1, 1, 0, 0, 0],
    [1, 1, 0, 0, 0, 0, 0],
    [1, 1, 1, 1, 1, 1, 1],
]).to(device)

lengths = mask_padding.sum(dim=1).tolist()


batch_size = 3
seq_len = 7
d_head = 8
n_heads = 4
torch.manual_seed(1)
x = torch.rand(batch_size, n_heads, seq_len, seq_len).to(device)

attn = Attention(cfg).to(device)
softmax_res = torch.softmax(attn.apply_causal_mask(x, mask_padding), dim=-1)

for batch_idx in range(batch_size):
    for head_idx in range(n_heads):
        sm = softmax_res[batch_idx, head_idx]
        l = lengths[batch_idx]
        for i in range(seq_len):
            for j in range(seq_len):
                # i < j - Causal mask, проверяем, что не смотрим в будущее!
                # j >= l - проверяем, что не смотрим на паддинги!
                if i < j or j >= l:
                    assert sm[i, j] == 0, (batch_idx, head_idx, i, j, sm[i, j])

_ = attn(torch.rand(batch_size, seq_len, 768).to(device), mask_padding.to(device))
print("All good")

## Собираем Transformer - 5

1. В TransformerBlock и DemoTransformer немного модифицируем код из предыдущего задания, чтобы передавать mask в слои аттеншена.
2. Не используем позиционные эмбеддинги, т.к. кодирование позиционной информации уже заложено в роторные эмбеддинги, которые являются частью attention слоя


In [None]:
class TransformerBlock(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.ln1 = RMSNorm(cfg)
        self.attn = Attention(cfg)
        self.ln2 = RMSNorm(cfg)
        self.mlp = MLP(cfg)

    def forward(
        self, x: Float[Tensor, "batch seq_len d_model"], mask: Float[Tensor, "batch seq_len"]
    ) -> Float[Tensor, "batch seq_len d_model"]:
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----
    

class DemoTransformer(nn.Module):
    def __init__(self, cfg: Config):
        super().__init__()
        self.cfg = cfg
        self.embed = Embed(cfg)
        self.blocks = nn.ModuleList([TransformerBlock(cfg) for _ in range(cfg.n_layers)])
        self.ln_final = RMSNorm(cfg)
        self.unembed = Unembed(cfg)

    def forward(self, input_ids: Int[Tensor, "batch seq_len"], mask: Int[Tensor, "batch seq_len"]) -> Float[Tensor, "batch seq_len d_vocab"]:
        # ---- Ваш код здесь ----
        raise NotImplemented()
        # ---- Конец кода ----

In [None]:
train_config = Config(
    d_model=128,
    n_ctx=512,
    n_heads=8,
    d_head=16,
    d_mlp=512,
    n_layers=12
)
model = DemoTransformer(train_config).to(device)

for input_ids, mask in train_loader:
    break

p = model(input_ids.to(device), mask.to(device))


assert list(p.shape) == [input_ids.size(0), input_ids.size(1), train_config.d_vocab]
p.sum().backward()

del model
del p
print("all good")

# Обучение - 15 баллов

## calculate_loss - 5

Здесь нужно написать обычный training loop. Вначале напишем функцию для подсчета функции потерь `calculate_loss`. Функция принимает выходы модели logits размерности \[batch_size, seq_len, vocab_size\], input_ids размерности \[batch_size, seq_len\] и attention_mask размерности \[batch_size, seq_len\].

Так как мы хотим учиться на задаче языкового моделирования, в logits на позиции \[i, j\] находится распределение токенов по словарю для токена на позиции \[i, j + 1\] (мы предсказываем следующий токен). Каждое такое предсказание следующего токена мы будем рассматривать как задачу классификации и учить с помощью кроссэнтропийной функции потерь.

Алгоритм:
1. Обрезаем logits по размерности seq_len справа на 1: последний токен на позиции N у нас предсказывает токен на позиции N + 1, однако (N + 1)-го токена у нас нет, поэтому использовать эти предсказания для обучения мы не сможем.
2. Заводим переменную labels - для этого обрезаем input_ids слева на 1. Это будет наш массив меток. Мы обрезаем его слева на 1 по размерности seq_len, т.е. по сути сдвигагем этот массив таким образом, что на j-й позиции теперь стоит (j + 1)-й токен. Это очень важно для подсчета функции потерь, т.к. мы предсказываем следующий токен
3. Аналогично labels обрезаем attention_mask и переводим маску в `.bool()`
4. На позициях, где attention_mask == 0 (паддинги) проставляем в labels значение -100. Это дефолтное значение [ignore_index](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html) из кроссэнтропийной функции потерь, означающее, что для этой метки не будет считаться функция потерь. Таким образом мы не будем учиться предсказывать паддинги
5. Объеднияем в logits и labels размерности batch и seqlen с помощью view и подаем это в кроссэнтропийную функцию потерь, считаем loss


In [None]:
from math import log


criterion = nn.CrossEntropyLoss()
pad_id = tokenizer.pad_token_id

def calculate_loss(logits, input_ids, attention_mask):
    labels = input_ids.detach().clone()
    # ---- Ваш код здесь ----
    raise NotImplemented()
    # ---- Конец кода ----



batch_size = 2
seq_len = 4
num_classes = 7

input_ids = torch.LongTensor(
    [
        [0, 1,  pad_id, pad_id],
        [0, 1, 2, 3]
    ]
)

attention_mask = torch.LongTensor(
    [
        [1, 1, 0, 0],
        [1, 1, 1, 1]
    ]
)


# batch_size, seq_len, num_classes
logits = torch.Tensor(
    [[[0.7576, 0.2793, 0.4031, 0.7347, 0.0293, 0.7999, 0.3971],
         [0.7544, 0.5695, 0.4388, 0.6387, 0.5247, 0.6826, 0.3051],
         [0.4635, 0.4550, 0.5725, 0.4980, 0.9371, 0.6556, 0.3138],
         [0.1980, 0.4162, 0.2843, 0.3398, 0.5239, 0.7981, 0.7718]],

        [[0.0112, 0.8100, 0.6397, 0.9743, 0.8300, 0.0444, 0.0246],
         [0.2588, 0.9391, 0.4167, 0.7140, 0.2676, 0.9906, 0.2885],
         [0.8750, 0.5059, 0.2366, 0.7570, 0.2346, 0.6471, 0.3556],
         [0.4452, 0.0193, 0.2616, 0.7713, 0.3785, 0.9980, 0.9008]]]
)
logits.requires_grad=True

loss = calculate_loss(logits, input_ids, attention_mask)

assert abs(loss.item() - 1.934269905) < 1e-3
print(loss.item())

## Training loop - 5


Давайте теперь напишем training loop:
1. Перемещаем input_ids и mask на правильный device
2. Зануляем градиенты модели
3. Считаем выходы модели (logits)
4. Считаем функцию потерь с помощью функции calculate_loss
5. Делаем backward и обновляем веса оптимизатором

Учить модель лучше 10+ эпох.

Также предлагается добавлять значения функции потерь в массив losses, чтобы изобразить её изменения в следующей клетке на графике

In [None]:
import torch.optim as optim

model = DemoTransformer(cfg).to(device)

model = model.train()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

losses = []
for epoch in range(15):
    for input_ids, mask in tqdm(train_loader):
        # 1. перемещаем входы на device
        input_ids = input_ids.to(device)
        mask = mask.to(device)
        # ---- Ваш код здесь ----
        # 2. Обнуляем градиенты
        ...
        # 3. Считаем выходы модели
        ...
        # 4. считаем функцию потерь
        loss = ...
        # 5. Делаем backward и шаг оптимизации
        
        losses.append(loss.item())
        # ---- Конец кода ----

In [None]:
import matplotlib.pyplot as plt
plt.plot(losses)

## Генерация - 5 баллов
Давайте теперь попробуем посмотреть, что у нас обучилось! Для этого проверим себя на жадной генерации.

Для этого:
1. Подаем входы в модель
2. Берем последний элемент в logits по размерности seq_len и argmax по нему. Это сгенерированный токен, полученный жадным сэмплингом.
3. Конкатенируем его ко входам, конкатенируем \[\[1\]\] в маску
4. Генерируем так 30 токенов

In [None]:
input_text = text[:13]
inputs = tokenizer(input_text, return_tensors="pt")

input_ids = inputs["input_ids"].to(device)
mask = inputs["attention_mask"].to(device)

orig_size = input_ids.size(1)

num_tokens_to_generate = 30

with torch.no_grad():
    for i in range(num_tokens_to_generate):
        
        # ---- Ваш код здесь ----
        logits = ...
        next_token = ...
        input_ids = ...
        mask = ...
        # ---- Конец кода ----

print("Input text:\n", input_text)
print()
print("Generated text:\n", tokenizer.decode(input_ids[0]))

Если все прошло успешно, то мы увидим какой-то небольшой, возможно,  повторяющийся текст. Смысла в нем скорее всего будет немного, но издалека он будет выглядеть вполне реалистично.

Осталось отмашстабировать модель, накинуть данных и получится настоящий pretrain!