<a href="https://colab.research.google.com/github/Kostia9/Data-Analysis-2025/blob/main/Lab4/Lab4_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Генерація тексту за допомогою моделі-трансформера (GPT-подібна архітектура)

In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import os
import math, numpy as np
import re, string
from tqdm import tqdm
from tokenizers import ByteLevelBPETokenizer
from datasets import load_dataset

# --- Гіперпараметри (налаштування) ---
batch_size = 64        # скільки незалежних послідовностей обробляємо паралельно
block_size = 256       # максимальна довжина контексту для передбачення
max_iters = 20000      # кількість кроків навчання
eval_interval = 1000   # як часто оцінювати втрати (loss)
learning_rate = 6e-4   # швидкість навчання
device = 'cuda' if torch.cuda.is_available() else 'cpu' # використовуємо GPU, якщо є
eval_iters = 100       # скільки батчів використовувати для оцінки втрат
n_embd = 768           # розмірність векторів (embedding dimension)
n_head = 12            # кількість голів (heads) у self-attention
n_layer = 12           # кількість блоків трансформера
dropout = 0            # регуляризація (for pretraining 0 is good)
vocab_size = 32768     # розмір словника: кількість можливих токенів
# -----------------------------------

torch.manual_seed(1337)

<torch._C.Generator at 0x7bbda0af5f90>

# Download dataset
Dataset contains fiction (novels, prose, and some poetry) scraped from two public libraries;

In [3]:
!wget https://lang.org.ua/static/downloads/ubertext2.0/fiction/cleansed/ubertext.fiction.filter_rus_gcld+short.text_only.txt.bz2
!bunzip2 -f ubertext.fiction.filter_rus_gcld+short.text_only.txt.bz2

CORPUS_FILE = "ubertext.fiction.filter_rus_gcld+short.text_only.txt"

--2025-12-03 14:16:14--  https://lang.org.ua/static/downloads/ubertext2.0/fiction/cleansed/ubertext.fiction.filter_rus_gcld+short.text_only.txt.bz2
Resolving lang.org.ua (lang.org.ua)... 65.21.91.242
Connecting to lang.org.ua (lang.org.ua)|65.21.91.242|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 433084760 (413M) [application/octet-stream]
Saving to: ‘ubertext.fiction.filter_rus_gcld+short.text_only.txt.bz2’


2025-12-03 14:16:46 (13.1 MB/s) - ‘ubertext.fiction.filter_rus_gcld+short.text_only.txt.bz2’ saved [433084760/433084760]



In [2]:
CORPUS_FILE = "ubertext.fiction.filter_rus_gcld+short.text_only.txt"
TOKENIZER_PATH = "uk_bpe" # Папка для токенізатора

# Train tokenizer

In [4]:
print("\n--- Тренування токенізатора ---")

if not os.path.exists(os.path.join(TOKENIZER_PATH, "vocab.json")):
    os.makedirs(TOKENIZER_PATH, exist_ok=True)

    tokenizer = ByteLevelBPETokenizer()

    print("Починаємо навчання BPE...")
    tokenizer.train(
        files=[CORPUS_FILE],
        vocab_size=vocab_size,
        min_frequency=5,
        special_tokens=["<|endoftext|>", "<|pad|>"]
    )

    vocab_size = tokenizer.get_vocab_size()
    tokenizer.save_model(TOKENIZER_PATH)
    print(f"Токенізатор збережено в {TOKENIZER_PATH}, vocab_size = {vocab_size}")
else:
    print("Токенізатор уже існує, пропускаємо навчання.")



--- Тренування токенізатора ---
Починаємо навчання BPE...
Токенізатор збережено в uk_bpe, vocab_size = 32768


In [5]:
# 3. Підготовка даних (Tokenization -> .bin)

train_bin_path = "train.bin"
val_bin_path = "val.bin"

print("\n--- Підготовка даних (Tokenization & Binaries) ---")

