In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import tiktoken
from torch.utils.data import Dataset, DataLoader

  cpu = _conversion_method_template(device=torch.device("cpu"))


In [2]:
file_path = "data/frankenstein.txt"

In [3]:
class TextDataset(Dataset):
    def __init__(self, text:str, tokenizer, max_length:int, stride:int) -> None:
        self.input_ids = []
        self.target_ids = []

        # Encode the text
        token_ids = tokenizer.encode(text, allowed_special={"<|endoftext|>"})

        # Use a sliding window to chunk the book into overlapping sequences of max_length
        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: i + max_length + 1]
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self) -> int:
        return len(self.input_ids)

    def __getitem__(self, idx:int) ->tuple[torch.Tensor, torch.Tensor]:
        return self.input_ids[idx], self.target_ids[idx]

In [4]:
# Initialize the tokenizer
tokenizer = tiktoken.get_encoding("gpt2")  # You can choose other encodings if needed

# Example text (replace this with your own data)
with open(file_path, "r", encoding="utf-8") as f:
    text = f.read()

# Hyperparameters
max_length = 128
stride = 64
batch_size = 32

# Create the dataset and dataloader
dataset = TextDataset(text, tokenizer, max_length, stride)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [5]:
class MiniGPT(nn.Module):
    def __init__(self, vocab_size, context_length, embedding_dim=256, num_heads=8, num_layers=4, ff_dim=512, dropout=0.1):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(context_length, embedding_dim)
        
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=embedding_dim,
                nhead=num_heads,
                dim_feedforward=ff_dim,
                dropout=dropout,
                activation='gelu'
            )
            for _ in range(num_layers)
        ])
        
        self.ln_f = nn.LayerNorm(embedding_dim)
        self.head = nn.Linear(embedding_dim, vocab_size, bias=False)
        
    def forward(self, input_ids):
        batch_size, seq_length = input_ids.size()
        positions = torch.arange(0, seq_length, dtype=torch.long, device=input_ids.device).unsqueeze(0).expand(batch_size, seq_length)
        
        x = self.token_embedding(input_ids) + self.position_embedding(positions)
        
        for layer in self.layers:
            x = layer(x)
        
        x = self.ln_f(x)
        logits = self.head(x)
        return logits

    def generate(self, input_ids, max_new_tokens=50, temperature=1.0, top_k=50):
        self.eval()
        with torch.no_grad():
            for _ in range(max_new_tokens):
                logits = self.forward(input_ids)
                logits = logits[:, -1, :] / temperature
                if top_k is not None:
                    values, indices = torch.topk(logits, top_k)
                    logits = torch.full_like(logits, -float('Inf')).scatter_(1, indices, values)
                probs = F.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)
                input_ids = torch.cat([input_ids, next_token], dim=1)
        return input_ids


In [6]:
# Determine vocabulary size
vocab_size = tokenizer.n_vocab

# Initialize the model
model = MiniGPT(vocab_size=vocab_size, context_length=max_length)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)


In [9]:
epochs = 50

model.train()
for epoch in range(epochs):
    total_loss = 0
    for batch_idx, (input_ids, target_ids) in enumerate(dataloader):
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        
        optimizer.zero_grad()
        logits = model(input_ids)
        
        # Reshape logits and targets for loss computation
        logits = logits.view(-1, vocab_size)
        target_ids = target_ids.view(-1)
        
        loss = criterion(logits, target_ids)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        if (batch_idx + 1) % 100 == 0:
            avg_loss = total_loss / 100
            print(f"Epoch [{epoch+1}/{epochs}], Step [{batch_idx+1}/{len(dataloader)}], Loss: {avg_loss:.4f}")
            total_loss = 0
    print(f"Epoch [{epoch+1}/{epochs}] completed. Total Loss: {total_loss}")

Epoch [1/50] completed. Total Loss: 199.14232444763184
Epoch [2/50] completed. Total Loss: 197.29787826538086
Epoch [3/50] completed. Total Loss: 195.35980677604675
Epoch [4/50] completed. Total Loss: 193.6152548789978
Epoch [5/50] completed. Total Loss: 191.7218861579895
Epoch [6/50] completed. Total Loss: 190.0475845336914
Epoch [7/50] completed. Total Loss: 188.25259232521057
Epoch [8/50] completed. Total Loss: 186.5202317237854
Epoch [9/50] completed. Total Loss: 184.94557189941406
Epoch [10/50] completed. Total Loss: 183.25842952728271
Epoch [11/50] completed. Total Loss: 181.79093766212463
Epoch [12/50] completed. Total Loss: 180.13189482688904
Epoch [13/50] completed. Total Loss: 178.5540850162506
Epoch [14/50] completed. Total Loss: 177.0247905254364
Epoch [15/50] completed. Total Loss: 175.46701097488403
Epoch [16/50] completed. Total Loss: 174.062358379364
Epoch [17/50] completed. Total Loss: 172.68023681640625
Epoch [18/50] completed. Total Loss: 171.1564450263977
Epoch [19/

KeyboardInterrupt: 

In [10]:
# Example prompt
prompt = "Once upon a time"

# Encode the prompt
input_ids = torch.tensor([tokenizer.encode(prompt)], dtype=torch.long).to(device)

# Generate text
generated_ids = model.generate(input_ids, max_new_tokens=50, temperature=0.8, top_k=50)

# Decode the generated ids
generated_text = tokenizer.decode(generated_ids[0].tolist())

print("Generated Text:")
print(generated_text)

Generated Text:
Once upon a time, and
the not banished of future destiny, I should
The father.

horror,


No, or if to the Arabian were
the woe, and sometimes you, and

”
com scene



In [13]:
def count_parameters(model) -> int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [14]:
count_parameters(model)

27873280