# Byte-Pair encoding

In [31]:
from pathlib import Path
import torch
import sys
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader, random_split
from torch import nn, optim
import torch.nn.functional as F
from tqdm import tqdm
from Run.misspelling_percentage import calculate_misspelling_percentage


# Import custom dataset and model
from Dataset.DatasetText import DatasetText as Dataset
from Models.RNN import RNN
from Models import utils

In [32]:
# Add the path to the MInbpe repository
MINBPE_PATH = Path.cwd().parent.parent / 'MInbpe'
if str(MINBPE_PATH) not in sys.path:
    sys.path.append(str(MINBPE_PATH))

ROOT = Path.cwd().parent

# Add the root to the system path
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

# Testing BPE

In [33]:
import minbpe
from minbpe import RegexTokenizer

In [16]:
dataset_name = "harry_potter.txt"
mode = "character"
sequence_length = 100

# Load the dataset (without BPE)
folder_path = ROOT / "Data"  / dataset_name
dataset = Dataset(folder_path=folder_path, sequence_length=sequence_length, mode=mode)
print("Vocabulary size before BPE",dataset.vocab_size)
print("Initial vocabulary before BPE", dataset.uniq_words) 

Vocabulary size before BPE 101
Initial vocabulary before BPE [' ', 'e', 't', 'a', 'o', 'n', 'r', 'i', 'h', 's', 'd', 'l', 'u', 'g', 'y', 'w', 'm', 'c', 'f', '.', ',', 'p', '"', '\n', 'b', 'k', 'v', 'H', "'", 'I', '-', 'T', 'S', 'M', '?', 'W', 'D', 'R', 'A', 'P', '!', 'x', 'B', 'G', 'C', 'N', 'j', 'Y', 'L', 'F', 'z', 'O', 'E', ';', 'q', '\xad', 'V', 'U', 'K', ':', 'Q', 'J', '1', '*', ')', '(', '2', '3', '4', '0', 'X', '5', '9', '6', 'Z', '8', '7', '_', '`', '/', '=', '\x93', 'é', '\\', '%', '$', ']', '\x95', '\x96', '&', '¦', '~', '^', '«', 'ù', '}', '{', '[', '»', '>', '\x1f']


In [18]:
tokenizer = RegexTokenizer()
with open(folder_path, "r") as file:
    text = file.read()

tokenizer.train(text, vocab_size=400)


In [23]:
result = tokenizer.encode("hello world") # string -> tokens


In [25]:
print(result)

[104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]


In [None]:

tokenizer.decode([1000, 2000, 3000]) # tokens -> string
tokenizer.save("tok32k") # writes tok32k.model and tok32k.vocab
tokenizer.load("tok32k.model") # loads the model back from disk

In [None]:
tokenizer = RegexTokenizer()
tokenizer.train(, vocab_size=32768)
tokenizer.encode("hello world") # string -> tokens
tokenizer.decode([1000, 2000, 3000]) # tokens -> string
tokenizer.save("tok32k") # writes tok32k.model and tok32k.vocab
tokenizer.load("tok32k.model") # loads the model back from disk

# LSTM training (with BPE)

In [36]:
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter
from minbpe import RegexTokenizer
from tqdm import tqdm
import os

class DatasetTextBPE(torch.utils.data.Dataset):
    def __init__(self, folder_path, sequence_length, mode="word", use_bpe=False, vocab_size=300):
        self.sequence_length = sequence_length
        self.folder_path = folder_path
        self.mode = mode
        self.use_bpe = use_bpe

        # Load and process text data
        self.words = self.load_words()

        if self.use_bpe:
            self.tokenizer = RegexTokenizer()
            self.train_tokenizer(self.words, vocab_size)
            self.words_indexes = self.encode_words(self.words)
            self.vocab_size = len(self.tokenizer.vocab)
        else:
            self.uniq_words = self.get_uniq_words()
            self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
            self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}
            self.words_indexes = [self.word_to_index[w] for w in self.words]
            self.vocab_size = len(self.uniq_words)

    def load_words(self):
        with open(self.folder_path, "r") as file:
            text = file.read()
        if self.mode == "word":
            return text.split()
        elif self.mode == "character":
            return list(text)
        else:
            raise ValueError("wrong mode")

    def get_uniq_words(self):
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def train_tokenizer(self, text, vocab_size):
        # Join the text into a single string if mode is word, else keep as is
        very_long_training_string = ' '.join(text) if self.mode == "word" else ''.join(text)
        self.tokenizer.train(very_long_training_string, vocab_size)
        self.tokenizer.save("tok32k")

    def encode_words(self, text):
        very_long_training_string = ' '.join(text) if self.mode == "word" else ''.join(text)
        return self.tokenizer.encode(very_long_training_string)

    def __len__(self):
        return len(self.words_indexes) // (self.sequence_length + 1)

    def __getitem__(self, index):
        start_index = index * self.sequence_length
        end_index = start_index + self.sequence_length

        input_indices = self.words_indexes[start_index:end_index]
        target_indices = self.words_indexes[start_index + 1 : end_index + 1]

        return torch.tensor(input_indices), torch.tensor(target_indices)

