<a href="https://colab.research.google.com/github/ChVenkatSai/Scratch-GPT/blob/main/GPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torchtyping

In [2]:
import torch
import torch.nn as nn
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
from torchtyping import TensorType
from collections import OrderedDict
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import requests
from torch.utils.data import Subset
import random

In [3]:
class GPT(nn.Module):
    """Decoder-only transformer language model.

    Token embeddings and learned position embeddings are summed, passed
    through ``num_blocks`` pre-norm transformer blocks, layer-normalised and
    projected to per-position vocabulary logits.

    NOTE: every constructor in this class (including the nested modules)
    calls ``torch.manual_seed(0)`` so that parameter initialisation is
    reproducible.  As a consequence, constructing the model resets the global
    torch RNG, and all attention heads / blocks start from identical weights.
    The forward passes deliberately do NOT touch the RNG, so dropout masks and
    any sampling done by the caller stay random.
    """

    def __init__(self, vocab_size: int, context_length: int, model_dim: int, num_blocks: int, num_heads: int):
        """
        Args:
            vocab_size: number of distinct token ids.
            context_length: maximum sequence length (size of the position table).
            model_dim: width of the embeddings and of the residual stream.
            num_blocks: number of stacked transformer blocks.
            num_heads: attention heads per block; must divide ``model_dim``.
        """
        super().__init__()
        torch.manual_seed(0)
        self.context_length = context_length
        self.model_dim = model_dim
        self.word = nn.Embedding(vocab_size, model_dim)
        self.position = nn.Embedding(context_length, model_dim)
        # Block names (with the space) are kept as-is: they are part of the
        # state-dict keys of previously saved weights.
        blocks = OrderedDict()
        for i in range(num_blocks):
            blocks[f"block {i}"] = self.TransformerBlock(model_dim, num_heads)
        self.layer = nn.Sequential(blocks)
        self.norm = nn.LayerNorm(model_dim)
        self.final = nn.Linear(model_dim, vocab_size)

    def forward(self, context: torch.Tensor) -> torch.Tensor:
        """Return logits of shape (batch, seq_len, vocab_size).

        Args:
            context: integer token ids of shape (batch, seq_len) with
                ``seq_len <= context_length``.

        Raises:
            ValueError: if the sequence is longer than ``context_length``.
        """
        seq_len = context.shape[1]
        if seq_len > self.context_length:
            raise ValueError(
                f"sequence length {seq_len} exceeds context_length {self.context_length}"
            )
        word_embed = self.word(context)
        # Build positions on the same device as the input rather than relying
        # on a module-level `device` global.
        positions = torch.arange(seq_len, device=context.device)
        # (seq_len, model_dim) broadcasts over the batch dimension.
        embed = word_embed + self.position(positions)
        return self.final(self.norm(self.layer(embed)))

    class TransformerBlock(nn.Module):
        """Pre-norm block: x + Attention(LN(x)), then y + MLP(LN(y))."""

        def __init__(self, model_dim: int, num_heads: int):
            super().__init__()
            if num_heads <= 0 or model_dim % num_heads != 0:
                # The concatenated heads are added to the residual stream, so
                # their total width must equal model_dim.
                raise ValueError(
                    f"model_dim ({model_dim}) must be divisible by num_heads ({num_heads})"
                )
            torch.manual_seed(0)
            self.first = nn.LayerNorm(model_dim)
            self.attention = self.MultiHeadedSelfAttention(model_dim, model_dim, num_heads)
            self.second = nn.LayerNorm(model_dim)
            self.linear = self.VanillaNeuralNetwork(model_dim)

        def forward(self, embedded: torch.Tensor) -> torch.Tensor:
            """Apply attention and feed-forward sub-layers with residual connections."""
            attended = self.attention(self.first(embedded)) + embedded
            return self.linear(self.second(attended)) + attended

        class MultiHeadedSelfAttention(nn.Module):
            """Runs ``num_heads`` independent causal heads and concatenates their outputs."""

            def __init__(self, embedding_dim: int, attention_dim: int, num_heads: int):
                super().__init__()
                torch.manual_seed(0)
                self.head_size = attention_dim // num_heads
                self.multiattention = nn.ModuleList(
                    [self.SingleHeadAttention(embedding_dim, self.head_size) for _ in range(num_heads)]
                )

            def forward(self, embedded: torch.Tensor) -> torch.Tensor:
                """Concatenate every head's output along the feature dimension."""
                return torch.cat([head(embedded) for head in self.multiattention], dim=2)

            class SingleHeadAttention(nn.Module):
                """One head of causal scaled dot-product self-attention."""

                def __init__(self, embedding_dim: int, attention_dim: int):
                    super().__init__()
                    torch.manual_seed(0)
                    self.attention_dim = attention_dim
                    self.key = nn.Linear(embedding_dim, attention_dim, bias=False)
                    self.query = nn.Linear(embedding_dim, attention_dim, bias=False)
                    self.value = nn.Linear(embedding_dim, attention_dim, bias=False)

                def forward(self, embedded: torch.Tensor) -> torch.Tensor:
                    """Map (batch, seq_len, embedding_dim) to (batch, seq_len, attention_dim)."""
                    keys = self.key(embedded)
                    queries = self.query(embedded)
                    values = self.value(embedded)
                    scores = torch.matmul(queries, keys.transpose(1, 2)) / (self.attention_dim ** 0.5)
                    seq_len = embedded.shape[1]
                    # Strictly-upper-triangular (T, T) mask marks future
                    # positions; it is created directly on the input's device
                    # and broadcasts over the batch dimension.
                    future = torch.ones(seq_len, seq_len, device=embedded.device).triu(diagonal=1).bool()
                    scores = scores.masked_fill(future, float('-inf'))
                    weights = nn.functional.softmax(scores, dim=2)
                    return torch.matmul(weights, values)

        class VanillaNeuralNetwork(nn.Module):
            """Position-wise feed-forward network: Linear(4x) -> ReLU -> Linear -> Dropout."""

            def __init__(self, model_dim: int):
                super().__init__()
                torch.manual_seed(0)
                self.up_projection = nn.Linear(model_dim, model_dim * 4)
                self.relu = nn.ReLU()
                self.down_projection = nn.Linear(model_dim * 4, model_dim)
                self.dropout = nn.Dropout(0.2)  # using p = 0.2

            def forward(self, x: torch.Tensor) -> torch.Tensor:
                """Apply the feed-forward network; dropout is only active in train mode."""
                return self.dropout(self.down_projection(self.relu(self.up_projection(x))))