if not (os.path.exists(train_bin_path) and os.path.exists(val_bin_path)):

    # Завантажуємо токенізатор
    tokenizer = ByteLevelBPETokenizer(
        os.path.join(TOKENIZER_PATH, "vocab.json"),
        os.path.join(TOKENIZER_PATH, "merges.txt"),
    )
    tokenizer.add_special_tokens(["<|endoftext|>", "<|pad|>"])

    print("Читання файлу та токенізація корпусу...")

    # Використовуємо uint16, бо vocab_size <= 65535
    dtype = np.uint16
    all_tokens = []

    # Читаємо chunk-ами, щоб не з'їсти всю RAM
    with open(CORPUS_FILE, "r", encoding="utf-8") as f:
        while True:
            chunk = f.read(1_000_000)  # 1M символів за раз
            if not chunk:
                break
            ids = tokenizer.encode(chunk).ids
            all_tokens.extend(ids)

    print(f"Всього токенів у корпусі: {len(all_tokens):,}")

    arr = np.array(all_tokens, dtype=dtype)

    # train / val split 95/5
    n = int(0.95 * len(arr))
    train_data = arr[:n]
    val_data = arr[n:]

    print(f"Train tokens: {len(train_data):,}")
    print(f"Val tokens:   {len(val_data):,}")

    print(f"Збереження {train_bin_path}...")
    train_data.tofile(train_bin_path)

    print(f"Збереження {val_bin_path}...")
    val_data.tofile(val_bin_path)

    del arr, train_data, val_data
else:
    print("train.bin і val.bin вже існують, пропускаємо токенізацію.")


--- Підготовка даних (Tokenization & Binaries) ---
Читання файлу та токенізація корпусу...
Всього токенів у корпусі: 342,858,420
Train tokens: 325,715,499
Val tokens:   17,142,921
Збереження train.bin...
Збереження val.bin...


In [7]:
# --- 5. Завантаження даних ---

# Завантажуємо токенізатор
tokenizer = ByteLevelBPETokenizer(
    os.path.join(TOKENIZER_PATH, "vocab.json"),
    os.path.join(TOKENIZER_PATH, "merges.txt"),
)
tokenizer.add_special_tokens(["<|endoftext|>", "<|pad|>"])

train_bin_path = "train.bin"
val_bin_path = "val.bin"

# np.memmap дозволяє працювати з файлом на диску як з масивом у пам'яті
train_data = np.memmap(train_bin_path, dtype=np.uint16, mode='r')
val_data = np.memmap(val_bin_path, dtype=np.uint16, mode='r')

print(f"Memmap Train size: {len(train_data):,} tokens")
print(f"Memmap Val size: {len(val_data):,} tokens")

Memmap Train size: 325,715,499 tokens
Memmap Val size: 17,142,921 tokens


In [None]:
def get_batch(split):
    data = train_data if split == 'train' else val_data

    # Генеруємо випадкові індекси
    ix = torch.randint(len(data) - block_size, (batch_size,))

    x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
    y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])

    # Переміщення на GPU
    if device == 'cuda':
        # pin arrays x,y, which allows us to move them to GPU asynchronously (non_blocking=True)
        x = x.pin_memory().to(device, non_blocking=True)
        y = y.pin_memory().to(device, non_blocking=True)
    else:
        x, y = x.to(device), y.to(device)
    return x, y

# Model

In [6]:
class FeedFoward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.GELU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class CausalSelfAttention(nn.Module):
    def __init__(self, n_embd, n_head, block_size, dropout):
        super().__init__()
        assert n_embd % n_head == 0

        # key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(n_embd, 3 * n_embd, bias=False)
        # output projection
        self.c_proj = nn.Linear(n_embd, n_embd)

        # regularization
        self.attn_dropout = nn.Dropout(dropout)
        self.resid_dropout = nn.Dropout(dropout)

        self.n_head = n_head
        self.n_embd = n_embd
        self.dropout = dropout

    def forward(self, x):
        B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)

        # calculate query, key, values for all heads in batch and move head forward to be the batch dim
        q, k, v  = self.c_attn(x).split(self.n_embd, dim=2)

        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)

        # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        # flash attention
        y = torch.nn.functional.scaled_dot_product_attention(
            q, k, v,
            attn_mask=None,
            dropout_p=self.dropout if self.training else 0,
            is_causal=True
        )

        # re-assemble all head outputs side by side
        y = y.transpose(1, 2).contiguous().view(B, T, C)

        # output projection with dropout
        y = self.resid_dropout(self.c_proj(y))
        return y

