In [1]:
# model

import torch
import torch.nn as nn
import math


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=256):
        super().__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)

        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]


class DecoderOnlyTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=128, nhead=4, num_layers=4, max_len=256):
        super().__init__()

        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=4 * d_model,
            dropout=0.0,
            batch_first=True
        )

        self.transformer = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers
        )

        self.fc_out = nn.Linear(d_model, vocab_size)

    def generate_square_subsequent_mask(self, size, device):
        mask = torch.triu(torch.ones(size, size, device=device), diagonal=1)
        mask = mask.masked_fill(mask == 1, float("-inf"))
        return mask

    def forward(self, x):
        """
        x: (B, T)
        returns: (B, T, vocab_size)
        """
        B, T = x.size()
        mask = self.generate_square_subsequent_mask(T, x.device)
        x = self.token_embedding(x)
        x = self.pos_encoding(x)

        x = self.transformer(x, mask=mask)

        logits = self.fc_out(x)            # (B, T, vocab_size)
        return logits

In [3]:
# overfit

import torch
import torch.nn as nn
import numpy as np

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

BATCH_SIZE = 40   # full batch for overfit
LR = 3e-4
EPOCHS = 2000

train_data = np.load("train.npy")

train_tensor = torch.tensor(train_data[:40], dtype=torch.long)

val_tensor = train_tensor.clone()

vocab_size = int(train_tensor.max()) + 1

model = DecoderOnlyTransformer(vocab_size).to(DEVICE)

optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss()


def run_epoch(data, train=True):
    if train:
        model.train()
    else:
        model.eval()

    total_loss = 0

    if train:
        perm = torch.randperm(len(data))
        data = data[perm]

    for i in range(0, len(data), BATCH_SIZE):
        batch = data[i:i+BATCH_SIZE].to(DEVICE)

        x = batch[:, :-1]   # input
        y = batch[:, 1:]    # target

        logits = model(x)

        loss = criterion(
            logits.reshape(-1, vocab_size),
            y.reshape(-1)
        )

        if train:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        total_loss += loss.item()

    return total_loss / (len(data) / BATCH_SIZE)


for epoch in range(EPOCHS):
    train_loss = run_epoch(train_tensor, True)
    val_loss = run_epoch(val_tensor, False)

    train_ppl = torch.exp(torch.tensor(train_loss))
    val_ppl = torch.exp(torch.tensor(val_loss))

    print(f"Epoch {epoch}")
    print(f"Train Loss: {train_loss:.4f} | PPL: {train_ppl:.3f}")
    print(f"Val Loss: {val_loss:.4f} | PPL: {val_ppl:.3f}")

[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
Train Loss: 1.2293 | PPL: 3.419
Val Loss: 1.2314 | PPL: 3.426
Epoch 334
Train Loss: 1.2314 | PPL: 3.426
Val Loss: 1.2354 | PPL: 3.440
Epoch 335
Train Loss: 1.2354 | PPL: 3.440
Val Loss: 1.2261 | PPL: 3.408
Epoch 336
Train Loss: 1.2261 | PPL: 3.408
Val Loss: 1.2189 | PPL: 3.383
Epoch 337
Train Loss: 1.2189 | PPL: 3.383
Val Loss: 1.2074 | PPL: 3.345
Epoch 338
Train Loss: 1.2074 | PPL: 3.345
Val Loss: 1.2252 | PPL: 3.405
Epoch 339
Train Loss: 1.2252 | PPL: 3.405
Val Loss: 1.2274 | PPL: 3.413
Epoch 340
Train Loss: 1.2274 | PPL: 3.413
Val Loss: 1.2542 | PPL: 3.505
Epoch 341
Train Loss: 1.2542 | PPL: 3.505
Val Loss: 1.2401 | PPL: 3.456
Epoch 342
Train Loss: 1.2401 | PPL: 3.456
Val Loss: 1.2225 | PPL: 3.396
Epoch 343
Train Loss: 1.2225 | PPL: 3.396
Val Loss: 1.1856 | PPL: 3.273
Epoch 344
Train Loss: 1.1856 | PPL: 3.273
Val Loss: 1.1739 | PPL: 3.234
Epoch 345
Train Loss: 1.1739 | PPL: 3.234
Val Loss: 1.1875 | PPL

In [6]:
# train

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pickle

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

BATCH_SIZE = 64
SEQ_LENGTH = 256
LR = 3e-4
EPOCHS = 50  # increase to get better model
PRINT_EVERY = 1

train_data = np.load("train.npy")
val_data = np.load("val.npy")

with open("char_to_idx.pkl", "rb") as f:
    char_to_idx = pickle.load(f)
with open("idx_to_char.pkl", "rb") as f:
    idx_to_char = pickle.load(f)

vocab_size = len(char_to_idx)

train_tensor = torch.tensor(train_data, dtype=torch.long)
val_tensor = torch.tensor(val_data, dtype=torch.long)

model = DecoderOnlyTransformer(vocab_size).to(DEVICE)
optimizer = optim.AdamW(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss()


def run_epoch(data, train=True):
    if train:
        model.train()
    else:
        model.eval()

    total_loss = 0.0
    perm = torch.randperm(len(data)) if train else torch.arange(len(data))
    data = data[perm]

    for i in range(0, len(data), BATCH_SIZE):
        batch = data[i:i + BATCH_SIZE].to(DEVICE)
        x = batch[:, :-1]
        y = batch[:, 1:]

        logits = model(x)
        loss = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))

        if train:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        total_loss += loss.item()

    return total_loss / (len(data) / BATCH_SIZE)