In [None]:


# Download a book from Project Gutenberg
url = "http://www.gutenberg.org/files/1342/1342-0.txt"  # Example: Pride and Prejudice
# A timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() stops us from silently training on an HTTP error page.
response = requests.get(url, timeout=30)
response.raise_for_status()
text = response.text

# Preprocess the text: lowercase, turn line breaks/tabs into spaces, then keep
# only alphabetic and whitespace characters (punctuation and digits are dropped).
text = text.lower()
text = text.replace('\r', ' ').replace('\n', ' ').replace('\t', ' ')
text = ''.join(c for c in text if c.isalpha() or c.isspace())

# Prepare the dataset
class TextDataset(Dataset):
    """Character-level dataset of sliding windows over a text.

    Item ``i`` is a pair ``(context, target)`` of ``context_length`` token ids
    each, where ``target`` is ``context`` shifted one character to the right.
    The vocabulary is the sorted set of characters occurring in ``text``.
    """

    def __init__(self, text, context_length):
        self.text = text
        self.context_length = context_length
        self.vocab = sorted(set(text))
        self.char_to_idx = {ch: idx for idx, ch in enumerate(self.vocab)}
        self.idx_to_char = {idx: ch for idx, ch in enumerate(self.vocab)}
        self.data = self.encode(text)

    def encode(self, text):
        """Map each character of ``text`` to its vocabulary index.

        Raises:
            KeyError: if ``text`` contains a character not in the vocabulary.
        """
        return [self.char_to_idx[ch] for ch in text]

    def __len__(self):
        # Clamp at zero: a text shorter than the context has no full window,
        # and len() must never be negative.
        return max(0, len(self.data) - self.context_length)

    def __getitem__(self, idx):
        """Return the ``(context, target)`` LongTensor pair for window ``idx``.

        Negative indices count from the end.

        Raises:
            IndexError: if ``idx`` is out of range.  Without this, slicing
                would quietly return short/empty windows and plain iteration
                over the dataset would never terminate.
        """
        size = len(self)
        if idx < 0:
            idx = idx + size
        if not 0 <= idx < size:
            raise IndexError(f"index out of range for dataset of size {size}")
        stop = idx + self.context_length
        context = self.data[idx:stop]
        target = self.data[idx + 1:stop + 1]
        return torch.tensor(context, dtype=torch.long), torch.tensor(target, dtype=torch.long)

# Hyperparameters
vocab_size = len(set(text))
context_length = 128
model_dim = 512
num_blocks = 6
num_heads = 8
batch_size = 64
num_epochs = 5
learning_rate = 0.001
max_train_samples = 100000  # upper bound on windows used per epoch (keeps a free T4 GPU run tractable)