class LSTM_BPE(nn.Module):
    def __init__(
        self,
        vocab_size,
        hidden_dim=100,
        embedding_dim=None,
        num_layers=1,
        dropout=0.0,
    ):
        super(LSTM_BPE, self).__init__()

        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        if embedding_dim is not None:
            # Learned embedding
            self.embedding = nn.Embedding(
                num_embeddings=vocab_size,
                embedding_dim=self.embedding_dim,
            )
            input_size = self.embedding_dim
        else:
            # Assume input is already one-hot encoded
            input_size = vocab_size

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=self.hidden_dim,
            num_layers=self.num_layers,
            dropout=dropout,
            batch_first=True,
        )
        self.fc = nn.Linear(self.hidden_dim, vocab_size)

    def forward(self, x, states):
        embed = self.embedding(x) if self.embedding_dim is not None else x
        output, states = self.lstm(embed, states)
        logits = self.fc(output)
        return logits, states

    def init_state(self, batch_size):
        # init hidden and cell states
        return (
            torch.zeros(self.num_layers, batch_size, self.hidden_dim),
            torch.zeros(self.num_layers, batch_size, self.hidden_dim),
        )

    def generate(
        self,
        dataset,
        device,
        text,
        total_length=1000,
        temperature=1.0,
        mode="character",
        top_p=0.9,
        nucleus_sampling=False,
    ):
        self.eval()

        if mode == "word":
            words = text.split(" ")
        elif mode == "character":
            words = list(text)
        else:
            raise NotImplementedError

        state_h, state_c = self.init_state(1)
        state_h, state_c = state_h.to(device), state_c.to(device)

        for i in range(total_length):
            # Map input text to token indices using the dataset's token mappings
            if dataset.use_bpe:
                x = torch.tensor([[dataset.token_to_index[token] for token in dataset.tokenizer.encode(text[i:])]]).to(device)
            else:
                x = torch.tensor([[dataset.word_to_index[w] for w in words[i:]]]).to(device)

            if self.embedding_dim is None:
                x = F.one_hot(x, num_classes=self.vocab_size).float()

            y_pred, (state_h, state_c) = self(x, (state_h, state_c))

            last_word_logits = y_pred[0][-1] / temperature
            probs = torch.nn.functional.softmax(last_word_logits, dim=0)

            if nucleus_sampling:
                sorted_probs, sorted_indices = torch.sort(probs, descending=True)
                cumulative_probs = torch.cumsum(sorted_probs, dim=0)
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_probs[sorted_indices_to_remove] = 0.0
                sorted_probs /= sorted_probs.sum()  # normalize
                word_index = torch.multinomial(sorted_probs, 1).item()
            else:
                word_index = torch.multinomial(probs, 1).item()

            if dataset.use_bpe:
                words.append(dataset.index_to_token[word_index])
            else:
                words.append(dataset.index_to_word[word_index])

        return words

# Step 3: Initialize the dataset
folder_path = os.path.join("..", "Data", "harry_potter.txt")  
sequence_length = 100
use_bpe = True
vocab_size = 256

dataset = DatasetTextBPE(folder_path=folder_path, sequence_length=sequence_length, use_bpe=use_bpe, vocab_size=vocab_size)

# Step 4: Create data loader
batch_size = 64
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Step 5: Initialize the model
hidden_dim = 1024
embedding_dim = None
num_layers = 1
dropout = 0.0
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LSTM_BPE(vocab_size=dataset.vocab_size, hidden_dim=hidden_dim, embedding_dim=embedding_dim, num_layers=num_layers, dropout=dropout).to(device)

# Step 6: Define the training loop
def train_model(model, dataloader, num_epochs, learning_rate):
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in tqdm(range(num_epochs)):
        total_loss = 0.0

        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            state_h, state_c = model.init_state(x.size(0))
            state_h, state_c = state_h.to(device), state_c.to(device)

            optimizer.zero_grad()
            y_pred, (state_h, state_c) = model(x, (state_h, state_c))
            loss = criterion(y_pred.permute(0, 2, 1), y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(dataloader)}")

# Step 7: Train the model
num_epochs = 10  # Set this to a higher number for better results
learning_rate = 0.001
train_model(model, train_dataloader, num_epochs, learning_rate)

# Step 8: Generate text using the trained model
init_text = "Harry"
generated_text = model.generate(dataset, device, text=init_text, total_length=100, temperature=1.0, mode="character", top_p=0.9, nucleus_sampling=True)
print("Generated text:", "".join(generated_text))


  0%|          | 0/10 [00:00<?, ?it/s]


RuntimeError: For unbatched 2-D input, hx and cx should also be 2-D but got (3-D, 3-D) tensors