# Домашнее задание: реализовать GPT-модель с Mixture of Experts слоями.

## Домашнее задание

- Расширьте данный пример, добавив реализацию RMSNorm и rotary embeddings. **(4 балла)**
- Проведите эксперимент по изменению числа экспертов и размера модели. **(4 балла)**
- Опишите, как scaling laws влияют на производительность модели. **(2 балла)**


## Обзор Mixture of Experts (MoE)

Mixture of Experts (MoE) – это подход, позволяющий масштабировать модели путём распределения вычислительных задач между несколькими "экспертами". Для каждого входа вычисляется распределение вероятностей по экспертам с помощью "гейтинговой" сети, и итоговый выход получается как взвешенная сумма выходов экспертов.

### Задачи метода MoE:
1. Увеличение параметров модели без линейного роста вычислительной нагрузки.
2. Обучение специализированных подсетей для различных аспектов данных.

В следующих ячейках представлен упрощённый пример реализации MoE-слоя и его интеграции в трансформер-блок.

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataclasses import dataclass

## Реализация MoE-слоя

В этой части мы создадим упрощённый класс MoE, где каждый эксперт – это простая линейная трансформация, а гейтинговая сеть определяет веса для каждого эксперта.

In [2]:
# Пример шаблонной реализации MoE (требует доработки)

class MixtureOfExperts(nn.Module):
    def __init__(self, input_dim, output_dim, num_experts=4):
        super(MixtureOfExperts, self).__init__()
        self.num_experts = num_experts

        # создаь список экспертов (каждый эксперт - линейное преобразование)
        self.experts = nn.ModuleList(
            [nn.Linear(input_dim, output_dim) for _ in range(num_experts)]
        )

        # Гейтинговая сеть для определения весов
        self.gate = nn.Linear(input_dim, num_experts)

    def forward(self, x):
        # вычислите gate_scores, gate_probs
        # примените экспертов к входу x и объедините результаты с использованием весов
        # подсказка: используйте torch.stack и torch.sum
        # например, gate_scores = self.gate(x)

        #[batch_size, input_dim] -> [batch_size, num_experts]
        gate_scores = self.gate(x)
        gate_probs = F.softmax(gate_scores, dim=-1)

        # Применяем каждого эксперта
        #[batch_size, output_dim, num_experts]
        expert_outputs = torch.stack([expert(x) for expert in self.experts], dim=-1)
        # Расширьте размеры gate_probs для совместимости
        gate_probs = gate_probs.unsqueeze(2)

        # Взвешенная сумма выходов экспертов
         #[batch_size, output_dim]
        output = torch.sum(expert_outputs * gate_probs, dim=-1)
        return output

#### Проверьте работу MoE-слоя на тестовом входе (при необходимости)

In [3]:
@torch.no_grad
def test_moe_out():
    moe = MixtureOfExperts(10, 15, num_experts=4)
    moe.gate.weight.data.fill_(1.0)
    moe.gate.bias.data.fill_(0.0)
    for expert in moe.experts:
        expert.weight.data.fill_(1.0)
        expert.bias.data.fill_(0.0)
    x = torch.ones(2, 2, 10)
    out = moe(x)
    assert out.shape == (2, 2, 15), "Output shape mismatch"
    assert torch.allclose(out, 10 * torch.ones((2, 2, 15))), "Wrong output"
test_moe_out()

## Объяснение реализации MoE-слоя

- **self.experts:** Список линейных слоёв, каждый из которых является "экспертом".
- **self.gate:** Линейный слой, выдающий веса для каждого эксперта.
- **forward:** Вычисляется softmax по выходу гейта, затем каждое линейное преобразование применяется к входу и комбинируется с помощью весов.


In [4]:
# класс TransformerBlockMoE (требует доработки)