for epoch in range(1, EPOCHS + 1):
    train_loss = run_epoch(train_tensor, train=True)
    val_loss = run_epoch(val_tensor, train=False)

    train_ppl = torch.exp(torch.tensor(train_loss))
    val_ppl = torch.exp(torch.tensor(val_loss))

    if epoch % PRINT_EVERY == 0:
        print(f"Epoch {epoch:02d}: Train Loss {train_loss:.4f}, PPL {train_ppl:.3f} | Val Loss {val_loss:.4f}, PPL {val_ppl:.3f}")

torch.save(model.state_dict(), "greedy_model.pth")

Epoch 01: Train Loss 3.0386, PPL 20.876 | Val Loss 2.8446, PPL 17.194
Epoch 02: Train Loss 2.5846, PPL 13.259 | Val Loss 2.7243, PPL 15.246
Epoch 03: Train Loss 2.5071, PPL 12.270 | Val Loss 2.6706, PPL 14.449
Epoch 04: Train Loss 2.4560, PPL 11.658 | Val Loss 2.6165, PPL 13.688
Epoch 05: Train Loss 2.3985, PPL 11.007 | Val Loss 2.5574, PPL 12.902
Epoch 06: Train Loss 2.3276, PPL 10.253 | Val Loss 2.4884, PPL 12.041
Epoch 07: Train Loss 2.2659, PPL 9.639 | Val Loss 2.4396, PPL 11.469
Epoch 08: Train Loss 2.2199, PPL 9.207 | Val Loss 2.3938, PPL 10.955
Epoch 09: Train Loss 2.1797, PPL 8.844 | Val Loss 2.3537, PPL 10.524
Epoch 10: Train Loss 2.1384, PPL 8.486 | Val Loss 2.3101, PPL 10.075
Epoch 11: Train Loss 2.0919, PPL 8.100 | Val Loss 2.2591, PPL 9.575
Epoch 12: Train Loss 2.0461, PPL 7.738 | Val Loss 2.2216, PPL 9.222
Epoch 13: Train Loss 2.0047, PPL 7.424 | Val Loss 2.1827, PPL 8.870
Epoch 14: Train Loss 1.9653, PPL 7.137 | Val Loss 2.1536, PPL 8.616
Epoch 15: Train Loss 1.9323, PPL

In [None]:
# greedy_sampling

import torch
import pickle
import numpy as np

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


def load_vocab():
    with open("char_to_idx.pkl", "rb") as f:
        char_to_idx = pickle.load(f)
    with open("idx_to_char.pkl", "rb") as f:
        idx_to_char = pickle.load(f)
    return char_to_idx, idx_to_char


def greedy_decode(model, start_text, char_to_idx, idx_to_char, max_length=500):
    """
    Generate text using greedy decoding.

    Args:
        model: trained DecoderOnlyTransformer
        start_text: string prompt
        char_to_idx: dict mapping char -> index
        idx_to_char: dict mapping index -> char
        max_length: total length of generated sequence

    Returns:
        generated string
    """
    model.eval()
    generated = [char_to_idx[ch] for ch in start_text]
    input_ids = torch.tensor([generated], dtype=torch.long, device=DEVICE)

    for _ in range(max_length):
        with torch.no_grad():
            logits = model(input_ids)
            next_token_logits = logits[0, -1]
            next_token = torch.argmax(next_token_logits).item()

        generated.append(next_token)
        input_ids = torch.tensor([generated[-256:]], dtype=torch.long, device=DEVICE)  # keep context window

    return "".join([idx_to_char[i] for i in generated])


