In [1]:
import os
os.chdir("..")

In [26]:
import requests
import numpy as np
import time

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torch.optim as optim

In [3]:
# Get the data
shakespear_url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
shakespear = requests.get(shakespear_url).text
shakespear = shakespear.lower()

In [4]:
context_size = 8
vocab_size = len(set(shakespear))

In [5]:
# Char level tokenization
class CharTokenizer:
    def __init__(self, text):
        chars = sorted(list(set(text)))
        self.str_to_int = { ch:i for i,ch in enumerate(chars) }
        self.int_to_str = { i:ch for i,ch in enumerate(chars) }

    def encode(self, text):            
        ids = [self.str_to_int[char] for char in text]
        return ids
    
    def decode(self, ids):
        text = [self.int_to_str[num] for num in ids]
        text = ''.join(text)
        return text

In [17]:
class PicoGPTDataset(Dataset):
    def __init__(self, token_ids, context_size, stride):
        self.tokenizer = tokenizer
        self.input_ids = []
        self.target_ids = []
 
        for i in range(0, len(token_ids) - context_size, stride):
            input_chunk = token_ids[i:i + context_size]
            target_chunk = token_ids[i + 1: i + context_size + 1]
            # requires_grad_(True) tells all of the input tensors 
            # should be used to calculate the gradients
            # it is set to False here by default
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))
 
    def __len__(self):
        return len(self.input_ids)
 
    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]

In [21]:
def create_dataloader(text, batch_size=4, test_size=0.1, context_size=context_size, stride=1, shuffle=True, drop_last=True):
    tokenizer = CharTokenizer(text)
    token_ids = tokenizer.encode(text)

    test = int(test_size*len(token_ids))
    test_tokens = token_ids[test:]
    train_tokens = token_ids[:test]
    dataset_train = PicoGPTDataset(train_tokens, context_size, stride)
    dataset_test = PicoGPTDataset(test_tokens, context_size, stride)
    
    dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
    return dataloader_train, dataloader_test

In [22]:
train_data, test_data = create_dataloader(shakespear)

In [8]:
data_iter = iter(dataloader)
inputs, targets = next(data_iter)
print("Inputs shape:", inputs.shape)

Inputs shape: torch.Size([4, 8])


In [33]:
class CharRNN(nn.Module):
    def __init__(self, hidden_size, output_size, vocab_size, embed_size):
        super(CharRNN, self).__init__()

        self.embeddings = nn.Embedding(vocab_size, embed_size)
        self.gru = nn.GRU(
            input_size=embed_size,
            hidden_size=hidden_size,
            batch_first=True,
        )
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        embeds = self.embeddings(inputs)
        output, hidden = self.gru(embeds)
        output = self.linear(output)
        return output

In [34]:
model = CharRNN(hidden_size=128, output_size=8, vocab_size=vocab_size, embed_size=64)

In [35]:
params = list(model.parameters())
len(params)

7

In [23]:
import torch
import torch.nn as nn

# Define hyperparameters
vocab_size = 39  # Set accordingly
embedding_dim = 16
hidden_size = 128

# Initialize model, loss, and optimizer
model = ShakespeareModel(vocab_size, embedding_dim, hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(model.parameters())

# Training loop
num_epochs = 1
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

best_val_acc = 0.0
checkpoint_path = "my_shakespeare_model.pth"
i=0
for epoch in range(num_epochs):
    model.train()
    for batch in dataloader:
        inputs, targets = batch
        inputs, targets = inputs.to(device), targets.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
        loss.backward()
        optimizer.step()
        if i==10:
            break
        i+=1
    
    # Validation loop
    # model.eval()
    # correct, total = 0, 0
    # with torch.no_grad():
    #     for batch in valid_set:  # Assume valid_set is a DataLoader
    #         inputs, targets = batch
    #         inputs, targets = inputs.to(device), targets.to(device)
            
    #         outputs = model(inputs)
    #         predicted = torch.argmax(outputs, dim=-1)
    #         correct += (predicted == targets).sum().item()
    #         total += targets.numel()
    
    # val_acc = correct / total
    # print(f"Epoch {epoch+1}, Validation Accuracy: {val_acc:.4f}")

    # # Save the best model
    # if val_acc > best_val_acc:
    #     best_val_acc = val_acc
    #     torch.save(model.state_dict(), checkpoint_path)


In [26]:
outputs.shape

torch.Size([4, 8, 39])

In [None]:
start = time.time()
all_losses = train(model, dataloader, n_epoch=27, learning_rate=0.15, report_every=5)
end = time.time()
print(f"training took {end-start}s")

In [33]:
data_iter = iter(dataloader)
inputs, targets = next(data_iter)

In [34]:
inputs

tensor([[18, 27, 30, 32, 20,  1, 32, 20],
        [24, 16, 21, 31, 20,  1, 18, 30],
        [ 0, 27, 18, 32, 17, 26, 17, 30],
        [31,  8,  0, 15, 27, 33, 31, 21]])

In [40]:
targets

torch.Size([4, 8])

In [50]:
def simple_text_generate(model, idx, max_new_tokens, context_size):
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
           
        logits = logits[:, -1, :]
        probas = torch.softmax(logits, dim=-1)
        idx_next = torch.argmax(probas, dim=-1, keepdim=True)
        idx = torch.cat((idx, idx_next), dim=1)
        
    return idx

In [110]:
def generate_and_print_sample(model, tokenizer, device, start_context):
    model.eval()
    context_size = model.embeddings.weight.shape[0]
    encoded = encoded = torch.tensor(tokenizer.encode(start_context)).unsqueeze(0)
    with torch.no_grad():
        token_ids = simple_text_generate(
            model=model, idx=encoded,
            max_new_tokens=7, context_size=context_size
        )
        decoded_text = tokenizer.decode(token_ids.squeeze(0).tolist())
        print(decoded_text.replace("\n", " "))  # Compact print format
    model.train()

In [111]:
generate_and_print_sample(model, tokenizer, 'cpu', 'thy')

thy ,,' --


In [114]:
from utils import *

In [116]:
calc_loss_batch(inputs, targets, model, 'cpu')

IndexError: Target 27 is out of bounds.

In [65]:
def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0) # add batch dimension
    return encoded_tensor
    

In [81]:
import tiktoken
tok = tiktoken.get_encoding('gpt2')
text_to_token_ids('thy world nonsense', tok)

tensor([[20057,   995, 18149]])