In [230]:
import re
import os
import random
from tokenizers import ByteLevelBPETokenizer
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

In [231]:
path_to_text = "/content/drive/MyDrive/bulgakov.txt"
path_to_preprocess_text = "/content/drive/MyDrive/preprocess_bulgakov.txt"

In [232]:
"""
Preprocesses text by splitting sentences and saving with end-of-sentence markers.
"""
class TextPreprocessor():
  def __init__(self, input_path, output_path):
    self.input_path = input_path
    self.output_path = output_path

  def _load(self, num_chars=None):
      with open(self.input_path, 'r', encoding='utf-8') as f:
          text = f.read() if num_chars is None else f.read(num_chars)
      return text

  def _split(self, text):
    sentences = re.split(r'(?<=[\.\!\?])\s+', text)
    return [s.strip() for s in sentences if s.strip()]

  def _write(self, sentences):
    with open(self.output_path, 'w', encoding='utf-8') as f:
        for s in sentences:
            f.write(s + ' ' + "<EOS>" + '\n')

  def preprocess(self):
    text = self._load()
    sentences = self._split(text)
    self._write(sentences)
    return sentences

In [233]:
"""
Tokenizes text using Byte-level BPE (Byte Pair Encoding) algorithm.
"""
class Tokenizer():
  def __init__(self, input_file, text):
    self.input_file = input_file
    self.text = text
    self.tokenizer = self._train()
    self.tokens = self._generate_tokens()
    self.vocab_size = self.tokenizer.get_vocab_size()

  def get_vocab_size(self):
    return self.vocab_size

  def get_tokens(self):
    return self.tokens

  def encode(self, text):
    return self.tokenizer.encode(text)

  def decode(self, ids):
    return self.tokenizer.decode(ids)

  def _train(self, vocab_size=15000, min_freq=4):
    tokenizer = ByteLevelBPETokenizer()
    tokenizer.train(files=[self.input_file], vocab_size=vocab_size,
                    min_frequency=min_freq, special_tokens=["<EOS>"])
    return tokenizer

  def _generate_tokens(self):
    return self.tokenizer.encode('\n'.join(s + ' <EOS>' for s in self.text)).ids

  class NextTokenDataset(Dataset):
        """
        Dataset for the task of predicting the next token:
        given a prefix of length seq_length, predicts the token immediately after it.
        """
        def __init__(self, token_ids, seq_length):
            self.seq_length = seq_length
            self.data = []
            for i in range(len(token_ids) - seq_length):
                x = token_ids[i:i+seq_length]
                y = token_ids[i+1:i+seq_length+1]
                self.data.append((x, y))

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            x, y = self.data[idx]
            return (
                torch.tensor(x, dtype=torch.long),
                torch.tensor(y, dtype=torch.long)
            )

  def _make_dataset(self, seq_length):
        return self.NextTokenDataset(self.tokens, seq_length)


In [235]:
class DummyLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim=32, hidden_dim=128, num_layers=2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        emb = self.embedding(x)
        out, hidden = self.lstm(emb, hidden)
        logits = self.fc(out)
        return logits, hidden

In [236]:
def train_model(model, dataloader, epochs=10, lr=1e-3, device='cuda'):
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(1, epochs+1):
        total_loss = 0.0
        for x_batch, y_batch in dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            logits, _ = model(x_batch)
            batch_size, seq_len, vocab_size = logits.size()
            loss = criterion(
                logits.view(batch_size*seq_len, vocab_size),
                y_batch.view(batch_size*seq_len)
            )
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch}/{epochs} — Loss: {avg_loss:.4f}')

In [237]:
def generate_lstm(model, tokenizer, prefix, max_len=50, device='cuda', strategy='greedy'):
    model.to(device)
    model.eval()
    ids = tokenizer.encode(prefix).ids
    input_ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0).to(device)
    hidden = None
    generated = ids.copy()

    for _ in range(max_len):
        logits, hidden = model(input_ids, hidden)
        next_logits = logits[0, -1]  # (vocab_size)
        if strategy == 'greedy':
            next_id = torch.argmax(next_logits).item()
        else:
            probs = torch.softmax(next_logits, dim=0).cpu().detach().numpy()
            next_id = random.choices(range(len(probs)), weights=probs)[0]
        generated.append(next_id)
        input_ids = torch.tensor([[next_id]], dtype=torch.long).to(device)

    return tokenizer.decode(generated)

