In [1]:
import numpy as np 
import tiktoken as tk
import torch
import os 

In [2]:
with open ('data.txt', 'r', encoding='utf-8') as f:
    data = f.read()

print('length of dataset', len(data))

length of dataset 1115394


In [3]:
print(data[:100])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


In [4]:
# extract the unique characters that occur in this text 
chars = sorted(list(set(data)))
vocab_size = len(chars)
print(''.join(chars))
print('vocab size', vocab_size)


 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
vocab size 65


In [5]:
# develope a strategy to tokenize the input text 
# create a mapping from characters to integers
char_to_int = {ch:i for i,ch in enumerate(chars)} # create a dictionary that maps characters to integers
int_to_char = {i:ch for i,ch in enumerate(chars)} # create a dictionary that maps integers to characters
encode = lambda x: [char_to_int[ch] for ch in x] # take a string and convert it to a list of integers
decode = lambda x: ''.join([int_to_char[ch] for ch in x]) # take the list of integers and convert it back to a string

print('encoded:', encode('hello world'))
print('decoded:', decode(encode('hello world')))


encoded: [46, 43, 50, 50, 53, 1, 61, 53, 56, 50, 42]
decoded: hello world


In [6]:
# Using tiktoken instead of the above simple tokenizer
enc = tk.get_encoding('gpt2')
print(enc.encode('hello world'))
print(enc.decode(enc.encode('hello world')))

# get number of classes from the tokenizer for the data.txt file 
print('number of classes:', enc.n_vocab)



[31373, 995]
hello world
number of classes: 50257


In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
import tiktoken  # Ensure tiktoken is installed

class LLModel(nn.Module):
    def __init__(self, block_size=128, batch_size=32, lr=5e-5):
        """
        Initializes the LLModel class with a transformer model.
        """
        super().__init__()  # Ensure the nn.Module is properly initialized

        # Device setup
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Tokenizer setup using tiktoken
        self.tokenizer = tiktoken.get_encoding('gpt2')
        self.token_encoder = self.tokenizer.encode
        self.token_decoder = self.tokenizer.decode
        self.vocab_size = self.tokenizer.n_vocab
        self.num_classes = self.vocab_size  # Assuming token-level classification

        # Hyperparameters
        self.block_size = block_size
        self.batch_size = batch_size

        # Transformer model
        self.model = TransformerModel(
            vocab_size=self.vocab_size,
            embed_dim=128,
            num_heads=4,
            hidden_dim=256,
            num_layers=2,
            num_classes=self.num_classes
        ).to(self.device)

        # Loss function & optimizer
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.AdamW(self.model.parameters(), lr=lr)

    def get_batch(self, tokenized_data, split='train'):
        """
        Generates a batch of input sequences based on tokenized data
        """
        # Ensure enough data points are available
        if len(tokenized_data) <= self.block_size:
            raise ValueError("Not enough data for the given block size.")

        # Select train or validation set
        data = tokenized_data if split == 'train' else tokenized_data  # Adjust if needed

        ix = torch.randint(len(data) - self.block_size, (self.batch_size,))
        x = torch.stack([data[i : i + self.block_size] for i in ix])  # Shape: (batch_size, block_size)
        y = torch.stack([data[i + 1 : i + self.block_size + 1] for i in ix])  # Shape: (batch_size, block_size)
        assert x.shape == y.shape, "Input and target shapes do not match."
        assert x.shape == (self.batch_size, self.block_size), f"Invalid batch shape: {x.shape}"
        assert y.shape == (self.batch_size, self.block_size), f"Invalid target shape: {y.shape}"
        return x, y

    def train_model(self, text_data, epochs=1000, print_freq=10, train_val_split=0.8):
        """
        Trains the model on input text data.
        """
        best_loss = float('inf')
        tokenized_data = torch.tensor(self.token_encoder(text_data), dtype=torch.long, device=self.device)
        n = int(train_val_split * len(tokenized_data))
        train_data, val_data = tokenized_data[:n], tokenized_data[n:]
        x, y = self.get_batch(train_data, 'train')
        x_val, y_val = self.get_batch(val_data, 'train')
        for epoch in range(epochs):
            self.model.train()
            self.optimizer.zero_grad()
            logits = self.model(x)
            loss = self.criterion(logits.view(-1, self.num_classes), y.view(-1))
            loss.backward()
            self.optimizer.step()

            val_loss = self.criterion(self.model(x_val).view(-1, self.num_classes), y_val.view(-1))

            # Save the best model
            if loss.item() < best_loss:
                best_loss = loss.item()
                torch.save(self.model.state_dict(), 'best_model.pth')

            if epoch % print_freq == 0:
                print(f"Epoch {epoch} | Loss: {loss.item():.4f} | Val Loss: {val_loss:.4f}")

        return best_loss

    def evaluate(self, text_data):
        """
        Evaluates the model on a validation batch.
        """
        tokenized_data = torch.tensor(self.token_encoder(text_data), dtype=torch.long, device=self.device)
        x, y = self.get_batch(tokenized_data, 'train')
        self.model.eval()
        with torch.no_grad():
            logits = self.model(x)
            loss = self.criterion(logits.view(-1, self.num_classes),y.view(-1))
        return loss.item()

    def predict(self, text):
        tokenized_data = torch.tensor(self.token_encoder(text), device=self.device).unsqueeze(0)
        self.model.eval()
        with torch.no_grad():
            logits = self.model(tokenized_data)
            predicted_tokens = torch.argmax(logits, dim=-1)
            return self.token_decoder(predicted_tokens.squeeze().tolist())


class TransformerModel(nn.Module):
    def __init__(self, vocab_size: int, embed_dim=128, num_heads=4, hidden_dim=256, num_layers=2, num_classes=50257):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.positional_encoding = PositionalEncoding(embed_dim)
        self.encoder_layers = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=num_heads, dim_feedforward=hidden_dim, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layers, num_layers=num_layers)
        self.classifier = nn.Linear(embed_dim, num_classes)

    def forward(self, input_ids):
        x = self.embedding(input_ids)
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        logits = self.classifier(x)
        return logits


class PositionalEncoding(nn.Module):
    """
    Implements positional encoding for Transformer models.
    """
    def __init__(self, embed_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-torch.log(torch.tensor(10000.0)) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)

        self.register_buffer("pe", pe)

    def forward(self, x):
        return x + self.pe[:, : x.size(1), :]


In [None]:
# Initialize the model
model = LLModel()
model.train_model(data, epochs=1000, print_freq=10)

Epoch 0 | Loss: 10.9861 | Val Loss: 10.9722
Epoch 10 | Loss: 10.8180 | Val Loss: 10.8259
Epoch 20 | Loss: 10.6460 | Val Loss: 10.6790
Epoch 30 | Loss: 10.4659 | Val Loss: 10.5267
Epoch 40 | Loss: 10.2818 | Val Loss: 10.3641
Epoch 50 | Loss: 10.0832 | Val Loss: 10.1953


In [None]:
model.predict(data[:1000])