if __name__ == "__main__":
    char_to_idx, idx_to_char = load_vocab()

    # Example: load trained model
    vocab_size = len(char_to_idx)
    model = DecoderOnlyTransformer(vocab_size).to(DEVICE)
    model.load_state_dict(torch.load("greedy_model.pth", map_location=DEVICE))

    prompt = "First Citizen: Before we proceed any further, hear me speak."
    generated_text = greedy_decode(model, prompt, char_to_idx, idx_to_char, max_length=500)

    print("=== Generated Text ===")
    print(generated_text)

=== Generated Text ===
First Citizen: Before we proceed any further, hear me speak.

CAPULET:
I have you shall the state the state thee thee
That the state the so the state of the state,
That the state the sorrow the shall be so thee
That the stand the shall be so the seast of thee
That I shame the warrs of the warrs.

ISABELLA:
What I was not the send the send of the warrs
To the warr of the warrs of the warr
That I shalll the way I shame a the warr
That I shall the warrs of the warr
That I shalll the way I shame a the warr
That I shall the warrs of the warr
That I shalll the


In [10]:
# evaluate

import torch
import torch.nn as nn
import numpy as np
import pickle
import re

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

with open("char_to_idx.pkl", "rb") as f:
    char_to_idx = pickle.load(f)
with open("idx_to_char.pkl", "rb") as f:
    idx_to_char = pickle.load(f)

vocab_size = len(char_to_idx)

model = DecoderOnlyTransformer(vocab_size).to(DEVICE)
model.load_state_dict(torch.load("greedy_model.pth", map_location=DEVICE))
model.eval()

val_data = np.load("val.npy")
val_tensor = torch.tensor(val_data, dtype=torch.long)

def compute_perplexity(model, data):
    criterion = nn.CrossEntropyLoss()
    total_loss = 0.0
    for batch in data:
        x = batch[:-1].unsqueeze(0).to(DEVICE)
        y = batch[1:].unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            logits = model(x)
            loss = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))
        total_loss += loss.item()
    avg_loss = total_loss / len(data)
    ppl = torch.exp(torch.tensor(avg_loss))
    return avg_loss, ppl.item()


def compute_cer(pred_text, target_text):
    """Character Error Rate (CER)"""
    import editdistance
    return editdistance.eval(pred_text, target_text) / max(len(target_text), 1)


def compute_ttr(text):
    """Type-Token Ratio (TTR)"""
    tokens = list(text)
    types = set(tokens)
    return len(types) / len(tokens) if len(tokens) > 0 else 0.0


def shakespeare_line_score(text):
    """
    Rough Shakespearean line structure score.
    Checks percentage of lines matching ~iambic pentameter
    For simplicity: 10 syllable lines (approx. 10 words)
    """
    lines = text.split("\n")
    valid_lines = 0
    for line in lines:
        words = line.strip().split()
        if 5 <= len(words) <= 12:
            valid_lines += 1
    return valid_lines / max(len(lines), 1) * 100


# --- Evaluate validation set perplexity ---
val_loss, val_ppl = compute_perplexity(model, val_tensor)
print(f"Validation Loss: {val_loss:.4f}, Perplexity: {val_ppl:.3f}")

# --- Generate qualitative text ---
prompt = "First Citizen: Before we proceed any further, hear me speak."
generated_text = greedy_decode(model, prompt, char_to_idx, idx_to_char, max_length=500)
print("\n=== Generated Text ===")
print(generated_text)

# --- Compute TTR and Shakespearean line score on generated text ---
ttr = compute_ttr(generated_text)
line_score = shakespeare_line_score(generated_text)
print(f"\nType-Token Ratio (TTR): {ttr:.4f}")
print(f"Shakespearean Line Structure Score: {line_score:.2f}%")

# --- CER with validation target ---
# Join all validation sequences into one string
val_text = "".join([idx_to_char[int(i)] for seq in val_tensor for i in seq])
cer = compute_cer(generated_text[:len(val_text)], val_text[:len(generated_text)])
print(f"Character Error Rate (CER) vs. validation: {cer:.4f}")

Validation Loss: 1.6502, Perplexity: 5.208

=== Generated Text ===
First Citizen: Before we proceed any further, hear me speak.

CAPULET:
I have you shall the state the state thee thee
That the state the so the state of the state,
That the state the sorrow the shall be so thee
That the stand the shall be so the seast of thee
That I shame the warrs of the warrs.

ISABELLA:
What I was not the send the send of the warrs
To the warr of the warrs of the warr
That I shalll the way I shame a the warr
That I shall the warrs of the warr
That I shalll the way I shame a the warr
That I shall the warrs of the warr
That I shalll the

Type-Token Ratio (TTR): 0.0696
Shakespearean Line Structure Score: 70.59%
Character Error Rate (CER) vs. validation: 0.8000
