<a href="https://colab.research.google.com/github/Rajadhurairajendhiran123/gpt_scratch/blob/main/gpt_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install transformers torch numpy


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
from tqdm import tqdm

# Read the whole training corpus into memory as one string
with open('/content/llm trine.txt', encoding='utf-8') as f:
    text = f.read()

# Character-level vocabulary: every distinct character, in sorted order,
# is assigned the integer id equal to its rank.
chars = sorted(set(text))
itos = dict(enumerate(chars))                 # id -> character
stoi = {ch: i for i, ch in itos.items()}      # character -> id
vocab_size = len(itos)

# Function to encode text into integer format
def encode(text):
    """Translate a string into a list of integer ids, one per character.

    Ids are looked up in the module-level ``stoi`` table; a character that is
    not a key of ``stoi`` raises ``KeyError``.
    """
    ids = []
    for ch in text:
        ids.append(stoi[ch])
    return ids

# Function to decode indices back into text
def decode(indices):
    """Translate an iterable of integer ids back into a string.

    Each id is looked up in the module-level ``itos`` table and the resulting
    characters are concatenated in order; an unknown id raises ``KeyError``.
    """
    return ''.join(itos[i] for i in indices)

# Encode the entire corpus once into a 1-D tensor of character ids (dtype long);
# the dataset below slices its samples out of this tensor.
data = torch.tensor(encode(text), dtype=torch.long)

# Define the dataset class
class TextDataset(Dataset):
    """Sliding-window next-token dataset over an already-encoded sequence.

    Sample ``i`` is the pair ``(x, y)`` where ``x`` holds the ``block_size``
    ids starting at position ``i`` and ``y`` holds the same window shifted one
    position to the right, so ``y[t]`` is the token that follows ``x[t]``.

    Args:
        text: 1-D sequence of integer ids (a tensor or a list of ints).
        block_size: number of input tokens per sample.
    """

    def __init__(self, text, block_size):
        self.text = text
        self.block_size = block_size
        # Every sample consumes block_size + 1 ids (inputs plus the final
        # target). Clamp at zero so a sequence that is too short yields an
        # empty dataset instead of a negative length, which len() rejects.
        self.num_samples = max(0, len(text) - block_size)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        """Return the ``(input, target)`` tensors for sample ``idx``.

        Negative indices count from the end. Raises ``IndexError`` when
        ``idx`` is out of range, which also lets plain ``for`` iteration over
        the dataset terminate.
        """
        if idx < 0:
            idx += self.num_samples
        if not 0 <= idx < self.num_samples:
            raise IndexError("TextDataset index out of range")

        # as_tensor accepts both tensors and lists and, unlike torch.tensor(),
        # does not warn when handed an existing tensor.
        chunk = torch.as_tensor(self.text[idx: idx + self.block_size + 1], dtype=torch.long)
        # Clone so the returned tensors never alias the underlying corpus.
        x = chunk[:-1].clone()  # input sequence
        y = chunk[1:].clone()   # target sequence (inputs shifted by one)
        return x, y

# Build the training dataset and its loader.
# NOTE: no validation split is created here; the whole encoded corpus goes into train_data.
block_size = 128  # number of input tokens per sample (context length); adjust as needed
train_data = TextDataset(data, block_size)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

