In [1]:
# This is the sample implementation of BPE in tiktoken (https://github.com/openai/tiktoken/blob/main/tiktoken/_educational.py)
# It is modified to work with our code.


"""This is an educational implementation of the byte pair encoding algorithm."""
import collections
import regex

# Pre-tokenisation split patterns (require the third-party `regex` module: they rely on
# Unicode property classes such as \p{L} / \p{N}, which the stdlib `re` module lacks).
#
# gpt2_regex alternatives, in order: a fixed set of lowercase contractions ('s, 't, 're, ...),
# an optional leading space followed by letters, by numbers, or by other non-space symbols,
# and finally runs of whitespace.
# NOTE(review): gpt2_regex is not referenced anywhere else in the visible cells.
gpt2_regex = (r"""'s|'t|'re|'ve|'m|'ll|'d| ?[\p{L}]+| ?[\p{N}]+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""" )
# gpt4_regex differs from the pattern above in that contractions are matched
# case-insensitively ((?i:...)), numbers are split into chunks of at most three digits
# (\p{N}{1,3}), newlines get their own alternatives, and possessive quantifiers (?+, ++)
# are used. This is the pattern the tokenizer and the trainer below actually use.
gpt4_regex = (r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+""")

class SimpleBytePairEncoding:
    """Small byte-pair-encoding tokenizer built around a rank table.

    The rank table maps each known byte string to an integer. That integer is
    used both as the token id and as the merge priority (lower merges first).
    """

    def __init__(self, *, mergeable_ranks: dict[bytes, int]) -> None:
        """Build a tokenizer from an existing ``bytes -> rank`` table.

        Args:
            mergeable_ranks: mapping from token bytes to token id / merge rank.
        """
        # Pattern used to pre-split text into word-like chunks before merging.
        self.pat_str = gpt4_regex
        self.mergeable_ranks = mergeable_ranks

        # Reverse lookup (token id -> bytes) used when decoding.
        self._decoder = {rank: raw for raw, rank in mergeable_ranks.items()}
        self._pat = regex.compile(self.pat_str)

    def encode(self, text: str) -> list[int]:
        """Tokenize ``text`` and return the resulting token ids.

        The text is first split with the compiled pattern; every chunk is then
        UTF-8 encoded and merged independently by ``bpe_encode``.
        """
        return [
            token
            for chunk in self._pat.findall(text)
            for token in bpe_encode(self.mergeable_ranks, chunk.encode("utf-8"))
        ]

    def decode_bytes(self, tokens: list[int]) -> bytes:
        """Concatenate the byte strings behind ``tokens``.

        Raises:
            KeyError: if a token id is not present in the rank table.
        """
        lookup = self._decoder
        return b"".join(lookup[token] for token in tokens)

    def decode(self, tokens: list[int]) -> str:
        """Turn ``tokens`` back into text.

        The joined bytes need not be valid UTF-8 (a token may end in the middle
        of a multi-byte character); undecodable bytes are replaced with U+FFFD.
        """
        raw = self.decode_bytes(tokens)
        return raw.decode("utf-8", errors="replace")

    @staticmethod
    def train(training_data: str, vocab_size: int):
        """Learn a rank table from ``training_data`` and wrap it in a tokenizer."""
        learned_ranks = bpe_train(data=training_data, vocab_size=vocab_size)
        return SimpleBytePairEncoding(mergeable_ranks=learned_ranks)


def bpe_encode(mergeable_ranks: dict[bytes, int], input: bytes) -> list[int]:
    """Greedily merge the bytes of ``input`` into tokens.

    Starting from single bytes, the adjacent pair whose concatenation has the
    lowest rank is merged, and this repeats until no adjacent pair is present
    in ``mergeable_ranks``. When several pairs share the lowest rank, the
    left-most one wins.

    Args:
        mergeable_ranks: mapping from token bytes to rank / token id.
        input: the raw bytes of one pre-split chunk.

    Returns:
        The token ids of the final pieces, in order.

    Raises:
        KeyError: if a remaining piece is missing from ``mergeable_ranks``.
    """
    pieces = [input[k:k + 1] for k in range(len(input))]

    while True:
        best = None  # (rank, index) of the best merge candidate seen so far
        for idx in range(len(pieces) - 1):
            candidate = mergeable_ranks.get(pieces[idx] + pieces[idx + 1])
            if candidate is None:
                continue
            if best is None or candidate < best[0]:
                best = (candidate, idx)

        if best is None:
            # Nothing left to merge: translate the pieces into ids and stop.
            return [mergeable_ranks[piece] for piece in pieces]

        at = best[1]
        pieces[at:at + 2] = [pieces[at] + pieces[at + 1]]


def bpe_train(data: str, vocab_size: int) -> dict[bytes, int]:
    """Learn a byte-pair-encoding rank table from ``data``.

    The table starts with the 256 single-byte tokens (rank == byte value). The
    data is split into chunks with ``gpt4_regex``; on every round the most
    frequent adjacent pair inside those chunks becomes a new token whose rank
    is the current table size, and the pair is merged everywhere.

    Args:
        data: training text.
        vocab_size: desired number of entries in the table (at least 256).

    Returns:
        Mapping from token bytes to rank. It can hold fewer than ``vocab_size``
        entries when the data runs out of adjacent pairs to merge (for example
        empty or very short text).

    Raises:
        ValueError: if ``vocab_size`` is smaller than 256.
    """
    # First, add tokens for each individual byte value
    if vocab_size < 2**8:
        raise ValueError("vocab_size must be at least 256, so we can encode all bytes")
    ranks = {bytes([i]): i for i in range(2**8)}

    # Splinter up our data into lists of bytes
    # data = "Hello world"
    # words = [
    #     [b'H', b'e', b'l', b'l', b'o'],
    #     [b' ', b'w', b'o', b'r', b'l', b'd']
    # ]
    words: list[list[bytes]] = [
        [bytes([b]) for b in word.encode("utf-8")] for word in regex.findall(gpt4_regex, data)
    ]

    # Now, use our data to figure out which merges we should make
    while len(ranks) < vocab_size:
        # Count every adjacent pair. Pairs never span two chunks.
        stats = collections.Counter()
        for piece in words:
            stats.update(zip(piece, piece[1:]))

        # Every chunk has collapsed to a single token (or there was no data):
        # nothing is left to merge, and max() on an empty counter would raise.
        if not stats:
            break

        # Ties are resolved in favour of the pair that was seen first.
        most_common_pair = max(stats, key=lambda x: stats[x])
        token_bytes = most_common_pair[0] + most_common_pair[1]
        # Add the new token! Its rank is simply the next free id.
        ranks[token_bytes] = len(ranks)

        # Now merge that most common pair in all the words. That is, update our training data
        # to reflect our decision to make that pair into a new token.
        new_words = []
        for word in words:
            new_word = []
            i = 0
            while i < len(word):
                if i + 1 < len(word) and (word[i], word[i + 1]) == most_common_pair:
                    # We found our pair! Merge it
                    new_word.append(token_bytes)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_words.append(new_word)
        words = new_words

    return ranks

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence

def pad_sequences_pytorch(sequences, maxlen, padding_value=0):
    """Left-pad / left-truncate integer sequences to exactly ``maxlen`` entries.

    Every sequence is handled on its own: sequences longer than ``maxlen`` keep
    only their last ``maxlen`` items, shorter ones are filled at the front with
    ``padding_value``. The real tokens therefore always sit at the end of the
    row, regardless of how long the other sequences in the batch are.

    Args:
        sequences: iterable of 1-D integer sequences (lists or tensors).
        maxlen: length of every output row.
        padding_value: integer used to fill the missing leading positions.

    Returns:
        ``torch.long`` tensor of shape ``(len(sequences), maxlen)``.
    """
    rows = [torch.as_tensor(seq, dtype=torch.long) for seq in sequences]
    padded = torch.full((len(rows), maxlen), padding_value, dtype=torch.long)

    for row_idx, row in enumerate(rows):
        # Keep only the most recent `maxlen` items of an over-long sequence.
        kept = row[max(row.numel() - maxlen, 0):]
        if kept.numel():
            # Right-align the kept items; leading slots keep the padding value.
            padded[row_idx, maxlen - kept.numel():] = kept

    return padded


def createInputOutput(contents, tokenizer, step_size=10):
    """Build (context, next-token) training pairs from raw texts.

    Each text is encoded with ``tokenizer.encode``. For every ``step_size``-th
    position (starting at 0 and stopping before the final token) the tokens
    before that position form the context and the token at that position is
    the target. Contexts are padded with 0 via ``pad_sequences_pytorch`` to the
    length of the longest encoded text.

    Args:
        contents: iterable of strings.
        tokenizer: object providing ``encode(str)`` and a ``mergeable_ranks``
            mapping whose size is used as the number of classes.
        step_size: distance between consecutive target positions.

    Returns:
        Tuple ``(inputs, targets)``: the stacked padded contexts and the
        float one-hot encoded target tokens.
    """
    token_streams = [tokenizer.encode(text) for text in contents]
    longest = max(len(stream) for stream in token_streams)

    contexts = []
    targets = []
    for stream in token_streams:
        for cut in range(0, len(stream) - 1, step_size):
            padded = pad_sequences_pytorch([stream[:cut]], maxlen=longest, padding_value=0)
            contexts.append(padded[0])
            targets.append(stream[cut])

    features = torch.stack(contexts)
    target_ids = torch.tensor(targets, dtype=torch.long)
    one_hot_targets = F.one_hot(target_ids, num_classes=len(tokenizer.mergeable_ranks))
    return features, one_hot_targets.float()

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
# Ask the CUDA caching allocator to release unused cached memory.
torch.cuda.empty_cache()

class BiLSTMModel(nn.Module):
    """Embedding, two stacked bidirectional LSTMs, dropout, then a linear head.

    Only the features at the last sequence position are passed to the head.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        """Create the layers.

        Args:
            vocab_size: number of rows in the embedding table.
            embed_dim: size of each embedding vector.
            hidden_dim: hidden size of each LSTM direction.
            output_dim: number of output features (logits).
        """
        super().__init__()
        # A bidirectional LSTM concatenates both directions.
        recurrent_width = 2 * hidden_dim

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.bilstm1 = nn.LSTM(
            input_size=embed_dim, hidden_size=hidden_dim, bidirectional=True, batch_first=True
        )
        self.bilstm2 = nn.LSTM(
            input_size=recurrent_width, hidden_size=hidden_dim, bidirectional=True, batch_first=True
        )
        self.dropout = nn.Dropout(p=0.5)
        self.fc = nn.Linear(recurrent_width, output_dim)

    def forward(self, x):
        """Map a batch of token-id sequences to one logit vector per sequence."""
        embedded = self.embedding(x)
        first_pass, _ = self.bilstm1(embedded)
        second_pass, _ = self.bilstm2(first_pass)
        regularised = self.dropout(second_pass)
        last_step = regularised[:, -1, :]
        return self.fc(last_step)
    
class ModifiedBiLSTMModel(nn.Module):
    """Two stacked bidirectional LSTMs followed by a two-layer head.

    The features at the last sequence position go through
    ``fc1 -> ReLU -> fc2`` to produce the output logits.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        """Create the layers.

        Args:
            vocab_size: number of rows in the embedding table.
            embed_dim: size of each embedding vector.
            hidden_dim: hidden size of each LSTM direction and of ``fc1``'s output.
            output_dim: number of output features (logits).
        """
        super(ModifiedBiLSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.bilstm1 = nn.LSTM(embed_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.bilstm2 = nn.LSTM(hidden_dim * 2, hidden_dim, bidirectional=True, batch_first=True)
        # NOTE(review): p=0.9 zeroes 90% of the activations during training, which is
        # unusually aggressive. Kept unchanged; confirm that it is intentional.
        self.dropout = nn.Dropout(0.9)
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        # Hidden activation between fc1 and fc2. This attribute used to hold
        # nn.Softmax() (with no `dim`), which normalised the hidden features into a
        # probability vector and emitted the implicit-dimension deprecation warning.
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a batch of token-id sequences to one logit vector per sequence."""
        x = self.embedding(x)
        x, _ = self.bilstm1(x)
        x, _ = self.bilstm2(x)
        x = self.dropout(x)
        # Only the last sequence position feeds the classification head.
        x = x[:, -1, :]
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x
    
class OnlineLSTMModel(nn.Module):
    """Embedding, a single bidirectional LSTM, then a linear head.

    Only the features at the last sequence position are passed to the head.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        """Create the layers.

        Args:
            vocab_size: number of rows in the embedding table.
            embed_dim: size of each embedding vector.
            hidden_dim: hidden size of each LSTM direction.
            output_dim: number of output features (logits).
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.bilstm = nn.LSTM(
            input_size=embed_dim, hidden_size=hidden_dim, bidirectional=True, batch_first=True
        )
        # Both LSTM directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, x):
        """Map a batch of token-id sequences to one logit vector per sequence."""
        recurrent_out, _ = self.bilstm(self.embedding(x))
        last_step = recurrent_out[:, -1, :]
        return self.fc(last_step)

In [None]:

# Training cell: fits a next-token model on (input_seq, output_seq).
# NOTE(review): `tokenizer`, `input_seq` and `output_seq` are not defined in the cells
# visible here - presumably a SimpleBytePairEncoding instance and the two tensors
# returned by createInputOutput; confirm against the cell that creates them.

# One output logit per token in the tokenizer's rank table.
vocab_size = len(tokenizer.mergeable_ranks)
embed_dim = 10  # size of each token embedding vector
hidden_dim = 15 # hidden size of each LSTM direction
output_dim = vocab_size  # the model predicts a score for every token

# model = BiLSTMModel(vocab_size, embed_dim, hidden_dim , output_dim).to(device)
model = OnlineLSTMModel(vocab_size, embed_dim, hidden_dim , output_dim).to(device)
# NOTE(review): if output_seq holds float one-hot rows (as createInputOutput returns),
# CrossEntropyLoss treats them as class-probability targets - confirm.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)  # type: ignore

# The whole dataset is moved to `device` up front; batches are drawn in shuffled order.
train_dataset = TensorDataset(input_seq.to(device), output_seq.to(device))
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

num_epochs = 10

for epoch in range(num_epochs):
    # Training mode (matters for layers such as dropout in the alternative models).
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        # The dataset tensors were already moved to `device` when the dataset was built,
        # so this transfer does not change where the batch lives.
        inputs, labels = inputs.to(device), labels.to(device)  
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()  
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()  
        optimizer.step() 

        running_loss += loss.item()

    # Report the mean of the per-batch losses for this epoch.
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

print("Training complete.")

In [None]:
import torch
import torch.nn.functional as F

def generate(inp, temperature, loop):
    """Sample the next token for the text ``inp`` and return it as a string.

    Relies on the notebook globals ``tokenizer``, ``model``, ``device``,
    ``input_seq`` (only its second dimension, the context length, is used) and
    ``pad_sequences_pytorch``.

    Args:
        inp: text generated so far; it is re-encoded and padded to the
            context length the model was trained with.
        temperature: divisor applied to the logits before the softmax. Lower
            values sharpen the distribution; ``0`` selects the highest-scoring
            token deterministically instead of sampling.
        loop: iteration counter of the caller. When it is ``0`` the ten
            highest-scoring tokens are printed for inspection.

    Returns:
        The decoded text of the chosen token.
    """
    token_ids = tokenizer.encode(inp)
    context = pad_sequences_pytorch([token_ids], maxlen=input_seq.shape[1], padding_value=0)

    # `context` is already a LongTensor; wrapping it in torch.tensor() again would
    # copy-construct it and emit a UserWarning, so it is only moved to the device.
    input_data = context.to(device)
    model.eval()

    with torch.no_grad():
        outputs = model(input_data)

    if loop == 0:
        # topk returns its values in descending order, so no extra sort is needed.
        k = min(10, outputs.size(-1))
        top_values, top_indices = torch.topk(outputs, k, dim=-1)

        print("Top 10 highest values and their corresponding indices in the tensor:")
        for value, index in zip(top_values[0].tolist(), top_indices[0].tolist()):
            print(f"Value: {value}, Index: {index} , Token: {tokenizer.decode([index])}")

    if temperature == 0:
        # Dividing by zero would produce inf/nan logits; greedy decoding is the
        # limit of temperature -> 0.
        next_word_index = torch.argmax(outputs[0], dim=-1).item()
    else:
        probs = F.softmax(outputs / temperature, dim=-1)
        next_word_index = torch.multinomial(probs[0], num_samples=1).item()

    response = tokenizer.decode([next_word_index])  # type: ignore

    return response

# Generate 50 tokens one at a time, feeding the growing text back in as context.
inp = ""
for i in range(50):
    outs = generate(inp, 0.25, i)
    # BPE tokens already carry their own whitespace (and may be sub-word pieces), so
    # they are concatenated as-is. Inserting an extra " " between tokens would split
    # words apart and make the re-encoded context differ from the generated tokens.
    inp = inp + outs

print('\n',inp)