# Как работают трансформеры?

## Подготовка данных:

In [None]:
try:
    !nvidia-smi
    !pip install cupy-cuda12x
    import cupy as np
    if not np.cuda.runtime.getDeviceCount():
        raise ImportError
    print("Using GPU")
except:
    import numpy as np
    print("Using CPU")

import random
from typing import Union, Optional

/bin/bash: line 1: nvidia-smi: command not found
Collecting cupy-cuda12x
  Downloading cupy_cuda12x-13.6.0-cp312-cp312-manylinux2014_x86_64.whl.metadata (2.4 kB)
Collecting fastrlock>=0.5 (from cupy-cuda12x)
  Downloading fastrlock-0.8.3-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_28_x86_64.whl.metadata (7.7 kB)
Downloading cupy_cuda12x-13.6.0-cp312-cp312-manylinux2014_x86_64.whl (112.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.9/112.9 MB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fastrlock-0.8.3-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_28_x86_64.whl (53 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.9/53.9 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastrlock, cupy-cuda12x
Successfully installed cupy-cuda12x-13.6.0 fastrlock-0.8.3
Using CPU


In [None]:
# !pip install cupy-cuda12x

## **Создание функционального трансформера:**

In [None]:
#@title Глобальные параметры:
batch = 64
std = .05
heads = 4
d_model = 32
part_size = int(d_model / heads)  # Не менять
ff_multiply = 4
vocab_size = 12                   # Включая токены Start и Finish
max_tokens = 32

In [None]:
#@title Инициализация обучаемых параметров:

# Параметры внимания (для энкодера и декодера):
size1 = (2, heads, part_size, part_size)
size2 = (2, d_model, d_model)
MWQ = np.random.normal(size=size1, scale=std).astype(np.float32)
MWK = np.random.normal(size=size1, scale=std).astype(np.float32)
MWV = np.random.normal(size=size1, scale=std).astype(np.float32)
MWO = np.random.normal(size=size2, scale=std).astype(np.float32)

# Параметры перекрестного внимания:
size1 = (heads, part_size, part_size)
size2 = (d_model, d_model)
CWQ = np.random.normal(size=size1, scale=std).astype(np.float32)
CWK = np.random.normal(size=size1, scale=std).astype(np.float32)
CWV = np.random.normal(size=size1, scale=std).astype(np.float32)
CWO = np.random.normal(size=size2, scale=std).astype(np.float32)

# Параметры прямого прохода:
size1 = (2, d_model, d_model * ff_multiply)
size2 = (2, d_model * ff_multiply, d_model)
size3 = (2, d_model * ff_multiply)
size4 = (2, d_model)
FW1 = np.random.normal(size=size1, scale=std).astype(np.float32)
FW2 = np.random.normal(size=size2, scale=std).astype(np.float32)
FB1 = np.zeros(size3).astype(np.float32)
FB2 = np.zeros(size4).astype(np.float32)

# Параметры выходного слоя:
size1 = (d_model, vocab_size)
size2 = (vocab_size)
WO = np.random.normal(size=size1, scale=std).astype(np.float32)
BO = np.zeros(size2).astype(np.float32)

# Обучаемые параметры (для оптимизатора):
parameters = [
    MWQ[0], MWK[0], MWV[0], MWO[0],
    FW1[0], FB1[0], FW2[0], FB2[0],
    MWQ[1], MWK[1], MWV[1], MWO[1],
    CWQ, CWK, CWV, CWO,
    FW1[1], FB1[1], FW2[1], FB2[1],
    WO, BO,
]

In [None]:
#@title Работа с вокабуляром:
def build_fixed_embeddings(vocab_size: int, random_state: int = 13) -> np.ndarray:
    """Ортогональные эмбеддинги"""
    rng = np.random.default_rng(random_state)
    mat = rng.standard_normal(size=(d_model, d_model))
    Q, _ = np.linalg.qr(mat)
    emb = Q[:vocab_size]
    return emb.astype(np.float32)


vocab = build_fixed_embeddings(vocab_size)
Start = 10
Finish = 11

In [None]:
#@title Вспомогательные функции:
def softmax(X: np.ndarray) -> np.ndarray:
    """Softmax по последней оси"""
    shifted = X - np.max(X, axis=-1, keepdims=True)
    exp = np.exp(shifted)
    return exp / np.sum(exp, axis=-1, keepdims=True)


def leakyrelu(X: np.ndarray, rate: float = .1) -> np.ndarray:
    """Leaky ReLU"""
    return np.where(X > 0, X, rate * X).astype(np.float32)


def add_start_token(data: np.ndarray) -> np.ndarray:
    """Присоединение стартового токена в начало последовательности"""
    size = (data.shape[0], 1)
    start = np.tile(Start, size).astype(np.int32)  # Стартовый токен (B, 1)
    return np.concatenate((start, data), axis=1)   # Стартовый набор (B, 1+T)


def add_finish_token(data: np.ndarray) -> np.ndarray:
    """Присоединение финишного токена в конец последовательности"""
    size = (data.shape[0], 1)
    finish = np.tile(Finish, size).astype(np.int32)  # Финишный токен (B, 1)
    return np.concatenate((data, finish), axis=1)    # Финишный набор (B, T+1)

def predict(data: np.ndarray, dtype=np.int32) -> np.ndarray:
    """Выдает предсказание для данных"""
    predictions = []
    for batch in data:
        predictions.append(call(batch))
    return np.array(predictions, dtype=dtype)

In [None]:
#@title Основные слои:
def positional_encoding(X: np.ndarray) -> tuple[np.ndarray, dict]:
    """Позиционное кодирование токенов (наложение частот)"""
    B, T, D = X.shape

    # Номера позиций токенов:
    positions = np.arange(T, dtype=np.float32)[:, None]  # (T, 1)

    # Матрица частот:
    i = np.arange(D // 2, dtype=np.float32)         # Индексы эмбеддингов
    angles = positions * 10000 ** (-2 * i / D)      # (1, D)

    pe = np.zeros((T, D), dtype=np.float32)         # (T, D)
    pe[:, 0::2] = np.sin(angles)                    # Для чётных индексов
    pe[:, 1::2] = np.cos(angles)                    # Для нечётных

    pe = np.tile(pe, (B, 1, 1)).astype(np.float32)  # (B, T, D)

    return X + pe                                   # Наложение частот

def normalize(X: np.ndarray, eps: float = 1e-8):
    """Простая нормализация без масштабирования и обучаемых параметров"""
    rms = np.sqrt(np.mean(X**2, axis=-1, keepdims=True) + eps)  # (B T 1)
    cache = {
        "X": X,
        "rms": rms
    }
    return X / rms, cache

def MHA(X: np.ndarray, is_decoder: bool = False, index: int = 0, neg_inf: float = -1e9) -> tuple[np.ndarray, dict]:
    """Многоголовое внимание"""
    T = X.shape[1]

    # Разделение входной матрицы для мультивнимания:
    X = X.reshape(X.shape[0], T, heads, part_size)     # (B T H P)

    # Матрицы проекций:
    Q = np.einsum('bthd, hdp -> bhtp', X, MWQ[index])  # (B T H P)(H P P) (B H T P)
    K = np.einsum('bthd, hdp -> bhtp', X, MWK[index])  # (B T H P)(H P P) (B H T P)
    V = np.einsum('bthd, hdp -> bhtp', X, MWV[index])  # (B T H P)(H P P) (B H T P)

    # Матрица внимания:
    A = np.einsum('bhtp, bhfp -> bhtf', Q, K)          # (B H T P)(B H T P) (B H T T)
    A = A / np.sqrt(part_size, dtype=np.float32)       # Масштабирование

    # Маскирование внимания:
    if is_decoder:
        size = (X.shape[0], heads, T, T)
        mask = np.triu(
            np.full(size, neg_inf),
            k=1
        ).astype(np.float32)
        A = A + mask

    # Матрица взвешенны значений:
    A_softmax = softmax(A)
    Z = np.einsum('bhtf, bhfp -> bhtp', A_softmax, V)  # (B H T T)(B H T P) (B H T P)

    # Объединение голов:
    shape = (X.shape[0], T, d_model)
    Zo = Z.transpose(0, 2, 1, 3)                       # (B H T P) (B T H P)
    Zo = Zo.reshape(shape)                             # (B T H P) (B T D)
    Zo = np.einsum('btd, df -> btf', Zo, MWO[index])   # (B T D)(D D) (B T D)

    cache = {
        "X": X, "Q": Q, "K": K, "V": V,
        "A": A, "attn": A_softmax, "Z": Z,
        "index": index, "mask": mask if is_decoder else None,
    }
    return Zo, cache


def MHCA(X: np.ndarray, context: np.ndarray) -> tuple[np.ndarray, dict]:
    """Многоголовое перекрестное внимание"""

    # Разделение входной матрицы для мультивнимания:
    new_shape_X = (X.shape[0], X.shape[1], heads, part_size)
    new_shape_Z = (context.shape[0], context.shape[1], heads, part_size)
    X = X.reshape(new_shape_X)                         # (B Tx H P)
    context = context.reshape(new_shape_Z)             # (B Tz H P)

    # Матрицы проекций:
    Q = np.einsum('bthd, hdp -> bhtp', X, CWQ)         # (B Tx H P)(H P P) (B H Tx P)
    K = np.einsum('bthd, hdp -> bhtp', context, CWK)   # (B T H P)(H P P) (B H Tz P)
    V = np.einsum('bthd, hdp -> bhtp', context, CWV)   # (B T H P)(H P P) (B H Tz P)

    # Матрица внимания:
    A = np.einsum('bhxp, bhtp -> bhxt', Q, K)          # (B H Tx P)(B H Tz P) (B H Tx Tz)
    A = A / np.sqrt(part_size, dtype=np.float32)
    A_softmax = softmax(A)

    # Матрица взвешенных значений:
    Z = np.einsum('bhxt, bhtp -> bhxp', A_softmax, V)  # (B H Tx Tz)(B H Tz P) (B H Tx P)

    # Объединение голов:
    Zo = Z.transpose(0, 2, 1, 3)                       # (B H Tx P) (B Tx H P)
    shape = (X.shape[0], X.shape[1], d_model)
    Zo = Zo.reshape(shape)                             # (B Tx H P) (B Tx D)
    Zo = np.einsum('btd, df -> btf', Zo, CWO)          # (B Tx D)(D D) (B Tx D)

    cache = {
        "X": X, "context": context, "Q": Q, "K": K, "V": V,
        "A": A, "attn": A_softmax, "Z": Z,
    }
    return Zo, cache

def FF(X: np.ndarray, index: int = 0) -> tuple[np.ndarray, dict]:
    """Полносвязный блок с расширением"""
    X1 = np.einsum('BTD, DP -> BTP', X, FW1[index])   # (B T D)(D D) (B T D*ff)
    X2 = X1 + FB1[index]                              # (D*ff,) (B T D*ff)
    X3 = leakyrelu(X2)
    X4 = np.einsum('BTD, DP -> BTP', X3, FW2[index])  # (B T D)(D*ff D) (B T D)
    X5 = X4 + FB2[index]                              # (D,) (B T D)
    return X5, {
        "X": X, "W1": X1, "B1": X2,
        "act": X3, "W2": X4, "index": index
    }

def output(X: np.ndarray) -> tuple[np.ndarray, dict]:
    """Конечный линейный слой проекции словаря"""
    Y = np.einsum('BTD, DV -> BTV', X, WO)  # (B T D)((D V) (B T V)
    Y = Y + BO                              # (B T V)
    return Y, {"X": X}

In [None]:
#@title Обратные функции слоев:
def backward_normalize(dY: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
    """
    Форма dY: (B, T, D)
    Возврат:
        dX: (B, T, D)
    """
    X = cache["X"]
    rms = cache["rms"]
    D = X.shape[-1]
    return (dY / rms - X * np.sum(dY * X, axis=-1, keepdims=True) / (D * rms**3))

def backward_MHA(dZ: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
    """
    Форма dZo: (B, Tx, D)
    Возврат:
        dX: (B, T, D)
        grads: словарь градиентов
    """
    X = cache["X"]              # (B T H P)
    Q = cache["Q"]              # (B H T P)
    K = cache["K"]              # (B H T P)
    V = cache["V"]              # (B H T P)
    A_softmax = cache["attn"]   # (B H T T)
    Z = cache["Z"]              # (B H T P)
    index = cache["index"]

    B, T, _, _ = X.shape

    # Производная общей матрицы WO:
    dMWO = np.einsum(
        'btd, btf -> df',
        Z.transpose(0, 2, 1, 3).reshape(B, T, d_model),
        dZ
    )                                                         # (B T D1)(B T D2) (D1 D2)
    dZ = np.einsum(
        'btd, fd -> btf',
        dZ,
        MWO[index]
    ).reshape(B, T, heads, part_size).transpose(0, 2, 1, 3)   # (B T D)(D1 D2) (B T D1) (B H T P)

    # Производная для проекции значений V:
    dA = np.einsum('bhtp, bhfp -> bhtf', dZ, V)               # (B H T P)(B H T P) (B H T T)
    dV = np.einsum('bhit, bhtp -> bhip', A_softmax, dZ)       # (B H T1 T2)(B H T P) (B H T1 P)

    # Обнуление градиентов (если есть маска):
    if (mask := cache["mask"]) is not None:
        dA = np.where(mask == 0, dA, 0.0).astype(np.float32)

    # Производная софтмакса:
    tmp = np.sum(dA * A_softmax, axis=-1, keepdims=True)
    dA = A_softmax * (dA - tmp)
    dA = dA / np.sqrt(part_size, dtype=np.float32)

    # Производная внимания:
    dQ = np.einsum('bhtf, bhtp -> bhfp', dA, K)              # (B H T1 T2)(B H T1 P) (B H T2 P)
    dK = np.einsum('bhtf, bhfp -> bhtp', dA, Q)              # (B H T1 T2)(B H T2 P) (B H T1 P)

    # Производная по проекции входных данных:
    dMWQ = np.einsum('bthp, bhtf -> hfp', X, dQ)             # (B T H P)(B H T P) (H P P)
    dMWK = np.einsum('bthp, bhtf -> hfp', X, dK)             # (B T H P)(B H T P) (H P P)
    dMWV = np.einsum('bthp, bhtf -> hfp', X, dV)             # (B T H P)(B H T P) (H P P)

    # Производная по входу:
    dX = (
        np.einsum('bhtp, hfp -> bthf', dQ, MWQ[index]) +     # (B T H P)(H P1 P2) (B T H P1)
        np.einsum('bhtp, hfp -> bthf', dK, MWK[index]) +     # (B T H P)(H P1 P2) (B T H P1)
        np.einsum('bhtp, hfp -> bthf', dV, MWV[index])       # (B T H P)(H P1 P2) (B T H P1)
    ).reshape(B, T, d_model)

    return dX, {
        "MWQ": dMWQ,
        "MWK": dMWK,
        "MWV": dMWV,
        "MWO": dMWO
    }


def backward_MHCA(dZ: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
    """
    Форма dZo: (B, Tx, D)
    Возврат:
        dX: (B, Tx, D)
        dContext: (B, Tz, D)
        grads: словарь градиентов
    """

    X = cache["X"]                 # (B Tx H P)
    context = cache["context"]     # (B Tz H P)
    Q = cache["Q"]                 # (B H Tx P)
    K = cache["K"]                 # (B H Tz P)
    V = cache["V"]                 # (B H Tz P)
    A_softmax = cache["attn"]      # (B H Tx Tz)
    Z = cache["Z"]                 # (B H Tx P)

    B, Tx, H, P = X.shape
    Tz = context.shape[1]

    # Производная общей матрицы WO:
    dCWO = np.einsum(
        'btd, btf -> df',
        Z.transpose(0, 2, 1, 3).reshape(B, Tx, d_model),
        dZ
    )                                                     # (B T D1)(B T D2) (D1 D2)
    dZ = np.einsum(
        'btd, fd -> btf',
        dZ,
        CWO
    ).reshape(B, Tx, H, P).transpose(0, 2, 1, 3)          # (B Tx D)(D1 D2) (B Tx D1) (B H Tx P)

    # Производная для проекции значений V:
    dA = np.einsum('bhtp, bhfp -> bhtf', dZ, V)           # (B H Tx P)(B H Tz P) (B H Tx Tz)
    dV = np.einsum('bhtf, bhtp -> bhfp', A_softmax, dZ)   # (B H Tx Tz)(B H Tx P) (B H Tz P)

    # Производная софтмакса:
    tmp = np.sum(dA * A_softmax, axis=-1, keepdims=True)
    dA = A_softmax * (dA - tmp)
    dA = dA / np.sqrt(part_size, dtype=np.float32)

    # Производная внимания:
    dQ = np.einsum('bhtf, bhfp -> bhtp', dA, K)           # (B H Tx Tz)(B H Tz P) (B H Tx P)
    dK = np.einsum('bhtf, bhtp -> bhfp', dA, Q)           # (B H Tx Tz)(B H Tx P) (B H Tz P)

    # Производная по проекциям входных данных:
    dCWQ = np.einsum('bthp, bhtf -> hpf', X, dQ)          # (B Tx H P)(B H Tx P) (H P P)
    dCWK = np.einsum('bthp, bhtf -> hpf', context, dK)    # (B Tx H P)(B H Tz P) (H P P)
    dCWV = np.einsum('bthp, bhtf -> hpf', context, dV)    # (B Tx H P)(B H Tz P) (H P P)

    # Производная по входу (X и Z):
    dX = np.einsum(
        'bhtp, hfp -> bthf', dQ, CWQ
    ).reshape(B, Tx, d_model)                             # (B Tx H P)(H P1 P2) (B Tx H P1)
    dZ = (
        np.einsum('bhtp, hfp -> bthf', dK, CWK) +         # (B Tz H P)(H P1 P2) (B Tz H P1)
        np.einsum('bhtp, hfp -> bthf', dV, CWV)           # (B Tz H P)(H P1 P2) (B Tz H P1)
    ).reshape(B, Tz, d_model)

    grads = {
        "CWQ": dCWQ,
        "CWK": dCWK,
        "CWV": dCWV,
        "CWO": dCWO
    }

    return dX, dZ, grads


def backward_FF(dX5: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
    """
    Форма dX5: (B, T, D)
    Возврат:
        dX: (B, T, D)
        grads: словарь градиентов
    """
    X  = cache["X"]         # (B T D)
    X1 = cache["W1"]        # (B T D*ff)
    X2 = cache["B1"]        # (D*ff,)
    X3 = cache["act"]       # (B T D*ff)
    index = cache["index"]  # (i)

    # Градиент для Байес 2:
    dFB2 = np.sum(dX5, axis=(0, 1))                            # (B T D) (D,)

    # Производная линейного слоя 2:
    dFW2 = np.einsum('btd, btp -> dp', X3, dX5)                # (B T D*ff)(B T D) (D*ff D)
    dX3 = np.einsum('btp, dp -> btd', dX5, FW2[index])         # (B T D)(D*ff D) (B T D*ff)

    # Производная нелинейного слоя:
    dX2 = dX3 * np.where(X2 > 0, 1.0, 0.1).astype(np.float32)  # (B T D*ff)

    # Градиент для Байес 1:
    dFB1 = np.sum(dX2, axis=(0, 1))                            # (D*ff,)

    # Производная линейного слоя 1:
    dFW1 = np.einsum('btd, btp -> dp', X, dX2)                 # (B T D)(D D*ff) (D, D*ff)
    dX = np.einsum('btp, dp -> btd', dX2, FW1[index])          # (B T D*ff)(D D*ff) (B T D)

    grads = {
        "FW1": dFW1,
        "FB1": dFB1,
        "FW2": dFW2,
        "FB2": dFB2
    }

    return dX, grads


def backward_output(dY: np.ndarray, cache: dict) -> tuple[np.ndarray, dict]:
    """
    Форма dY: (B, T, V)
    Возврат:
        dX: (B, T, D)
        grads: словарь градиентов
    """
    X = cache["X"]

    # Производная сдвига:
    dBO = np.sum(dY, axis=(0, 1))             # (V,)

    # Производная по весам:
    dWO = np.einsum('btd, btv -> dv', X, dY)  # (B T D)(B T V) (D V)

    # Производная по входу:
    dX = np.einsum('btv, dv -> btd', dY, WO)  # (B T V)(D V) (B T D)

    grads = {
        "WO": dWO,
        "BO": dBO
    }

    return dX, grads

In [None]:
#@title Функция ошибки:
def SparseCrossEntropy(prediction: np.ndarray, target: np.ndarray, eps: float = 1e-8) -> tuple[int, np.ndarray]:
    """
    prediction: [B, T, V] — логиты
    target: [B, T] — индексы правильных токенов
    """
    probs = softmax(prediction)
    B, T, V = probs.shape

    correct_probs = probs[np.arange(B)[:, None], np.arange(T)[None, :], target]

    loss_value = -np.sum(np.log(correct_probs + eps), dtype=np.float32)
    loss_value = loss_value / (B * T)
    loss_value = np.array(loss_value, dtype=np.float32)  # ensure scalar-array of correct dtype

    grad_prediction = probs.copy().astype(np.float32)
    grad_prediction[np.arange(B)[:, None], np.arange(T)[None, :], target] -= 1.0
    grad_prediction /= (B * T)

    return loss_value, grad_prediction

In [None]:
#@title Прямой проход:
def call(X: np.ndarray, Y: np.ndarray = None) -> np.ndarray:
    """Проход по блокам"""
    # Энкодер:
    training = True if Y is not None else False
    X = np.take(vocab, X, axis=0)                   # Получение эмбеддингов (B, T, D)
    Xr = positional_encoding(X)                      # Позиционный энкодинг

    X, cache_Norm1 = normalize(Xr)
    X, cache_MHAe = MHA(X, index=0)
    Xr = Xr + X
    X, cache_Norm2 = normalize(Xr)
    X, cache_FFe = FF(X, index=0)
    Z = Xr + X

    # Декодер:
    if training:                                    # Режим обучения
        X = add_start_token(Y)                      # Присоединение стартового набора (B, 1+T)
        X = np.take(
            vocab,
            X,
            axis=0
        ).astype(np.float32)                         # Получение эмбеддингов (B, 1+T, D)
        Xr = positional_encoding(X)                  # Позиционное кодирование

        X, cache_Norm3 = normalize(Xr)
        X, cache_MHAd = MHA(X, index=1, is_decoder=True)
        Xr = Xr + X
        X, cache_Norm4 = normalize(Xr)
        X, cache_MHCA = MHCA(X, Z)
        Xr = Xr + X
        X, cache_Norm5 = normalize(Xr)
        X, cache_FFd = FF(X, index=1)
        Xr = Xr + X
        X, cache_Norm6 = normalize(Xr)
        X, cache_out = output(X)
        cache = {
            "Norm1": cache_Norm1, "MHA-E": cache_MHAe,
            "Norm2": cache_Norm2, "FF-E": cache_FFe,
            "Norm3": cache_Norm3, "MHA-D": cache_MHAd,
            "Norm4": cache_Norm4, "MHCA": cache_MHCA,
            "Norm5": cache_Norm5, "FF-D": cache_FFd,
            "Norm6": cache_Norm6, "Out": cache_out
        }
        return X, cache
    else:                                          # Режим генерации
        seq = np.tile(
            Start, (Z.shape[0], 1)
        ).astype(np.int32)                         # Стартовый набор (B, 1)
        for _ in range(max_tokens):                # Цикл токенов
            X = np.take(vocab, seq, axis=0)        # Получение эмбеддингов (B, 1, D)
            Xr = positional_encoding(X)            # Позиционное кодирование

            X, _ = normalize(Xr)
            X, _ = MHA(X, index=1, is_decoder=True)
            Xr = Xr + X
            X, _ = normalize(Xr)
            X, _ = MHCA(X, Z)
            Xr = Xr + X
            X, _ = normalize(Xr)
            X, _ = FF(X, index=1)
            X = Xr + X
            X, _ = output(X)

            X = softmax(X)                         # Вероятности

            indexes = np.argmax(
                X[:, -1, :], axis=-1
                )[..., None]                       # Получение индексов новых токенов
            seq = np.concatenate(
                (seq, indexes)
                , axis=1
            )                                      # Добавление токенов
        return seq

In [None]:
#@title Обратный проход:
def backward(loss: np.ndarray, cache) -> tuple[dict, ...]:
    """
    Обратный проход.
    Возвращает наборы градиентов для каждого слоя.
    """
    # Декодер:
    dX, grads1 = backward_output(loss, cache['Out'])
    dXr = backward_normalize(dX, cache['Norm6'])

    dX, grads2 = backward_FF(dXr, cache['FF-D'])
    dX = backward_normalize(dX, cache['Norm5'])
    dXr = dXr + dX

    dX, dZ, grads3 = backward_MHCA(dXr, cache['MHCA'])
    dX = backward_normalize(dX, cache['Norm4'])
    dXr = dXr + dX

    dX, grads4 = backward_MHA(dXr, cache['MHA-D'])
    # dX = backward_normalize(dX, cache['Norm3'])
    # dXr = dXr + dX

    # Энкодер:
    dX, grads5 = backward_FF(dZ, cache['FF-E'])
    dX = backward_normalize(dX, cache['Norm2'])
    dXr = dX + dZ

    dX, grads6 = backward_MHA(dXr, cache['MHA-E'])
    # dX = backward_normalize(dX, cache['Norm1'])
    # dXr = dXr + dX

    return grads6, grads5, grads4, grads3, grads2, grads1

In [None]:
#@title Проверочный код:
size = (batch, max_tokens, d_model)
data = np.random.random(size).astype(np.float32)

# Проверка основных слоев:
X, cache_MHA = MHA(data, index=0)
assert X.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("MHA", X.shape)

X, cache_MHCA = MHCA(data, X)
assert X.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("MHCA", X.shape)

X, cache_FF = FF(X, index=0)
assert X.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("FF", X.shape)

X, cache_out = output(X)
assert (size[0], size[1], vocab_size) == X.shape, "Формы данных входа и выхода не совпали!"
print("Out", X.shape, '\n')

# Проверка обратных слоев:
dX, grads = backward_output(X, cache_out)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dOut", dX.shape)

dX, cache = backward_FF(dX, cache_FF)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dFF", dX.shape)

dX, dZ, cache = backward_MHCA(dX, cache_MHCA)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
assert dZ.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dMHCA", dX.shape, dZ.shape)

dX, grads = backward_MHA(dX, cache_MHA)
assert dX.shape == data.shape, "Формы данных входа и выхода не совпали!"
print("dMHA", dX.shape, '\n')

# Проверка верхнеуровневых функций:
size = (batch, max_tokens)
data = np.random.randint(0, 9, size=size, dtype=np.int32)

# Инференс:
out = call(data)
assert out.shape == (batch, max_tokens + 1), f"Выходной размер не соответствует ожидаемому!"
print("Call generate", out.shape)

# Режим обучения:
out, cache = call(data, data)
assert out.shape == (batch, max_tokens + 1, vocab_size), f"Выходной размер не соответствует ожидаемому!"
print("Call training", out.shape)

# Проверка Loss-функции:
target = np.argmax(softmax(out), axis=-1)
print(target.shape)
loss, grad = SparseCrossEntropy(out, target)
# assert round(float(loss)) == round(np.log(vocab_size)), f"Лосс не совпал с ожидаемым!"
print("Loss ", loss)

# Обратный проход:
grads = backward(out, cache)
assert len(grads) == 6, f"Не все слои получили градиенты!"
print("Backward grads:")
for g in grads:
    print(g.keys())


MHA (64, 16, 32)
MHCA (64, 16, 32)
FF (64, 16, 32)
Out (64, 16, 12) 

dOut (64, 16, 32)
dFF (64, 16, 32)
dMHCA (64, 16, 32) (64, 16, 32)
dMHA (64, 16, 32) 

Call generate (64, 17)
Call training (64, 17, 12)
(64, 17)
Loss  2.1832407
Backward grads:
dict_keys(['MWQ', 'MWK', 'MWV', 'MWO'])
dict_keys(['FW1', 'FB1', 'FW2', 'FB2'])
dict_keys(['MWQ', 'MWK', 'MWV', 'MWO'])
dict_keys(['CWQ', 'CWK', 'CWV', 'CWO'])
dict_keys(['FW1', 'FB1', 'FW2', 'FB2'])
dict_keys(['WO', 'BO'])


## **Подготовка к обучению модели:**

In [None]:
#@title Генератор данных:
class Generator:
    def __init__(
            self, batch_size: int,
            number_tokens: int, number_batch: int,
            shuffle: bool = True, test_size: float = .2,
            dtype=np.int32
    ) -> None:
        self.batch_size = batch_size
        self.number_tokens = number_tokens
        self.number_batch = number_batch
        self.shuffle = shuffle
        self.test_size = test_size
        self.dtype = dtype

    def palindrome(self) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]:
        """Создание последовательностий с переворотом второй половины"""
        data, target = list(), list()
        self.number_tokens //= 2
        for _ in range(self.number_batch):
            data_batch, target_batch = list(), list()
            for num in range(self.batch_size):
                half = str(random.randint(10 ** (self.number_tokens - 1), 10 ** self.number_tokens - 1))
                reverse = list(half)
                reverse.reverse()
                data_batch.append([int(i) for i in "".join((half, half))])
                target_batch.append([int(i) for i in "".join((half, "".join(reverse)))])
            data.append(data_batch)
            target.append(target_batch)

        data = np.array((data, target), dtype=self.dtype).transpose(1, 0, 2, 3)
        return self.train_test_split(data, self.test_size) if self.shuffle else data

    def symmetric_sum(self) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]:
        """Симметричная сумма T[i] = (S[i] + S[N−1−i]) % 10"""
        data, target = [], []
        for _ in range(self.number_batch):
            batch_input, batch_target = [], []
            for _ in range(self.batch_size):
                # Случайная входная последовательность:
                S = [random.randint(0, 9) for _ in range(self.number_tokens)]
                T = [(S[i] + S[self.number_tokens - 1 - i]) % 10 for i in range(self.number_tokens)]
                batch_input.append(S)
                batch_target.append(T)
            data.append(batch_input)
            target.append(batch_target)

        data = np.array((data, target), dtype=self.dtype).transpose(1, 0, 2, 3)
        return self.train_test_split(data, self.test_size) if self.shuffle else data


    def train_test_split(self, data: np.ndarray, test_size: float = .2) -> tuple[np.ndarray, np.ndarray]:
        """Разбиение данных"""
        idx = np.random.permutation(len(data))  # Случайный порядок индексов
        data = data[idx]
        index = int(len(data) * (1 - test_size))
        return data[:index], data[index:]


In [None]:
#@title Функция обучения модели:
def fit(
        data_train: np.ndarray, data_valid: np.ndarray,
        epoch: int, shuffle: bool = False, step_per_epoch: int = None
) -> None:
    """"""
    # Проверка количества шагов:
    if not step_per_epoch:
        step_per_epoch = data_train.shape[0]

    # Цикл обучения:
    state = dict()
    for i in range(1, epoch + 1):
        # Метрики:
        general_loss_train = 0
        general_loss_valid = 0

        # Тасование батчей:
        if shuffle:
            idx = np.random.permutation(len(data_train))  # Случайный порядок индексов
            data_train = data_train[idx]

        # Обучающая часть:
        for step, (batch, target) in enumerate(data_train[:step_per_epoch], start=1):
            prediction, cache = call(batch, Y=target)

            loss, grad = SparseCrossEntropy(
                prediction,
                add_finish_token(target)
            )

            grad_parameters = backward(grad, cache)

            state = optimizer(parameters, grad_parameters, step, state)        # Оптимизация

            general_loss_train += float(loss)
            print('=', end='')

        # Валидационная часть:
        for batch, target in data_valid:
            prediction, cache = call(batch, Y=target)
            loss, grad = SparseCrossEntropy(prediction, add_finish_token(target))

            general_loss_valid += float(loss)

        # Вывод ошибки:
        general_loss_train /= step_per_epoch
        general_loss_valid /= data_valid.shape[0]

        print(f"\nЭпоха: {i}")
        print(f"Ошибка модели на обучении: {general_loss_train}")
        print(f"Ошибка модели на тесте: {general_loss_valid}")


In [None]:
# def sheduler(step):
#     """Вычисляет скорость на нужно шаге"""
#     lr = finish_rate + (start_rate - finish_rate) * (total_steps - step) / total_steps
#     return lr


In [None]:
def sheduler(step, warmup_steps=128):
    """Вычисляет скорость на нужно шаге"""
    if step <= warmup_steps:
        return start_rate * step / warmup_steps
    else:
        return (start_rate - finish_rate) * (total_steps - step) / total_steps
    return lr

In [None]:
#@title Оптимизатор Адам:
def optimizer(parameters: list, gradients: dict, step: int) -> None:
    """СГС"""
    gradients = [elem for values in gradients for elem in values.values()]
    for parameter, gradient in zip(parameters, gradients):
        global_g = sheduler(step)
        parameter -= global_g * gradient

def optimizer(
        parameters: list, gradients: list,
        step: int, state: dict,
        beta1=0.9, beta2=0.999, eps=1e-8
) -> dict:
    """Оптимизатор Адам"""

    if 'm' not in state:
        state['m'] = [np.zeros_like(p) for p in parameters]
        state['v'] = [np.zeros_like(p) for p in parameters]

    gradients = [elem for values in gradients for elem in values.values()]
    for i, (p, g) in enumerate(zip(parameters, gradients)):
        # Моменты:
        state['m'][i] = beta1 * state['m'][i] + (1 - beta1) * g
        state['v'][i] = beta2 * state['v'][i] + (1 - beta2) * (g * g)

        # Байес:
        m_hat = state['m'][i] / (1 - beta1 ** step)
        v_hat = state['v'][i] / (1 - beta2 ** step)

        # Обновление параметра:
        p -= sheduler(step) * m_hat / (np.sqrt(v_hat) + eps)

    return state


## **Обучение модели:**

In [None]:
#@title Создание данных:
gen = Generator(
    batch_size=batch,
    number_tokens=max_tokens,
    number_batch=256,
    test_size=.33
)

train, valid = gen.palindrome()   # symmetric_sum, palindrome

print("Train shape: ", train.shape)
print("Valid shape: ", valid.shape)

Train shape:  (171, 2, 64, 16)
Valid shape:  (85, 2, 64, 16)


In [None]:
#@title Прараметры обучения:
epoch = 10
steps_per_epoch = 64  # Количество подаваемых батчей за одну эпоху
start_rate = 1e-2
finish_rate = 1e-5
warmup_steps = 128    # Количество "шагов прогрева"
total_steps = epoch * steps_per_epoch - warmup_steps  # Общее количество шагов

In [None]:
#@title Обучение модели:
fit(
    data_train=train, data_valid=valid,
    shuffle=True, step_per_epoch=steps_per_epoch,
    epoch=epoch
)

## Тестирование модели:

In [None]:
#@title Проверка модели:
print("\nВывод: ")
request = np.array([[
    [1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8],
    [8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
    [0, 1, 0, 2, 0, 3, 0, 4, 0, 1, 0, 2, 0, 3, 0, 4],
    [1, 1, 2, 3, 1, 1, 4, 5, 1, 1, 2, 3, 1, 1, 4, 5],
    [0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1],
    [5, 5, 4, 4, 3, 3, 2, 2, 5, 5, 4, 4, 3, 3, 2, 2],
    [7, 7, 7, 7, 8, 8, 9, 9, 7, 7, 7, 7, 8, 8, 9, 9],
    [1, 5, 3, 7, 9, 2, 4, 6, 1, 5, 3, 7, 9, 2, 4, 6],
    [3, 9, 2, 6, 1, 2, 5, 6, 3, 9, 2, 6, 1, 2, 5, 6],
]])

prediction = predict(request)
for r, p in zip(request[0], prediction[0]):
    print(f"{r} -> {p}")


Вывод: 
[1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[0 1 0 2 0 3 0 4 0 1 0 2 0 3 0 4] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[1 1 2 3 1 1 4 5 1 1 2 3 1 1 4 5] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[0 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[5 5 4 4 3 3 2 2 5 5 4 4 3 3 2 2] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[7 7 7 7 8 8 9 9 7 7 7 7 8 8 9 9] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[1 5 3 7 9 2 4 6 1 5 3 7 9 2 4 6] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
[3 9 2 6 1 2 5 6 3 9 2 6 1 2 5 6] -> [10  9 11  9  8  8  8  8  8  8  8  8  9  9  3  9  2]