In [238]:
class MarkovChain:
    def __init__(self):
        self.transitions = {}

    def train(self, token_ids):
        for a, b, c, d in zip(
            token_ids,
            token_ids[1:],
            token_ids[2:],
            token_ids[3:]
        ):
            key = (a, b, c)
            if key not in self.transitions:
                self.transitions[key] = {}
            self.transitions[key][d] = self.transitions[key].get(d, 0) + 1

    def next_token(self, context, strategy='greedy'):

        if context not in self.transitions:
          context = random.choice(list(self.transitions.keys()))

        next_counts = self.transitions[context]
        if strategy == 'greedy':
            return max(next_counts, key=next_counts.get)
        else:
            tokens, counts = zip(*next_counts.items())
            return random.choices(tokens, weights=counts, k=1)[0]

    def generate(self, tokenizer, prefix, max_len=50, strategy='greedy'):
        ids = tokenizer.encode(prefix).ids
        generated = ids.copy()

        if len(ids) >= 3:
            context = tuple(ids[-3:])
        else:

            context = tuple([ids[0]] * (3 - len(ids)) + ids)

        for _ in range(max_len):
            nxt = self.next_token(context, strategy)
            generated.append(nxt)

            context = (context[1], context[2], nxt)

        return tokenizer.decode(generated)


In [239]:
text_preprocessor = TextPreprocessor(path_to_text, path_to_preprocess_text)
text_preprocessed = text_preprocessor.preprocess()

In [240]:
tokenizer = Tokenizer(path_to_preprocess_text, text_preprocessed)

In [241]:
dataset = tokenizer._make_dataset(seq_length=128)

In [242]:
dataloader = DataLoader(dataset, batch_size=128, shuffle=True)

In [243]:
vocab_size = tokenizer.get_vocab_size()

In [244]:
model = DummyLSTM(vocab_size)

In [245]:
train_model(model, dataloader, epochs=15, lr=1e-3)

Epoch 1/15 — Loss: 6.0597
Epoch 2/15 — Loss: 4.9713
Epoch 3/15 — Loss: 4.4093
Epoch 4/15 — Loss: 3.9679
Epoch 5/15 — Loss: 3.5659
Epoch 6/15 — Loss: 3.1922
Epoch 7/15 — Loss: 2.8484
Epoch 8/15 — Loss: 2.5330
Epoch 9/15 — Loss: 2.2434
Epoch 10/15 — Loss: 1.9790
Epoch 11/15 — Loss: 1.7340
Epoch 12/15 — Loss: 1.5074
Epoch 13/15 — Loss: 1.3005
Epoch 14/15 — Loss: 1.1145
Epoch 15/15 — Loss: 0.9487


In [246]:
prefix = "Никогда и ничего не просите!"

In [251]:
print("LSTM Greedy:   ", generate_lstm(model, tokenizer, prefix, strategy='greedy'))
print("LSTM Sampling:", generate_lstm(model, tokenizer, prefix, strategy='sampling'))

LSTM Greedy:    Никогда и ничего не просите! 
– отозвался Стравинский. 
– Я – специалить, – подтвердил ученый подумение. 
– Ну, так, – сказал Пилат, – а обыкновенное желание жить
LSTM Sampling: Никогда и ничего не просите! 
– воскликнул иностранец. 
– Длеа-поточная: «Аннушка» приго арестант и, видя, как бы нужно вчера того и, повернувшись к рукой, добавил: – Преступник! 


In [252]:
markov_chain = MarkovChain()

In [253]:
tokens = tokenizer.get_tokens()

In [254]:
markov_chain.train(tokens)

In [270]:
print("Markov Greedy:", markov_chain.generate(tokenizer, prefix, strategy='greedy'))
print("Markov Sampling:", markov_chain.generate(tokenizer, prefix, strategy='sampling'))

Markov Greedy: Никогда и ничего не просите! не бр это вы, какойб последний, швейцар ру егорад,  лит, перес дверь, пятела часа Вар лев в я-ный,,ательно выдинул противой Михаил,того Бездомный, чтоих,,», Иванович в
Markov Sampling: Никогда и ничего не просите!икенал выходит вас будтонулоуп, Иванович июался Ивану вдками лист я
,ней стоитно, дост тра окрош плщжм у Никанорисан в председателюонь он переводчик колонны женщина см.налсяюм чтодом но