# Instantiate the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GPT(vocab_size, context_length, model_dim, num_blocks, num_heads).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Load dataset
dataset = TextDataset(text, context_length)
# Train on a random subset of windows.  The sample size is clamped to the
# dataset size because random.sample raises ValueError when k exceeds the
# population.
subset_indices = random.sample(range(len(dataset)), k=min(max_train_samples, len(dataset)))
print("Unencoded text:", dataset.text[:context_length + 1])
print(dataset[0])
subset_dataset = Subset(dataset, subset_indices)
dataloader = DataLoader(subset_dataset, batch_size=batch_size, shuffle=True)

# Training loop
for epoch in range(num_epochs):
    model.train()
    for batch_idx, (context, target) in enumerate(dataloader):
        context, target = context.to(device), target.to(device)

        # Forward pass
        output = model(context)

        # CrossEntropyLoss expects (N, classes) logits and (N,) targets, so
        # flatten the batch and time dimensions together.
        output = output.view(-1, vocab_size)
        target = target.view(-1)

        # Compute loss
        loss = criterion(output, target)

        # Backward pass and optimization; gradients are clipped to a global
        # norm of 1.0 before the optimizer step.
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        if batch_idx % 500 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx}/{len(dataloader)}], Loss: {loss.item():.4f}')

print("Training complete!")


In [None]:
# Persist the trained parameters (state dict only, not the whole module).
WEIGHT_PATH = 'weights.pt'
torch.save(obj=model.state_dict(), f=WEIGHT_PATH)
print(f"Model weights saved to {WEIGHT_PATH}")

In [None]:
# Smoke test: one gradient-free forward pass on an all-zeros context,
# printing the raw logits.
context = torch.zeros((1, context_length), dtype=torch.long, device=device)  # Example initial context
with torch.no_grad():
    logits = model(context)
print(logits)

In [10]:
def generate(model, new_chars: int, context, context_length: int, int_to_char: dict) -> str:
    """Autoregressively sample ``new_chars`` characters from ``model``.

    Args:
        model: callable mapping token ids of shape (1, T) to logits of shape
            (1, T, vocab_size).
        new_chars: number of characters to sample.
        context: integer tensor of shape (1, T) holding the starting token ids.
            A batch size of 1 is required because each sampled id is read
            with ``.item()``.
        context_length: maximum number of trailing tokens fed to the model.
        int_to_char: mapping from token id to character.

    Returns:
        The sampled characters joined into a string (the initial context is
        not included).
    """
    res = []
    # Sampling should run in inference mode: dropout disabled and no autograd
    # graph recorded.  The model's previous train/eval mode is restored
    # afterwards so that calling generate() mid-training has no lasting effect.
    is_module = isinstance(model, nn.Module)
    was_training = model.training if is_module else False
    if is_module:
        model.eval()
    try:
        with torch.no_grad():
            for _ in range(new_chars):
                # Keep only the most recent context_length tokens.
                if context.shape[1] > context_length:
                    context = context[:, -context_length:]
                prediction = model(context)  # B, T, Vocab_Size
                last_time_step = prediction[:, -1, :]  # B, Vocab_Size
                probabilities = nn.functional.softmax(last_time_step, dim=-1)
                next_char = torch.multinomial(probabilities, 1)
                context = torch.cat((context, next_char), dim=-1)
                res.append(int_to_char[next_char.item()])
    finally:
        if is_module:
            model.train(was_training)
    return ''.join(res)

In [None]:
new_chars = 1000  # Number of characters to generate
# Initial context: every position holds token id 1.  Its width is taken from
# context_length (rather than a hard-coded 128) so it always matches the
# sequence length the model was built for.
context = torch.ones(1, context_length, dtype=torch.int64).to(device)
generated_text = generate(model, new_chars, context, context_length, dataset.idx_to_char)

# Print or use the generated text
print(generated_text)

In [None]:
# Meaningful context

initial_context = "it is a truth universally acknowledged that a single man in possession of a good fortune must be in want of a wife"

# Fit the prompt to exactly context_length characters.  Keep the LAST
# context_length characters and pad on the LEFT, so the prompt's final
# characters sit directly before the first generated character; padding on
# the right would make the model continue from a run of trailing spaces.
initial_context = initial_context[-context_length:].rjust(context_length)

# Encode the initial context (raises KeyError for characters outside the
# training vocabulary).
encoded_context = [dataset.char_to_idx[ch] for ch in initial_context]
context_tensor = torch.tensor(encoded_context, dtype=torch.long).unsqueeze(0).to(device)  # Add batch dimension

# Print the initial context and its tensor
print("Initial context (string):", initial_context)
print("Initial context (tensor):", context_tensor)

generated_text = generate(model, new_chars=500, context=context_tensor, context_length=context_length, int_to_char=dataset.idx_to_char)

print("Generated text:", generated_text)