# Define the Self Attention Head
class SelfAttentionHead(nn.Module):
    """A single head of causal (masked) scaled dot-product self-attention.

    Args:
        embed_dim: size of the last dimension of the input.
        head_size: size of the key/query/value projections and of the output.
        block_size: longest sequence length the causal mask supports.
        dropout: dropout probability applied to the attention weights.
        device: device on which the causal mask is initially created.
    """

    def __init__(self, embed_dim, head_size, block_size, dropout, device):
        super().__init__()
        self.key = nn.Linear(embed_dim, head_size, bias=False)
        self.query = nn.Linear(embed_dim, head_size, bias=False)
        self.value = nn.Linear(embed_dim, head_size, bias=False)
        # Registered as a buffer so the mask follows the module through
        # .to()/.cuda()/.cpu(); non-persistent so state_dict keys are unchanged.
        self.register_buffer(
            'tril',
            torch.tril(torch.ones(block_size, block_size, device=device)),
            persistent=False,
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` of shape (B, T, embed_dim); returns (B, T, head_size).

        Position ``t`` only attends to positions ``<= t``. Raises
        ``ValueError`` if ``T`` exceeds the mask size given at construction.
        """
        B, T, C = x.shape
        if T > self.tril.size(0):
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.tril.size(0)}"
            )
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        # Scale by the key dimension (head_size), not the model width: the dot
        # product sums over head_size terms, so that is what sets its variance.
        scale = k.size(-1) ** -0.5
        wei = (q @ k.transpose(-2, -1)) * scale
        # Block attention to future positions before normalising.
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)

        out = wei @ v
        return out

# Define Multihead Attention
class MultiHeadAttention(nn.Module):
    """Several ``SelfAttentionHead`` modules run side by side, concatenated and projected.

    Args:
        embed_dim: input and output feature size.
        num_heads: number of attention heads; each gets
            ``embed_dim // num_heads`` features.
        block_size: longest supported sequence length (passed to each head).
        dropout: dropout probability, used inside each head and after the
            output projection.
        device: device passed to each head for its causal mask.

    Raises:
        ValueError: if ``num_heads`` is larger than ``embed_dim`` (each head
            would have zero features).
    """

    def __init__(self, embed_dim, num_heads, block_size, dropout, device):
        super().__init__()
        head_size = embed_dim // num_heads
        if head_size < 1:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be at least num_heads ({num_heads})"
            )
        self.heads = nn.ModuleList(
            [SelfAttentionHead(embed_dim, head_size, block_size, dropout, device) for _ in range(num_heads)]
        )
        # The concatenated heads are num_heads * head_size wide, which is less
        # than embed_dim when embed_dim is not divisible by num_heads. Size the
        # projection on the real width so that case works instead of failing
        # with a shape mismatch in forward(). For divisible sizes this is
        # exactly Linear(embed_dim, embed_dim).
        self.proj = nn.Linear(num_heads * head_size, embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply every head to ``x``, concatenate on the last dim, project, then dropout."""
        out = torch.cat([head(x) for head in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

# Define Feed Forward Network
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension of its input.

    The input is expanded to four times ``embed_dim``, passed through ReLU,
    projected back to ``embed_dim`` and finally through dropout.

    Args:
        embed_dim: input and output feature size.
        dropout: dropout probability of the final layer.
    """

    def __init__(self, embed_dim, dropout):
        super().__init__()
        hidden_dim = 4 * embed_dim
        layers = [
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the MLP; the output has the same shape as ``x``."""
        out = self.net(x)
        return out

# Define Transformer Block
class TransformerBlock(nn.Module):
    """One transformer layer: multi-head self-attention, then a feed-forward network.

    For each of the two sub-layers, the sub-layer output goes through dropout,
    is added to the sub-layer input, and the sum is layer-normalised (the
    normalisation comes after the residual add).

    Args:
        embed_dim: feature size of the input and output.
        num_heads: number of attention heads.
        block_size: longest supported sequence length.
        dropout: dropout probability used by this block and its sub-layers.
        device: device handed to the attention sub-layer.
    """

    def __init__(self, embed_dim, num_heads, block_size, dropout=0.1, device='cuda'):
        super().__init__()

        # Keep the constructor arguments available as plain attributes.
        self.device, self.embed_dim = device, embed_dim
        self.num_heads, self.block_size = num_heads, block_size
        self.dropout = dropout  # the rate itself (a number); the module is dropout_layer

        # Sub-layers
        self.attn = MultiHeadAttention(embed_dim, num_heads, block_size, dropout, device)
        self.ffn = FeedForward(embed_dim, dropout)

        # Shared dropout for both residual branches, and one LayerNorm per branch
        self.dropout_layer = nn.Dropout(dropout)
        self.ln_1 = nn.LayerNorm(embed_dim)
        self.ln_2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """Apply attention and feed-forward sub-layers, each with residual add + LayerNorm."""
        # Attention branch
        attended = self.dropout_layer(self.attn(x))
        hidden = self.ln_1(x + attended)

        # Feed-forward branch
        transformed = self.dropout_layer(self.ffn(hidden))
        return self.ln_2(hidden + transformed)

# Define the GPT Language Model
class GPTLanguageModel(nn.Module):
    """Decoder-only transformer language model over integer token ids.

    Token and learned position embeddings are summed, passed through a stack
    of ``TransformerBlock`` layers, layer-normalised and projected to
    vocabulary logits.

    Args:
        vocab_size: number of distinct token ids.
        embed_dim: width of the embeddings and of every block.
        num_heads: attention heads per block.
        num_layers: number of transformer blocks.
        block_size: longest sequence the model accepts (size of the position
            embedding table).
        dropout: dropout probability passed to every block.
        device: device the model is placed on at construction.
    """

    def __init__(self, vocab_size, embed_dim=256, num_heads=8, num_layers=4, block_size=128, dropout=0.1, device='cuda'):
        super().__init__()

        self.device = device
        self.block_size = block_size

        # Token and position embeddings
        self.token_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Embedding(block_size, embed_dim)

        # Transformer blocks
        self.blocks = nn.ModuleList(
            [TransformerBlock(embed_dim, num_heads, block_size, dropout, device) for _ in range(num_layers)]
        )

        # Final layer normalization and output projection
        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

        # Place the whole model on the requested device in one step.
        self.to(device)

    def forward(self, x):
        """Compute next-token logits.

        Args:
            x: integer tensor of token ids with shape (batch_size, seq_len).

        Returns:
            Tensor of shape (batch_size, seq_len, vocab_size).

        Raises:
            ValueError: if ``seq_len`` exceeds the position embedding table.
        """
        _, seq_len = x.size()
        max_len = self.pos_embed.num_embeddings
        if seq_len > max_len:
            # Fail with a clear message instead of an out-of-range embedding
            # lookup (which surfaces as an opaque device-side assert on CUDA).
            raise ValueError(
                f"sequence length {seq_len} exceeds block_size {max_len}"
            )

        # Build position ids on the input's device so the model keeps working
        # after being moved with .to(), regardless of the stored self.device.
        positions = torch.arange(seq_len, device=x.device)

        # (B, T, C) + (T, C): the position embeddings broadcast over the batch.
        x = self.token_embed(x) + self.pos_embed(positions)

        # Pass through transformer blocks
        for block in self.blocks:
            x = block(x)

        # Final layer normalization and projection to vocabulary logits
        x = self.ln_f(x)
        logits = self.head(x)  # [batch_size, seq_len, vocab_size]

        return logits

# Training Function
def train_model(model, train_loader, epochs=10, lr=1e-3, device='cuda'):
    """Train ``model`` with Adam and cross-entropy on next-token prediction.

    Args:
        model: module mapping a (batch, seq) id tensor to
            (batch, seq, vocab) logits.
        train_loader: iterable of ``(x, y)`` batches; it is iterated once per
            epoch and does not need to support ``len()``.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: device the model and every batch are moved to.

    Progress is printed every 100 steps and at the end of each epoch. An epoch
    that yields no batches is reported and skipped instead of dividing by zero.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        num_batches = 0  # counted by hand so loaders without __len__ work
        for i, (x, y) in enumerate(train_loader):
            x, y = x.to(device), y.to(device)

            optimizer.zero_grad()

            logits = model(x)

            # Flatten (batch, seq) into one axis; reshape (unlike view) also
            # accepts non-contiguous tensors.
            loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), y.reshape(-1))

            loss.backward()
            optimizer.step()

            # Read the scalar once; each .item() call forces a device sync.
            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1

            if i % 100 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Step {i}, Loss: {batch_loss:.4f}")

        if num_batches == 0:
            print(f"Epoch {epoch+1}/{epochs}, no batches in train_loader; nothing to train on")
            continue

        print(f"Epoch {epoch+1}/{epochs}, Average Loss: {total_loss/num_batches:.4f}")

# Run configuration: prefer the GPU when one is visible, otherwise stay on the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
vocab_size = len(chars)  # one id per distinct character in the corpus
block_size = 128  # longest sequence the model accepts
seq_len = 128  # training sequence length (kept equal to block_size)

# Build the model
model = GPTLanguageModel(
    vocab_size=vocab_size,
    embed_dim=256,
    num_heads=8,
    num_layers=4,
    block_size=block_size,
    device=device,
)

# Run training
train_model(model, train_loader, epochs=5, lr=1e-3, device=device)


In [None]:
import torch
import torch.nn.functional as F

def sample(model, start_text, max_length=200, temperature=1.0, device='cuda', top_k=50):
    """
    Sample text from the model given a starting text.

    Parameters:
    - model: The trained language model.
    - start_text: The initial prompt text (must be non-empty).
    - max_length: The maximum number of tokens to generate.
    - temperature: Controls randomness in prediction (higher means more randomness); must be > 0.
    - device: Device to run the model on (e.g., 'cuda' or 'cpu').
    - top_k: The number of highest probability tokens to consider during sampling (for top-k sampling).

    Returns:
    - generated_text: The prompt followed by the generated characters, as a string.
      Generation stops early right after the first newline the model emits.

    Raises:
    - ValueError: if start_text is empty or temperature is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    prompt_ids = encode(start_text)
    if not prompt_ids:
        raise ValueError("start_text must contain at least one character")

    model.eval()  # Set the model to evaluation mode (disables dropout)
    generated = torch.tensor(prompt_ids, dtype=torch.long).unsqueeze(0).to(device)

    # The model can only embed as many positions as its position table holds,
    # so feed it at most that many trailing tokens. If the model exposes no
    # such table, pass the full sequence.
    pos_embed = getattr(model, 'pos_embed', None)
    max_context = getattr(pos_embed, 'num_embeddings', None)

    # Id of the stop character; None (never matches) if the vocabulary has no newline.
    stop_id = stoi.get('\n')

    # Inference only: skip autograd bookkeeping so memory does not grow per step.
    with torch.no_grad():
        for _ in range(max_length):
            context = generated if max_context is None else generated[:, -max_context:]
            logits = model(context)  # Get the model's output logits
            logits = logits[:, -1, :] / temperature  # Last position only, scaled by temperature
            logits = top_k_sampling(logits, top_k)  # Restrict to the top-k candidates
            probs = F.softmax(logits, dim=-1)  # Convert logits to probabilities
            next_token = torch.multinomial(probs, 1)  # Sample the next token

            generated = torch.cat((generated, next_token), dim=1)  # Append to the running sequence

            # Stop if the end token (newline) is generated
            if next_token.item() == stop_id:
                break

    generated_text = decode(generated[0].tolist())  # Decode the generated sequence back to text
    return generated_text

def top_k_sampling(logits, top_k):
    """
    Keep only the top-k logits along the last dimension; mask out the rest.

    Args:
    - logits: The model's output logits (floating point, vocabulary on the last dimension).
    - top_k: The number of highest logits to keep. Values larger than the
      vocabulary are clamped; None or a non-positive value disables filtering.

    Returns:
    - logits: A new tensor in which the top-k entries keep their value and every
      other entry is -inf, so softmax assigns it exactly zero probability.
      The input tensor is not modified.
    """
    if top_k is None or top_k <= 0:
        return logits

    # Ensure top_k is not greater than the number of logits
    top_k = min(top_k, logits.size(-1))

    values, indices = torch.topk(logits, top_k)
    # Fill with -inf rather than 0: a logit of 0 still gets non-zero probability
    # after softmax and can even outrank kept logits that are negative.
    filtered = torch.full_like(logits, float('-inf'))
    filtered.scatter_(-1, indices, values)
    return filtered

# Example usage: continue a prompt with the trained model.
# sample() stops right after the first newline it generates, so the printed
# text can be as short as the prompt followed by a single newline.
start_text = "Idomeneus, why dost thou prate endlessly?"
generated_text = sample(model, start_text, max_length=200, temperature=0.8, device=device, top_k=50)
print(generated_text)


Idomeneus, why dost thou prate endlessly?



In [None]:
def sample_continuous(model, start_text, max_length=200, temperature=0.8, device='cuda', top_k=50, stop_token='\n'):
    """
    Generate text continuously by appending the generated tokens to the input text.

    Args:
    - model: The trained language model.
    - start_text (str): The initial text to start the generation (must be non-empty).
    - max_length (int): Maximum number of tokens to generate.
    - temperature (float): Controls randomness in the output. Lower values make the output more deterministic. Must be > 0.
    - device (str): The device on which the model is loaded ('cuda' or 'cpu').
    - top_k (int): Sample only from the top_k highest logits; None or a non-positive value disables the filter.
    - stop_token (str): Token that indicates the end of generation.

    Returns:
    - generated_text (str): start_text followed by the generated characters
      (including the stop token, if one was produced).

    Raises:
    - ValueError: if start_text is empty or temperature is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    prompt_ids = encode(start_text)
    if not prompt_ids:
        raise ValueError("start_text must contain at least one character")

    model.eval()
    generated = torch.tensor(prompt_ids, dtype=torch.long).unsqueeze(0).to(device)

    # Feed the model at most as many trailing tokens as its position table
    # holds; the text passed in here may already be longer than that when the
    # function is called repeatedly on its own output.
    pos_embed = getattr(model, 'pos_embed', None)
    max_context = getattr(pos_embed, 'num_embeddings', None)

    pieces = [start_text]

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for _ in range(max_length):
            context = generated if max_context is None else generated[:, -max_context:]

            # Logits for the next token, scaled by temperature
            logits = model(context)
            logits = logits[:, -1, :] / temperature

            # Top-k filter: everything below the k-th best logit is set to -inf
            # so it gets zero probability (ties with the k-th best are kept).
            if top_k is not None and top_k > 0:
                k = min(top_k, logits.size(-1))
                kth_best = torch.topk(logits, k).values[:, -1:]
                logits = logits.masked_fill(logits < kth_best, float('-inf'))

            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, 1)

            # Append the predicted token to the generated sequence
            generated = torch.cat((generated, next_token), dim=1)

            # Decode the token and append it to the generated text
            next_token_text = decode(next_token[0].tolist())
            pieces.append(next_token_text)

            # Stop if the stop token is encountered
            if next_token_text == stop_token:
                break

    return ''.join(pieces)

# Example usage in a loop:
# each round passes the full text accumulated so far back in as the prompt, and
# sample_continuous returns after at most max_length new tokens or the first
# stop token (newline by default).
start_text = "Idomeneus, why dost thou prate endlessly?"
generated_text = start_text
for _ in range(5):  # Generate text in 5 steps or more as needed
    generated_text = sample_continuous(model, generated_text, max_length=200, temperature=0.8, device=device, top_k=50)
    print(generated_text)
    print("------------------------------------------------------------------------------------------------------")