class TransformerBlockMoE(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_experts=4, is_causal=True):
        super(TransformerBlockMoE, self).__init__()
        # многоголовое внимание
        self.attn = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, batch_first=True
        )
        self.head_size = d_model // num_heads
        self.num_heads = num_heads
        self.qkv_projection = nn.Linear(d_model, 3 * d_model)
        # слои нормализации
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # MoE слой вместо стандартного feed-forward слоя
        self.moe = MixtureOfExperts(d_model, d_ff, num_experts=num_experts)
        # приводим к исходной размерности
        self.fc = nn.Linear(d_ff, d_model)
        self.is_causal=is_causal

    def forward(self, x, attention_mask = None):
        if self.is_causal:
            attention_mask = torch.ones((x.size(0) * self.num_heads, x.size(1), x.size(1)))
            attention_mask = torch.triu(attention_mask, diagonal=1).bool()
        #[bc, seq_len, d_model] -> [bc, seq_len, head_size * num_heads * 3]
        q,k,v = self.qkv_projection(x).chunk(3, dim=-1)
        attn_output, _ = self.attn(
            q, k, v,
            attn_mask = attention_mask,
            is_causal = self.is_causal
            )
        x = self.norm1(x + attn_output)

        # шаг 2: Применение MoE слоя с активацией ReLU
        moe_output = self.moe(x)
        ff_output = self.fc(F.relu(moe_output))
        x = self.norm2(x + ff_output)

        return x

## Объяснение трансформер-блока с MoE

- **Многоголовое внимание:** Стандартное self-attention.
- **norm1 и norm2:** Слои нормализации для стабилизации обучения.
- **MoE-слой:** Заменяет обычный feed-forward слой.
- **fc:** Линейное преобразование для приведения размерности обратно к d_model.


## Тестирование трансформер-блока с MoE

В следующей ячейке создадим тестовый пример:
- Сгенерируем случайный вход (например, эмбеддинги токенов)
- Пропустим вход через блок и выведем форму выходного тензора.


In [5]:
# Тестовый пример использования TransformerBlockMoE

# Задайте параметры модели
batch_size = 2
seq_len = 10
d_model = 32
num_heads = 4
d_ff = 64

# Сгенерируйте случайный вход
x = torch.randn(batch_size, seq_len, d_model)

# Инициализируйте блок трансформера с MoE
transformer_block = TransformerBlockMoE(d_model, num_heads, d_ff, num_experts=4)

# Пропустите вход через блок
output = transformer_block(x)

# Выведите формы входного и выходного тензоров
print("Форма входного тензора:", x.shape)
print("Форма выходного тензора:", output.shape)

Форма входного тензора: torch.Size([2, 10, 32])
Форма выходного тензора: torch.Size([2, 10, 32])


## Дополнительные темы для изучения

- **RMSNorm:** Альтернатива стандартной нормализации для улучшения обучения.
- **Rotary embeddings:** Улучшают представление позиционной информации.
- **Grouped Query Attention:** Модификация механизма внимания для повышения эффективности.

*Попробуйте самостоятельно интегрировать эти элементы в модель.*


In [6]:
class RMSNorm(nn.Module):
    def __init__(self, d_model:int, eps:float=1e-8, elementwise_affine:bool=True):
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(
            torch.ones(d_model),
            requires_grad=True if elementwise_affine else None
            )
    
    def forward(self, x):
        #faster than .pow
        sqare = x * x
        denominator = (sqare.sum(dim=-1, keepdim=True) / x.size(dim=-1) + self.eps).pow(0.5)
        return x / denominator * self.gamma

@torch.no_grad
def test_rms_out():
    norm = RMSNorm(10)
    inp = torch.ones((2, 2, 10))
    out = norm(inp)
    assert out.shape == inp.shape, "Wrong shape impl"
    assert torch.allclose(out, inp), "Wrong numerical impl"
test_rms_out()