class Block(nn.Module):
    def __init__(self, n_embd, n_head, block_size, dropout):
        super().__init__()
        self.ln1 = nn.LayerNorm(n_embd)
        self.sa = CausalSelfAttention(n_embd, n_head, block_size, dropout)
        self.ln2 = nn.LayerNorm(n_embd)
        self.ffwd = FeedFoward(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

class LanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        # embedding table for each token
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        # embedding table for positions
        self.position_embedding_table = nn.Embedding(block_size, n_embd)

        # блоки трансформера
        self.blocks = nn.Sequential(*[
            Block(n_embd, n_head=n_head, block_size=block_size, dropout=dropout)
            for _ in range(n_layer)
        ])

        self.ln_f = nn.LayerNorm(n_embd) # final normalization
        self.lm_head = nn.Linear(n_embd, vocab_size) # output layer

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets have shape (B, T)
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)

        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx це (B, T) масив індексів поточного контексту
        for _ in range(max_new_tokens):
            # if the sequence context is growing too long we must crop it at block_size
            idx_cond = idx[:, -block_size:]
            # forward the model to get the logits for the index in the sequence
            logits, _ = self(idx_cond)
            # pluck the logits at the final step
            logits = logits[:, -1, :] # (B, C)
            # apply softmax to convert logits to (normalized) probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence and continue
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

# Train

In [7]:
model = LanguageModel().to(device)

num_params = sum(p.numel() for p in model.parameters())
print(f"Кількість параметрів моделі: {num_params/1e6:.2f}M")

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, betas=(0.9, 0.99))

Кількість параметрів: 135.59M


In [8]:
# --- Додаткові налаштування ---
# Вибір типу даних для змішаної точності (економить пам'ять GPU)
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = torch.amp.autocast(device_type=device, dtype=ptdtype)

# Ініціалізація GradScaler для коректної роботи з float16
scaler = torch.amp.GradScaler('cuda', enabled=(dtype == 'float16'))

# Параметри планувальника швидкості навчання
warmup_iters = 1000 # Кількість кроків для "розігріву"
min_lr = learning_rate / 10 # Мінімальний LR в кінці навчання
grad_clip = 1.0 # Поріг для обрізання градієнтів

# Функція динамічної зміни learning rate (Cosine Decay)
def get_lr(it):
    # Етап лінійного розігріву
    if it < warmup_iters:
        return learning_rate * (it + 1) / (warmup_iters + 1)

    # Плавний спад за косинусоїдою
    decay_ratio = (it - warmup_iters) / (max_iters - warmup_iters)
    assert 0 <= decay_ratio <= 1
    coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
    return min_lr + coeff * (learning_rate - min_lr)

In [9]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)

        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()

        out[split] = losses.mean()
    model.train()
    return out

In [10]:
# Налаштування точності для TF32 (прискорює навчання)
torch.backends.cuda.matmul.allow_tf32 = True

print("Compiling model...")
model = torch.compile(model)

best_val_loss = 1e6

print(f"Починаємо навчання на пристрої: {device}")

for iter in tqdm(range(max_iters), desc="Навчання"):

    # 1. Оновлення швидкості навчання для поточної ітерації
    lr = get_lr(iter)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    # 2. Періодична оцінка моделі
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"\nstep {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}, lr {lr:.4e}")

        # Збереження найкращого чекпоінту
        if losses['val'] < best_val_loss:
            best_val_loss = losses['val']
            torch.save(model.state_dict(), 'best_model.pt')

    # 3. Отримання наступного батча
    xb, yb = get_batch('train')

    # 4. Forward pass з використанням змішаної точності
    with ctx:
        logits, loss = model(xb, yb)

    # 5. Backward pass з масштабуванням градієнтів
    scaler.scale(loss).backward()

    # 6. Обрізання градієнтів для запобігання нестабільності
    scaler.unscale_(optimizer)
    torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

    # 7. Оновлення ваг та скейлера
    scaler.step(optimizer)
    scaler.update()

    # 8. Очищення градієнтів
    optimizer.zero_grad(set_to_none=True)