In [7]:
class RoPE(nn.Module):
    def __init__(self, d_model:int, max_len:int, base:int=100_000):
        super().__init__()
        assert d_model % 2 == 0, "Embedding size must be divisible by 2"
        self.register_buffer(
            "bias", self._build_bias(d_model, max_len, base), persistent=False
            )

    def forward(self, x):
        """
        Make rope rotation in complex space for simplicity
        """
        #x.shape -> [bc, seq, head_size]
        orig_dtype = x.dtype
        orig_shape = x.shape

        #[bc, seq, head_size//2, 2]
        x = x.view(*orig_shape[:2], -1, 2).contiguous()
        x = torch.view_as_complex(x)
        x = self.bias[:orig_shape[1]] * x
        x = torch.view_as_real(x)
        x = x.view(orig_shape)
        return x.to(orig_dtype)

    def _build_bias(self, d_model, max_len, base):
        seq_idx = torch.arange(0, max_len)
        theta = 1.0 / base ** (torch.arange(0, d_model, 2) / d_model)
        position_matrix = torch.outer(seq_idx, theta)
        bias = torch.polar(torch.ones_like(position_matrix), position_matrix)
        return bias

@torch.no_grad
def test_rope_out():
    rope = RoPE(10, 4)
    inp = torch.ones((2, 4, 10))
    out = rope(inp)
    assert out.shape == inp.shape, "Wrong shape impl"
    assert torch.allclose(out[0, 0], torch.ones((10)))
test_rope_out()

In [8]:
class GQAsdpa(nn.Module):
    def __init__(
            self, d_model:int, num_q_heads:int,
            num_kv_heads:int, drop_p:float=.0, bias:bool=False, norm=None
            ):
        super().__init__()
        assert num_q_heads % num_kv_heads == 0, "Num q heads must be divisible by kv heads"
        assert d_model % num_q_heads == 0, "Model size must be divisible by num heads"
        self.head_size = d_model // num_q_heads
        self.num_q_heads = num_q_heads
        self.num_kv_heads = num_kv_heads
        self.group_size = num_q_heads // num_kv_heads
        self.norm = torch.sqrt(
            torch.tensor(self.head_size)) if norm is None else norm
        self.kv_proj = nn.Linear(
            d_model, 2 * self.head_size * num_kv_heads, bias=bias
            )
        self.q_proj = nn.Linear(
            d_model, self.head_size * num_q_heads, bias=bias
            )
        self.out_proj = nn.Linear(
            d_model, d_model, bias=bias
            )
        self.act = nn.Softmax(dim=-1)
        self.dropout = nn.Dropout(drop_p)

    def forward(self, x, attention_mask = None, is_causal=True):
        if attention_mask is not None:
            attention_mask = torch.where(
                attention_mask == 1, 1, torch.finfo(x.dtype).min
                )
        if is_causal:
            causal_mask = torch.full(
                (x.size(0), self.num_kv_heads, self.group_size, x.size(1), x.size(1)),
                fill_value=torch.finfo(x.dtype).min
                )
            causal_mask = torch.triu(causal_mask, diagonal=1)
            attention_mask = causal_mask if attention_mask is None else (attention_mask * causal_mask)

        #[bc, seq, num_kv_heads, 1, head_size]
        k,v = self.kv_proj(x).reshape(*x.shape[:2], self.num_kv_heads, 1, 2 * self.head_size).chunk(2, dim=-1)

        #[bc, seq, num_kv_heads, heads_per_group, head_size]
        q = self.q_proj(x).reshape(*x.shape[:2], self.num_kv_heads, -1, self.head_size)

        #[bc, num_groups, group_size, head_size, seq]
        q = q.permute(0, 2, 3, 1, 4).contiguous()
        #[bc, num_groups, group_size, seq, head_size]
        k = k.permute(0, 2, 3, 4, 1).contiguous()
        v = v.permute(0, 2, 3, 1, 4).contiguous()
        #[bc, num_groups, heads_per_group, seq, seq]
        attn_weight = q @ k + attention_mask
        #[bc, num_groups, heads_per_group, seq, head_size]
        out = self.act(self.dropout(attn_weight / self.norm)) @ v
        out = out.permute(0, 3, 1, 2, 4).reshape(*x.shape[:2], -1)
        out = self.out_proj(out)
        return out, attn_weight

@torch.no_grad
def test_gqa_out():
    attn_fn = GQAsdpa(16, 8, 2)
    x = torch.ones((2, 5, 16))
    out, _ = attn_fn(x)
    assert x.shape == out.shape
test_gqa_out()

In [9]:
class TransformerBlockGQA(nn.Module):
    def __init__(
            self, d_model, d_ff, num_q_heads, num_kv_heads, num_experts=4, is_causal=True
            ):
        super().__init__()
        # многоголовое внимание
        self.attn = GQAsdpa(
            d_model, num_q_heads, num_kv_heads
        )
        self.head_size = d_model // num_heads
        self.num_heads = num_heads
        # слои нормализации
        self.norm1 = RMSNorm(d_model)
        self.norm2 = RMSNorm(d_model)
        # MoE слой вместо стандартного feed-forward слоя
        self.moe = MixtureOfExperts(d_model, d_ff, num_experts=num_experts)
        # приводим к исходной размерности
        self.fc = nn.Linear(d_ff, d_model)
        self.is_causal = is_causal

    def forward(self, x, attention_mask = None):
        # шаг 1: Многоголовое внимание
        #[bc, seq_len, d_model] -> [bc, seq_len, head_size * num_heads * 3]
        attn_output, _ = self.attn(x, attention_mask, self.is_causal)
        x = self.norm1(x + attn_output)

        # шаг 2: Применение MoE слоя с активацией ReLU
        moe_output = self.moe(x)
        ff_output = self.fc(F.relu(moe_output))
        x = self.norm2(x + ff_output)
        return x

@torch.no_grad
def test_block_gqu_out():
    batch_size = 2
    seq_len = 10
    d_model = 32
    num_q_heads = 8
    num_kv_heads = 2
    d_ff = 64

    # Сгенерируйте случайный вход
    x = torch.randn(batch_size, seq_len, d_model)

    # Инициализируйте блок трансформера с MoE
    transformer_block = TransformerBlockGQA(d_model, d_ff, num_q_heads, num_kv_heads, num_experts=4)

    # Пропустите вход через блок
    output = transformer_block(x)

    # Выведите формы входного и выходного тензоров
    print("Форма входного тензора:", x.shape)
    print("Форма выходного тензора:", output.shape)

test_block_gqu_out()

Форма входного тензора: torch.Size([2, 10, 32])
Форма выходного тензора: torch.Size([2, 10, 32])


In [10]:
@dataclass
class TransformerConfig:
    num_layers: int = 8
    vocab_size: int = 1024
    d_model: int = 256
    d_ff: int = 1024
    use_gqa: bool = True
    num_heads: int = 9
    num_q_heads: int = 9
    num_kv_heads: int = 3
    num_experts: int = 4
    tie_embeddings: bool = True

In [11]:
class Decoder(nn.Module):
    def __init__(self, cfg: TransformerConfig):
        super().__init__()
        self.embedding = nn.Embedding(cfg.vocab_size, cfg.d_model)
        self.layers = nn.ModuleList([
            TransformerBlockGQA(cfg.d_model, cfg.d_ff, cfg.num_q_heads, cfg.num_kv_heads, cfg.num_experts)
            if cfg.use_gqa else TransformerBlockMoE(cfg.d_model, cfg.num_heads, cfg.d_ff, cfg.num_experts)
            for _ in range(cfg.num_layers)
        ])
        self.lm_head = nn.Linear(cfg.d_ff, cfg.vocab_size, bias=False)
        if cfg.tie_embeddings:
            self.lm_head.weight = self.embedding.weight
        
    def forward(self, x, attention_mask=None):
        out = self.embedding(x)
        for layer in self.layers:
            out = layer(out, attention_mask)
        out = self.lm_head(out)
        return out