# Завантаження найкращої версії моделі після завершення
model.load_state_dict(torch.load('best_model.pt', map_location=device))
model.eval()
print("Done!")

Compiling model...
Починаємо навчання на пристрої: cuda


  return torch._C._get_cublas_allow_tf32()
W1203 12:46:18.740000 13046 torch/_inductor/utils.py:1558] [0/0] Not enough SMs to use max_autotune_gemm mode



step 0: train loss 10.5603, val loss 10.5615, lr 5.9940e-07


Навчання:  10%|█         | 1000/10000 [07:57<59:58,  2.50it/s]


step 1000: train loss 5.7672, val loss 5.8521, lr 6.0000e-04


Навчання:  20%|██        | 2000/10000 [15:21<53:19,  2.50it/s]


step 2000: train loss 5.0241, val loss 5.1333, lr 5.8372e-04


Навчання:  30%|███       | 3000/10000 [22:45<46:36,  2.50it/s]


step 3000: train loss 4.7173, val loss 4.8402, lr 5.3683e-04


Навчання:  40%|████      | 4000/10000 [30:09<40:00,  2.50it/s]


step 4000: train loss 4.5337, val loss 4.6690, lr 4.6500e-04


Навчання:  50%|█████     | 5000/10000 [37:33<33:20,  2.50it/s]


step 5000: train loss 4.3850, val loss 4.5299, lr 3.7689e-04


Навчання:  60%|██████    | 6000/10000 [44:57<26:39,  2.50it/s]


step 6000: train loss 4.2607, val loss 4.4285, lr 2.8311e-04


Навчання:  70%|███████   | 7000/10000 [52:21<19:59,  2.50it/s]


step 7000: train loss 4.1763, val loss 4.3520, lr 1.9500e-04


Навчання:  80%|████████  | 8000/10000 [59:45<13:19,  2.50it/s]


step 8000: train loss 4.1106, val loss 4.2918, lr 1.2317e-04


Навчання:  90%|█████████ | 9000/10000 [1:07:09<06:39,  2.50it/s]


step 9000: train loss 4.0607, val loss 4.2596, lr 7.6283e-05


Навчання: 100%|█████████▉| 9999/10000 [1:14:33<00:00,  2.50it/s]


step 9999: train loss 4.0413, val loss 4.2291, lr 6.0000e-05


Навчання: 100%|██████████| 10000/10000 [1:15:18<00:00,  2.21it/s]


Done!


In [11]:
# # --- Генерація ---
print("\n--- Генерація тексту ---\n")

start_text = """Це основа всього"""
start_ids = tokenizer.encode(start_text).ids
context = torch.tensor([start_ids], dtype=torch.long, device=device)

generated_ids = model.generate(context, max_new_tokens=200)[0].tolist()

print(tokenizer.decode(generated_ids))


--- Генерація тексту ---

Це основа всього
життя. Димиємо й сутнім, ми готові заглушити один про одного. Ти будеш
порядний,— Там вирваними нас від ціле життя.
— Прошу не чергувати! Я прошу вас! Я вас розведу на каменях,— вклонився
Блеймасом і показав такому чоловікові, що зразу ж спинився.
Блеймотом ми опинилися перед Марсельовою приготованою хатою і попрямували до
нього.
Тарас Іванович поволі, жадно й весело кружляючи по землі, виступав біля
американців. Думки його поволі зближувалися з бастіонами божевільного
Степа. Перед ними заволокла веселенька русалка, Маша підбігла до дівчини-відзиву,
 гукнула:
— Прощайте!
— Прощай.
— Прощайте, прощайте.— І ми вийшли за місто. Я поцілувала його в руки,
поцілувала, поцілувала, сказала:
— Велике діло. — Він терпеливо Пальмачку


# References
* https://github.com/karpathy/nanoGPT