@torch.no_grad
def test_block_gqu_out():
    config_gqa = TransformerConfig(
        num_layers=2,
        vocab_size=1024,
        d_model=36,
        d_ff=1024,
    )
    config = TransformerConfig(
        num_layers=2,
        vocab_size=1024,
        d_model=36,
        d_ff=1024,
        use_gqa=False
    )

    # Инициализируйте блок трансформера с MoE
    transformer = Decoder(config)
    transformer_gqa = Decoder(config_gqa)
    x = torch.arange(1, 26).long().expand(2, -1)
    out = transformer(x)
    out_gqa = transformer_gqa(x)
    # Выведите формы входного и выходного тензоров
    print("Форма входного тензора:", x.shape)
    print("Форма выходного тензора:", out.shape)
    print("Форма выходного тензора gqa:", out_gqa.shape)
    assert out.shape == (2, 25, 1024)
    assert out_gqa.shape == (2, 25, 1024)

test_block_gqu_out()

Форма входного тензора: torch.Size([2, 25])
Форма выходного тензора: torch.Size([2, 25, 1024])
Форма выходного тензора gqa: torch.Size([2, 25, 1024])


### Изменение числа параметров модели при изменении конфигурации

#### Изменение числа параметров с ростом числа блоков

Без GQA

In [12]:
from rich import print
model_config = TransformerConfig(
        num_layers=10,
        vocab_size=1024,
        d_model=576,
        d_ff=2304,
        use_gqa=False,
        num_heads=9,
        num_q_heads=9,
        num_kv_heads=3,
        num_experts=4,
        tie_embeddings=False
    )

def change_num_layers(base_config):
    min_params = 1
    for i in range(1, 50, 5):
        base_config.num_layers = i
        model = Decoder(base_config)
        num_params = sum(p.numel() for p in model.parameters()) / 1e6
        if i == 1:
            min_params = num_params    
        print(f"d_model: {base_config.d_model}, num_layers: {i} params: {num_params} М\n" +
              f"Прирост числа параметров: {num_params / min_params}%")
change_num_layers(model_config)

С GQA

In [13]:
model_config = TransformerConfig(
        num_layers=10,
        vocab_size=1024,
        d_model=576,
        d_ff=2304,
        use_gqa=True,
        num_heads=9,
        num_q_heads=9,
        num_kv_heads=3,
        num_experts=4,
        tie_embeddings=False
    )

def change_num_layers(base_config):
    min_params = 1
    for i in range(1, 50, 5):
        base_config.num_layers = i
        model = Decoder(base_config)
        num_params = sum(p.numel() for p in model.parameters()) / 1e6
        if i == 1:
            min_params = num_params    
        print(f"d_model: {base_config.d_model}, num_layers: {i} params: {num_params} М\n" +
              f"Прирост числа параметров: {num_params / min_params}%")
change_num_layers(model_config)

#### Изменение числа параметров с ростом числа экспертов

In [14]:
model_config = TransformerConfig(
        num_layers=10,
        vocab_size=1024,
        d_model=576,
        d_ff=2304,
        use_gqa=False,
        num_heads=12,
        num_q_heads=9,
        num_kv_heads=3,
        num_experts=4,
        tie_embeddings=False
    )
def change_num_experts(base_config):
    min_params = 1
    for i in range(1, 40, 5):
        base_config.num_experts = i
        model = Decoder(base_config)
        num_params = sum(p.numel() for p in model.parameters()) / 1e6
        if i == 1:
            min_params = num_params    
        print(f"d_model: {base_config.d_model}, num_experts: {i} params: {num_params} М\n" +
              f"Прирост числа параметров: {num_params / min_params}%")
change_num_experts(model_config)

### Выводы

Из полученных данных можно сделать вывод, что использование большего числа экспертов значительно увеличивает размер модели в ширину, при этом сохраняя ту же вычислительную сложность (подразумевается реализация moe, через top k) что и у базовой модели.

С другой же стороны при росте числа блоков увеличивается вычислительная сложность модели с увеличением числа параметров, так же можно заметить что использование GQA значительно снижает вычислительную сложность модели.