In [None]:
import os
import urllib.request

# Remote location of the training corpus and the local path every later cell reads.
url = "https://raw.githubusercontent.com/karpathy/ng-video-lecture/refs/heads/master/input.txt"
filename = "input.txt"

# Download only when the file is missing, so "Restart & Run All" works on a
# fresh checkout without re-fetching the corpus on every run.
if not os.path.exists(filename):
    urllib.request.urlretrieve(url, filename)
    print(f"{filename} has been downloaded.")

In [None]:
# Read the entire corpus into memory as one UTF-8 decoded string.
filename = 'input.txt'
with open(filename, mode='r', encoding='utf-8') as f:
    text = f.read()

In [None]:
# Corpus size measured in characters (len of the decoded string).
print(f"length of dataset in characters: {len(text)}")

In [None]:
# Preview the first 1000 characters of the corpus.
print(text[:1000])

In [None]:
# The vocabulary is every distinct character in the corpus, in sorted order.
chars = sorted(set(text))
vocab_size = len(chars)
print(''.join(chars))
print(f"vocab size: {vocab_size}")

In [None]:
# Lookup tables for the character-level tokenizer: a character's token ID is
# its position in the sorted `chars` list.
index_to_char = dict(enumerate(chars))
char_to_index = {char: idx for idx, char in index_to_char.items()}

def tokenize(text):
    """Encode a string as a list of integer token IDs, one per character.

    Raises KeyError if a character is missing from char_to_index.
    """
    token_ids = []
    for char in text:
        token_ids.append(char_to_index[char])
    return token_ids

def detokenize(token_ids):
    """Decode an iterable of integer token IDs back into a string."""
    characters = [index_to_char[token_id] for token_id in token_ids]
    return ''.join(characters)
# Round-trip check: encoding and then decoding should print the original string.
tokenized_text = tokenize("hello world")
print(tokenized_text)
print(detokenize(tokenized_text))

In [None]:
import torch
# Encode the whole corpus once into a 1-D tensor of int64 token IDs.
data = torch.tensor(tokenize(text), dtype=torch.long)
print(data.shape, data.dtype)
print(data[:1000])  # print the first 1000 token IDs

In [None]:
# Sequential split: the first 90% of tokens train the model, the last 10%
# are held out for validation.
n = int(0.9 * len(data))
train_data, val_data = data[:n], data[n:]

In [None]:
block_size = 8  # context length
# A chunk of block_size + 1 tokens: block_size inputs plus the target of the last one.
train_data[:block_size+1]

In [None]:
# One chunk of block_size + 1 tokens yields block_size training examples:
# every prefix of x is a context and the token right after it is the target.
x = train_data[:block_size]
y = train_data[1:block_size+1]
for t in range(block_size):
    context, target = x[:t+1], y[t]
    print(f"when input is {context.tolist()} the target: {target.item()}")

In [None]:
# Walk the same (context, target) pairs, this time also decoded back to text.
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    context_ids = context.tolist()
    target_id = target.item()

    context_str = detokenize(context_ids)
    target_char = detokenize([target_id])

    # The IDs are wrapped in str() so the alignment spec pads plain text
    print(f"Context IDs: {str(context_ids):<30} | Text: '{context_str}'")
    print(f"Target ID:   {str(target_id):<10} | Next: '{target_char}'")
    print("-" * 60)

In [None]:
torch.manual_seed(2971)  # fixed seed so the random batch offsets are repeatable
batch_size = 4  # Number of independent sequences to process in parallel
block_size = 8  # Maximum length of the sequence (context) for predictions

def get_batch(split_type):
    """
    Sample a random batch of (input, target) sequences from one data split.

    split_type: 'train' selects train_data; any other value selects val_data.
    Returns two tensors of shape (batch_size, block_size); each target row is
    the matching input row shifted one position forward in the token stream.
    """
    dataset = val_data if split_type != 'train' else train_data

    # randint's upper bound is exclusive, so the largest start offset is
    # len(dataset) - block_size - 1 and the shifted target window still fits.
    random_offsets = torch.randint(len(dataset) - block_size, (batch_size,))

    input_rows, target_rows = [], []
    for start in random_offsets:
        input_rows.append(dataset[start : start + block_size])
        target_rows.append(dataset[start + 1 : start + block_size + 1])

    return torch.stack(input_rows), torch.stack(target_rows)

# Generate a sample training batch; both tensors are (batch_size, block_size)
batch_inputs, batch_targets = get_batch('train')

print(f"Batch Inputs Shape: {batch_inputs.shape}")
print(f"Batch Targets Shape: {batch_targets.shape}")
print("-" * 60)

# Each row of the batch packs block_size autoregressive examples: print every
# (context prefix -> next token) pair, both as IDs and as decoded text.
for b_idx in range(batch_size):
    print(f"--- Processing Sequence {b_idx} in the Batch ---")

    for t_idx in range(block_size):
        # History up to and including position t_idx, and the token that follows it
        context_tokens = batch_inputs[b_idx, : t_idx + 1]
        target_token = batch_targets[b_idx, t_idx]
        context_ids = context_tokens.tolist()
        target_id = target_token.item()

        context_text = detokenize(context_ids)
        target_text = detokenize([target_id])

        print(f"Seq {b_idx}, Step {t_idx} | Context: {str(context_ids):<30} | Text: '{context_text}'")
        print(f"              | Next ID: {str(target_id):<10} | Next Char: '{target_text}'")

    print("-" * 60)

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Set seed for reproducibility (re-seeding here also resets the RNG used for later sampling)
torch.manual_seed(2971)

class BigramLanguageModel(nn.Module):
    """
    A simple Bigram Language Model.
    It predicts the next character based solely on the current character.
    """
    def __init__(self, vocab_size):
        super().__init__()
        # Row i of this (vocab_size, vocab_size) table holds the logits for
        # "which token follows token i".
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, indices, targets=None):
        """
        Calculates the scores (logits) for the next character in a sequence.

        indices: (Batch, Time) tensor of integer token IDs
        targets: optional (Batch, Time) tensor of integer token IDs
        Returns (logits, loss): logits is (Batch, Time, vocab_size); loss is
        the mean cross-entropy over all positions, or None without targets.
        """
        logits = self.token_embedding_table(indices)

        if targets is None:
            return logits, None

        # cross_entropy wants (N, Classes) logits and (N,) targets, so the
        # Batch and Time dimensions are flattened together.
        batch_size, sequence_length, vocab_channels = logits.shape
        logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
        targets_flattened = targets.view(batch_size * sequence_length)
        loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    @torch.no_grad()  # sampling needs no gradients; avoids building an autograd graph
    def generate(self, indices, max_new_tokens):
        """
        Generates new text by repeatedly sampling from the model's predictions.

        indices: (Batch, Time) current context of tokens
        Returns a (Batch, Time + max_new_tokens) tensor of token IDs.
        """
        for _ in range(max_new_tokens):
            # The logits at a position depend only on the token at that position,
            # so embedding just the last token gives the same result as embedding
            # the whole (growing) sequence.
            logits, _ = self(indices[:, -1:])
            last_token_logits = logits[:, -1, :]  # (Batch, vocab_size)

            # Turn scores into probabilities and sample one next token per row
            probabilities = F.softmax(last_token_logits, dim=-1)
            next_token_index = torch.multinomial(probabilities, num_samples=1)

            # Append the sampled token and continue from the longer sequence
            indices = torch.cat((indices, next_token_index), dim=1)

        return indices

In [None]:
import math

# Build the model over the character vocabulary (vocab_size == len(chars)) and
# run one forward pass on the sample batch produced by get_batch('train').
language_model = BigramLanguageModel(vocab_size)
next_token_logits, training_loss = language_model(batch_inputs, batch_targets)

print(f"Logits Tensor Shape (Batch, Time, Channels): {next_token_logits.shape}")
print(f"Current Training Loss: {training_loss.item():.4f}")

# Sanity check: a model that spreads probability evenly over the vocabulary
# has a loss of -ln(1/vocab_size).
expected_initial_loss = -math.log(1/vocab_size)
print(f"Expected loss for a random model: {expected_initial_loss:.4f}")

In [None]:
# Sample 100 tokens from the model, starting from a single token with ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = language_model.generate(indices=start_context, max_new_tokens=100)[0].tolist()
print(detokenize(generated_ids))

In [None]:
# AdamW over all parameters of the bigram model, learning rate 1e-3.
optimizer = torch.optim.AdamW(language_model.parameters(), lr=1e-3)

In [None]:
def train_model(model, optimizer, training_steps=10000, eval_interval=1000):
    """
    Trains the language model for a fixed number of steps.

    model: module whose forward(inputs, targets) returns (logits, loss)
    optimizer: optimizer already bound to the model's parameters
    training_steps: number of optimisation steps to run
    eval_interval: the loss is logged every this many steps (and on the last step)

    Returns the list of logged training-batch losses (empty if no step ran).
    """
    model.train()
    loss_history = []
    loss = None  # remains None when training_steps is 0

    print(f"=== Starting Training for {training_steps} steps ===")

    for step in range(training_steps):
        # 1. Fetch a fresh batch of training data
        input_batch, target_batch = get_batch('train')

        # 2. Forward pass: compute predictions and current loss
        _, loss = model(input_batch, target_batch)

        # 3. Backward pass: compute gradients and update weights
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        # Periodically log progress and save history
        if step % eval_interval == 0 or step == training_steps - 1:
            loss_value = loss.item()
            loss_history.append(loss_value)
            print(f"Step [{step:5d}/{training_steps}] - Training Loss: {loss_value:.4f}")

    # Guard the summary: with zero steps there is no loss to report
    if loss is None:
        print("Training Complete. No training steps were run.")
    else:
        print(f"Training Complete. Final Loss: {loss.item():.4f}")
    return loss_history


def evaluate_model(model, eval_iterations=200):
    """
    Evaluates the model on both training and validation splits to check for overfitting.

    model: module whose forward(inputs, targets) returns (logits, loss)
    eval_iterations: number of random batches averaged per split

    Returns a dict {'train': mean_loss, 'val': mean_loss}. The model is put in
    eval mode for the measurement and afterwards restored to whichever mode
    (train or eval) it was in when the function was called.
    """
    was_training = model.training
    model.eval()
    split_losses = {}
    try:
        with torch.no_grad():
            for split in ('train', 'val'):
                # Average over several random batches for a more stable estimate
                batch_losses = torch.zeros(eval_iterations)
                for k in range(eval_iterations):
                    inputs, targets = get_batch(split)
                    _, loss = model(inputs, targets)
                    batch_losses[k] = loss.item()

                split_losses[split] = batch_losses.mean().item()
    finally:
        # Restore the caller's mode even if a batch raised
        model.train(was_training)

    print("=== Evaluation Results ===")
    print(f"Train Loss: {split_losses['train']:.4f}")
    print(f"Val Loss:   {split_losses['val']:.4f}")

    return split_losses

In [None]:
# Sample 500 tokens from the bigram model, starting from a single token with ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = language_model.generate(indices=start_context, max_new_tokens=500)[0].tolist()
print(detokenize(generated_ids))

In [None]:
# 1. Setup Optimizer (a fresh AdamW, so no optimizer state is carried over)
optimizer = torch.optim.AdamW(language_model.parameters(), lr=1e-3)

# 2. Train the model; `history` holds the losses logged every eval_interval
#    (default 1000) steps plus the final step
history = train_model(language_model, optimizer, training_steps=5000)

# 3. Evaluate the results: mean loss on the train and validation splits
final_metrics = evaluate_model(language_model)

In [None]:
# Sample 500 tokens from the bigram model again, starting from a single token with ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = language_model.generate(indices=start_context, max_new_tokens=500)[0].tolist()
print(detokenize(generated_ids))

In [None]:
import matplotlib.pyplot as plt

def plot_learning_curve(history, eval_interval, title='Bigram Model Learning Curve'):
    """
    Visualizes the loss reduction over time.

    history: sequence of logged loss values
    eval_interval: number of training steps between consecutive logged values;
        the i-th value is drawn at x = i * eval_interval
    title: chart title; override it to reuse this function for other models
    """
    plt.figure(figsize=(10, 6))

    # Create the x-axis based on the evaluation intervals
    steps = [i * eval_interval for i in range(len(history))]

    plt.plot(steps, history, label='Training Loss', color='#2ecc71', linewidth=2)

    # Formatting the chart
    plt.title(title, fontsize=14)
    plt.xlabel('Training Steps', fontsize=12)
    plt.ylabel('Cross-Entropy Loss', fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()

    plt.show()

# Plot the logged losses; eval_interval here should match the interval
# train_model logged with (its default is 1000)
plot_learning_curve(history, eval_interval=1000)

In [None]:
# Toy tensor for the averaging demo, laid out as (Batch, Time, Channels).
# Note: this reassigns the notebook-level batch_size (to the same value, 4).
torch.manual_seed(2971)

batch_size, sequence_length, feature_dim = 4, 8, 2
x = torch.randn((batch_size, sequence_length, feature_dim))

# sequence_averages[b, t] will hold the mean of x[b, 0], ..., x[b, t]:
# a "bag of words" summary of everything seen so far. Shape: (4, 8, 2)
sequence_averages = torch.zeros((batch_size, sequence_length, feature_dim))

for b_idx in range(batch_size):
    for t_idx in range(sequence_length):
        # History from the start up to and including time t_idx: (t_idx+1, feature_dim)
        x_previous = x[b_idx, :t_idx+1]

        # Collapse the time dimension into a single average feature vector
        sequence_averages[b_idx, t_idx] = x_previous.mean(dim=0)

In [None]:
# Raw features of the first sequence in the batch
x[0]

In [None]:
# Running averages for the first sequence; row t is the mean of x[0, :t+1]
sequence_averages[0]

In [None]:
# Multiplying by a lower-triangular matrix of ones sums each row with all rows
# above it; a full matrix of ones sums every row.
ones = torch.ones(3, 3)
tril_ones = torch.tril(torch.ones(3, 3))
random_matrix = torch.randn(3, 2)
print(f"Tril Matrix:\n{tril_ones}\n")
print(f"Ones Matrix:\n{ones}\n")
print(f"Random Matrix:\n{random_matrix}\n")
print(f"Tril Matrix * Random Matrix:\n{tril_ones @ random_matrix}\n")
print(f"Ones Matrix * Random Matrix:\n{ones @ random_matrix}\n")

In [None]:
# Dividing every row of the tril matrix by its row sum turns the running sum
# into a running average.
row_totals = tril_ones.sum(dim=1, keepdim=True)
tril_normalized = tril_ones / row_totals
print(f"Normalized Tril Matrix:\n{tril_normalized}\n")
print(f"Normalized Tril Matrix * Random Matrix:\n{tril_normalized @ random_matrix}\n")

In [None]:
# Vectorized version of the averaging loop: a row-normalized lower-triangular
# matrix. This is the same mathematical trick attention mechanisms build on.
torch.manual_seed(2971)
lower_triangle = torch.tril(torch.ones((sequence_length, sequence_length)))
weighted_sum = lower_triangle / lower_triangle.sum(dim=1, keepdim=True)
weighted_sum

In [None]:
# (Time, Time) @ (Batch, Time, Channels) broadcasts over the batch dimension;
# the result should match the loop-based running averages.
new_sequence_averages = weighted_sum @ x
torch.allclose(new_sequence_averages, sequence_averages)

In [None]:
#now let's do it again but this time using softmax to get the weights
import torch.nn.functional as F
torch.manual_seed(2971)
tril = torch.tril(torch.ones((sequence_length, sequence_length)))
# Start from all-zero scores and set future positions to -inf; softmax then
# gives them weight 0 and spreads weight evenly over the visible positions.
masked_scores = torch.zeros((sequence_length, sequence_length)).masked_fill(tril == 0, float('-inf'))
new_weights = F.softmax(masked_scores, dim=-1)
another_sequence_averages = new_weights @ x
torch.allclose(another_sequence_averages, sequence_averages)

In [None]:
# Same idea again, but now the weights come from the data: a single head of self-attention
torch.manual_seed(2971)
batch_size, sequence_length, feature_dim = 4, 8, 2
x= torch.randn((batch_size, sequence_length, feature_dim))
head_size = 16
# Imagine each token speaking to every other token through three projections:
key = nn.Linear(feature_dim, head_size, bias=False)   # here is what I have to say
query = nn.Linear(feature_dim, head_size, bias=False) # here is what I'm looking for
value = nn.Linear(feature_dim, head_size, bias=False) # if you find me interesting here is the information I carry


k = key(x)  # (Batch, Time, Head_Size)
q = query(x)  # (Batch, Time, Head_Size)
v = value(x)  # (Batch, Time, Head_Size); keys/values come from the same x as the queries, which makes this self-attention (taking them from a different source would be cross-attention)

new_weighted_matrix = q @ k.transpose(-2, -1) # (Batch, Time, Head_Size) @ (Batch, Head_Size, Time) -> (Batch, Time, Time)
new_weighted_matrix = new_weighted_matrix.masked_fill(tril == 0, float('-inf')) # causal mask: a token only attends to itself and earlier tokens, never future ones
# Without the mask above, every token could also see future tokens (non-causal, as in encoder blocks); with it, this behaves like a decoder block
new_weighted_matrix = F.softmax(new_weighted_matrix, dim=-1)



out = new_weighted_matrix @ v  # (Batch, Time, Time) @ (Batch, Time, Head_Size) -> (Batch, Time, Head_Size)
out.shape

In [None]:
# Attention weights of the first sequence: lower-triangular, each row sums to 1
new_weighted_matrix[0]

In [None]:
# The raw scores must be scaled: dot products over head_size dimensions can get
# large in magnitude, which pushes softmax into regions where it is nearly
# one-hot and has extremely small gradients, so the model may not learn properly.
# Dividing the scores by sqrt(head_size) BEFORE the softmax keeps the attention
# distribution from becoming too peaked.
# The weights are recomputed from q and k here (rather than dividing the existing
# post-softmax weights) so each row still sums to 1 and re-running this cell
# gives the same result.
scaled_scores = (q @ k.transpose(-2, -1)) / (head_size ** 0.5)  # (Batch, Time, Time)
scaled_scores = scaled_scores.masked_fill(tril == 0, float('-inf'))
new_weighted_matrix = F.softmax(scaled_scores, dim=-1)

In [None]:
# Attention weights of the first sequence after the scaling cell
new_weighted_matrix[0]

In [None]:
#let's create one head of a self-attention mechanism


class Head(nn.Module):
    """ One head of causal (masked) self-attention """

    def __init__(self, feature_dim, head_size, max_context=None):
        """
        feature_dim: size of each input token vector
        head_size: size of the key/query/value projections (and of the output)
        max_context: longest sequence the causal mask supports; when omitted it
            falls back to the notebook-level `block_size`
        """
        super().__init__()
        if max_context is None:
            max_context = block_size
        self.key = nn.Linear(feature_dim, head_size, bias=False)
        self.query = nn.Linear(feature_dim, head_size, bias=False)
        self.value = nn.Linear(feature_dim, head_size, bias=False)
        # A buffer, not a parameter: it follows the module across devices but is never trained
        self.register_buffer('tril', torch.tril(torch.ones((max_context, max_context))))

    def forward(self, x):
        """Maps x of shape (Batch, Time, feature_dim) to (Batch, Time, head_size)."""
        num_tokens = x.shape[1]
        if num_tokens > self.tril.size(0):
            raise ValueError(
                f"sequence length {num_tokens} exceeds the maximum context {self.tril.size(0)}"
            )

        k = self.key(x)    # (Batch, Time, Head_Size)
        q = self.query(x)  # (Batch, Time, Head_Size)
        v = self.value(x)  # (Batch, Time, Head_Size)

        # Scaled dot-product scores; dividing by sqrt(head_size) keeps softmax from saturating
        attn_scores = (q @ k.transpose(-2, -1)) / (k.size(-1) ** 0.5)  # (Batch, Time, Time)
        # Causal mask: position t may only attend to positions <= t
        causal_mask = self.tril[:num_tokens, :num_tokens] == 0
        attn_scores = attn_scores.masked_fill(causal_mask, float('-inf'))

        attn_weights = F.softmax(attn_scores, dim=-1)  # (Batch, Time, Time)

        return attn_weights @ v  # (Batch, Time, Head_Size)

In [None]:
# Now that we know how self-attention works, let's use it in our model.


# Let's build a language model with a single self-attention head.

class SelfAttentionLanguageModel(nn.Module):
    """
    A character-level language model with one self-attention head.
    Token and position embeddings are summed, passed through a single causal
    attention head, and projected to vocabulary logits.
    """
    def __init__(self, vocab_size, feature_dim):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; block_size is the longest supported sequence
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)
        self.feature_dim = feature_dim
        self.self_attention_head = Head(feature_dim, head_size=feature_dim)

    def forward(self, indices, targets=None):
        """
        indices: (Batch, Time) tensor of token IDs, Time <= block_size
        targets: optional (Batch, Time) tensor of token IDs
        Returns (logits, loss); loss is None when targets is not given.
        """
        batch_size, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)  # (Batch, Time, Feature_Dim)
        # Build the position indices on the same device as the input tokens
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)  # (Time, Feature_Dim)
        x = token_embedding + positional_embedding  # broadcasts to (Batch, Time, Feature_Dim)
        x = self.self_attention_head(x)  # (Batch, Time, Feature_Dim)
        logits = self.language_model_head(x)  # (Batch, Time, Vocab_Size)

        if targets is None:
            loss = None
        else:
            # cross_entropy wants (N, Classes) logits and (N,) targets
            batch_size, sequence_length, vocab_channels = logits.shape
            logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
            targets_flattened = targets.view(batch_size * sequence_length)
            loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    @torch.no_grad()  # sampling needs no gradients
    def generate(self, indices, max_new_tokens):
        """Appends max_new_tokens sampled tokens to indices of shape (Batch, Time)."""
        for _ in range(max_new_tokens):
            # Positions only exist for block_size tokens, so crop the context
            indices_condensed = indices[:, -block_size:]
            logits, _ = self(indices_condensed)
            last_token_logits = logits[:, -1, :]
            probabilities = F.softmax(last_token_logits, dim=-1)
            next_token_index = torch.multinomial(probabilities, num_samples=1)
            indices = torch.cat((indices, next_token_index), dim=1)
        return indices

In [None]:
# 1. Build the single-head attention model and its own optimizer
Self_Attention_model = SelfAttentionLanguageModel(vocab_size, feature_dim=32)


optimizer_Self_Attention = torch.optim.AdamW(Self_Attention_model.parameters(), lr=1e-3)

# 2. Train the model (note: `history` and `final_metrics` overwrite the previous model's results)
history = train_model(Self_Attention_model, optimizer_Self_Attention, training_steps=5000)
# 3. Evaluate the results
final_metrics = evaluate_model(Self_Attention_model)

In [None]:
# Sample 500 tokens from the single-head attention model, starting from token ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = Self_Attention_model.generate(indices=start_context, max_new_tokens=500)[0].tolist()
print(detokenize(generated_ids))

In [None]:
def plot_learning_curve_with_Self_Attention(history, eval_interval):
    """
    Plots the logged training losses of the self-attention model.

    history: sequence of logged loss values
    eval_interval: steps between logged values; the i-th value is drawn at
        x = i * eval_interval
    """
    # x position of every logged loss value
    steps = [eval_interval * position for position, _ in enumerate(history)]

    plt.figure(figsize=(10, 6))
    plt.plot(steps, history, label='Training Loss', color='#2ecc71', linewidth=2)

    # Titles, axis labels, grid and legend
    plt.title('Self-Attention Model Learning Curve', fontsize=14)
    plt.xlabel('Training Steps', fontsize=12)
    plt.ylabel('Cross-Entropy Loss', fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()

    plt.show()



# Plot the logged losses; eval_interval should match the interval train_model
# logged with (its default is 1000)
plot_learning_curve_with_Self_Attention(history, eval_interval=1000)

In [None]:
# The result still isn't good enough. One thing we can do is run several self-attention heads in parallel,
# and later stack multiple layers of such heads to make the network deeper.
# Let's start by creating a multi-head self-attention mechanism.

class MultiHeadAttention(nn.Module):
    """ Several independent self-attention heads applied to the same input """

    def __init__(self, num_heads, head_size, feature_dim):
        super().__init__()
        heads = []
        for _ in range(num_heads):
            heads.append(Head(feature_dim, head_size))
        self.heads = nn.ModuleList(heads)

    def forward(self, x):
        # Every head sees the same x; their outputs are joined along the
        # feature axis, giving num_heads * head_size features per token.
        head_outputs = [head(x) for head in self.heads]
        return torch.cat(head_outputs, dim=-1)
    

class SelfAttentionMultiHead(nn.Module):
    """
    Character-level language model with one multi-head self-attention layer.
    Token and position embeddings are summed, passed through the attention
    heads, and projected straight to vocabulary logits (no feed-forward layer).
    """

    def __init__(self, vocab_size, feature_dim, num_heads=4):
        super().__init__()
        # The concatenated head outputs must add up to feature_dim again,
        # because language_model_head expects feature_dim inputs.
        if feature_dim % num_heads != 0:
            raise ValueError(
                f"feature_dim ({feature_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.feature_dim = feature_dim
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; block_size is the longest supported sequence
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)
        self.multihead_attention = MultiHeadAttention(
            num_heads=num_heads, head_size=feature_dim // num_heads, feature_dim=feature_dim
        )

    def forward(self, indices, targets=None):
        """
        indices: (Batch, Time) tensor of token IDs, Time <= block_size
        targets: optional (Batch, Time) tensor of token IDs
        Returns (logits, loss); loss is None when targets is not given.
        """
        batch_size, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)  # (Batch, Time, Feature_Dim)
        # Build the position indices on the same device as the input tokens
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)  # (Time, Feature_Dim)
        x = token_embedding + positional_embedding  # broadcasts to (Batch, Time, Feature_Dim)
        x = self.multihead_attention(x)  # (Batch, Time, Feature_Dim)
        logits = self.language_model_head(x)  # (Batch, Time, Vocab_Size)

        if targets is None:
            loss = None
        else:
            # cross_entropy wants (N, Classes) logits and (N,) targets
            batch_size, sequence_length, vocab_channels = logits.shape
            logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
            targets_flattened = targets.view(batch_size * sequence_length)
            loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    @torch.no_grad()  # sampling needs no gradients
    def generate(self, indices, max_new_tokens):
        """Appends max_new_tokens sampled tokens to indices of shape (Batch, Time)."""
        for _ in range(max_new_tokens):
            # Positions only exist for block_size tokens, so crop the context
            indices_condensed = indices[:, -block_size:]
            logits, _ = self(indices_condensed)
            last_token_logits = logits[:, -1, :]
            probabilities = F.softmax(last_token_logits, dim=-1)
            next_token_index = torch.multinomial(probabilities, num_samples=1)
            indices = torch.cat((indices, next_token_index), dim=1)
        return indices

In [None]:
# Build the multi-head model with its own optimizer, then train and evaluate it
multiHead = SelfAttentionMultiHead(vocab_size, feature_dim=32)  

optimizer_Multi_Head = torch.optim.AdamW(multiHead.parameters(), lr=1e-3)


# `history` and `final_metrics` overwrite the previous model's results
history = train_model(multiHead, optimizer_Multi_Head, training_steps=5000)

final_metrics = evaluate_model(multiHead)

In [None]:
# Sample 500 tokens from the multi-head model, starting from token ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = multiHead.generate(indices=start_context, max_new_tokens=500)[0].tolist()
print(detokenize(generated_ids))

In [None]:
def plot_learning_curve_Multi_Head(history, eval_interval):
    """
    Plots the logged training losses of the multi-head model.

    history: sequence of logged loss values
    eval_interval: steps between logged values; the i-th value is drawn at
        x = i * eval_interval
    """
    plt.figure(figsize=(10, 6))

    # Build the x positions one logged value at a time
    steps = []
    for position in range(len(history)):
        steps.append(position * eval_interval)

    plt.plot(steps, history, label='Training Loss', color='#2ecc71', linewidth=2)

    # Titles, axis labels, grid and legend
    plt.title('Multi Head Model Learning Curve', fontsize=14)
    plt.xlabel('Training Steps', fontsize=12)
    plt.ylabel('Cross-Entropy Loss', fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()

    plt.show()



# Plot the logged losses; eval_interval should match the interval train_model
# logged with (its default is 1000)
plot_learning_curve_Multi_Head(history, eval_interval=1000)

In [None]:
# Up until now the model computes the logits right after attention, so each token has no chance to process what it gathered.
# Let's add a feed-forward layer after attention to give every token some per-token computation ("thinking") before predicting.

In [None]:
class MultiHeadThinker(nn.Module):
    """ Multi-head self-attention followed by a feed-forward network """

    def __init__(self, vocab_size, feature_dim, num_heads=4):
        """
        vocab_size: number of distinct tokens
        feature_dim: embedding width; must be divisible by num_heads
        num_heads: number of parallel attention heads
        """
        super().__init__()
        # The concatenated head outputs must add up to feature_dim again,
        # because the feed-forward network expects feature_dim inputs.
        if feature_dim % num_heads != 0:
            raise ValueError(
                f"feature_dim ({feature_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.feature_dim = feature_dim
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; block_size is the longest supported sequence
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)
        self.multihead_attention = MultiHeadAttention(
            num_heads=num_heads, head_size=feature_dim // num_heads, feature_dim=feature_dim
        )
        # Per-token MLP: expand to 4x the width, apply ReLU, project back
        self.feed_forward = nn.Sequential(
            nn.Linear(feature_dim, feature_dim * 4),
            nn.ReLU(),
            nn.Linear(feature_dim * 4, feature_dim)
        )

    def forward(self, indices, targets=None):
        """
        indices: (Batch, Time) tensor of token IDs, Time <= block_size
        targets: optional (Batch, Time) tensor of token IDs
        Returns (logits, loss); loss is None when targets is not given.
        """
        batch_size, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)  # (Batch, Time, Feature_Dim)
        # Build the position indices on the same device as the input tokens
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)  # (Time, Feature_Dim)
        x = token_embedding + positional_embedding  # broadcasts to (Batch, Time, Feature_Dim)
        x = self.multihead_attention(x)  # tokens exchange information
        x = self.feed_forward(x)  # each token processes what it gathered
        logits = self.language_model_head(x)  # (Batch, Time, Vocab_Size)

        if targets is None:
            loss = None
        else:
            # cross_entropy wants (N, Classes) logits and (N,) targets
            batch_size, sequence_length, vocab_channels = logits.shape
            logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
            targets_flattened = targets.view(batch_size * sequence_length)
            loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    @torch.no_grad()  # sampling needs no gradients
    def generate(self, indices, max_new_tokens):
        """Appends max_new_tokens sampled tokens to indices of shape (Batch, Time)."""
        for _ in range(max_new_tokens):
            # Positions only exist for block_size tokens, so crop the context
            indices_condensed = indices[:, -block_size:]
            logits, _ = self(indices_condensed)
            last_token_logits = logits[:, -1, :]
            probabilities = F.softmax(last_token_logits, dim=-1)
            next_token_index = torch.multinomial(probabilities, num_samples=1)
            indices = torch.cat((indices, next_token_index), dim=1)
        return indices

In [None]:
# Build the attention + feed-forward model with its own optimizer, then train and evaluate it
thinker = MultiHeadThinker(vocab_size, feature_dim=32)  

optimizer_Thinker = torch.optim.AdamW(thinker.parameters(), lr=1e-3)


# `history` and `final_metrics` overwrite the previous model's results
history = train_model(thinker, optimizer_Thinker, training_steps=5000)

final_metrics = evaluate_model(thinker)

In [None]:
# Sample 500 tokens from the attention + feed-forward model, starting from token ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = thinker.generate(indices=start_context, max_new_tokens=500)[0].tolist()
print(detokenize(generated_ids))

In [None]:
def plot_learning_curve_Thinker(history, eval_interval):
    """
    Plots the logged training losses of the attention + feed-forward model.

    history: sequence of logged loss values
    eval_interval: steps between logged values; the i-th value is drawn at
        x = i * eval_interval
    """
    # x position of every logged loss value
    num_points = len(history)
    steps = [eval_interval * position for position in range(num_points)]

    plt.figure(figsize=(10, 6))
    plt.plot(steps, history, label='Training Loss', color='#2ecc71', linewidth=2)

    # Titles, axis labels, grid and legend
    plt.title('Thinker Model Learning Curve', fontsize=14)
    plt.xlabel('Training Steps', fontsize=12)
    plt.ylabel('Cross-Entropy Loss', fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()

    plt.show()



# Plot the logged losses; eval_interval should match the interval train_model
# logged with (its default is 1000)
plot_learning_curve_Thinker(history, eval_interval=1000)

In [None]:
# Now let's make the network deeper by stacking multiple layers of multi-head attention.
# Let's build the reusable block first.
class FeedForward(nn.Module):
    """ Per-token MLP: widen to 4x feature_dim, apply ReLU, project back """

    def __init__(self, feature_dim):
        super().__init__()
        hidden_dim = feature_dim * 4
        layers = [
            nn.Linear(feature_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, feature_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # The linear layers act on the last dimension only
        transformed = self.net(x)
        return transformed



class Block(nn.Module):
    """ Transformer block: communication (attention) followed by computation (feed-forward) """

    def __init__(self, feature_dim , num_head):
        super().__init__()
        # Split the feature width evenly across the heads
        per_head_dim = feature_dim // num_head
        self.self_attention_head = MultiHeadAttention(num_head, per_head_dim, feature_dim)
        self.ffwd = FeedForward(feature_dim)

    def forward(self, x):
        attended = self.self_attention_head(x)
        return self.ffwd(attended)

     
class MultiHeadMultiLayer(nn.Module):
    """
    Character-level language model built from a stack of Block layers.
    Each Block applies multi-head self-attention followed by a feed-forward
    network; the stacked output is projected to vocabulary logits.
    """

    def __init__(self, vocab_size, feature_dim, num_heads=4, num_layers=3):
        """
        vocab_size: number of distinct tokens
        feature_dim: embedding width; must be divisible by num_heads
        num_heads: attention heads per block
        num_layers: number of Block layers applied one after another
        """
        super().__init__()
        # Each block must output feature_dim features so blocks can be chained
        if feature_dim % num_heads != 0:
            raise ValueError(
                f"feature_dim ({feature_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.feature_dim = feature_dim
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; block_size is the longest supported sequence
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)
        # The actual "multi layer" part: several attention + feed-forward blocks in sequence
        self.blocks = nn.Sequential(*[Block(feature_dim, num_heads) for _ in range(num_layers)])

    def forward(self, indices, targets=None):
        """
        indices: (Batch, Time) tensor of token IDs, Time <= block_size
        targets: optional (Batch, Time) tensor of token IDs
        Returns (logits, loss); loss is None when targets is not given.
        """
        batch_size, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)  # (Batch, Time, Feature_Dim)
        # Build the position indices on the same device as the input tokens
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)  # (Time, Feature_Dim)
        x = token_embedding + positional_embedding  # broadcasts to (Batch, Time, Feature_Dim)
        x = self.blocks(x)  # run every block in order
        logits = self.language_model_head(x)  # (Batch, Time, Vocab_Size)

        if targets is None:
            loss = None
        else:
            # cross_entropy wants (N, Classes) logits and (N,) targets
            batch_size, sequence_length, vocab_channels = logits.shape
            logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
            targets_flattened = targets.view(batch_size * sequence_length)
            loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    @torch.no_grad()  # sampling needs no gradients
    def generate(self, indices, max_new_tokens):
        """Appends max_new_tokens sampled tokens to indices of shape (Batch, Time)."""
        for _ in range(max_new_tokens):
            # Positions only exist for block_size tokens, so crop the context
            indices_condensed = indices[:, -block_size:]
            logits, _ = self(indices_condensed)
            last_token_logits = logits[:, -1, :]
            probabilities = F.softmax(last_token_logits, dim=-1)
            next_token_index = torch.multinomial(probabilities, num_samples=1)
            indices = torch.cat((indices, next_token_index), dim=1)
        return indices
    

In [None]:
# Build the multi-layer model with its own optimizer, then train and evaluate it
# (`history` and `final_metrics` overwrite the previous model's results)
multimodel = MultiHeadMultiLayer(vocab_size, feature_dim=32)
optimizer_MultiModel = torch.optim.AdamW(multimodel.parameters(), lr=1e-3)
history = train_model(multimodel, optimizer_MultiModel, training_steps=5000)
final_metrics = evaluate_model(multimodel)

In [None]:
# Sample 500 tokens from the multi-layer model, starting from token ID 0.
start_context = torch.zeros((1, 1), dtype=torch.long)
generated_ids = multimodel.generate(indices=start_context, max_new_tokens=500)[0].tolist()
print(detokenize(generated_ids))

In [None]:
def plot_learning_curve_Multi_Layer(history, eval_interval):
    """Draw the recorded loss values against the step they were logged at.

    Args:
        history: Sequence of loss values, one per logging point.
        eval_interval: Number of training steps between consecutive entries.
    """
    plt.figure(figsize=(10, 6))

    # Entry k of the history is placed at step k * eval_interval.
    checkpoints = range(len(history))
    steps = [checkpoint * eval_interval for checkpoint in checkpoints]

    curve_style = dict(label='Training Loss', color='#2ecc71', linewidth=2)
    plt.plot(steps, history, **curve_style)

    # Titles, axis labels and a light dashed grid
    axis_font = 12
    plt.title('Multi Head Model Learning Curve', fontsize=14)
    plt.ylabel('Cross-Entropy Loss', fontsize=axis_font)
    plt.xlabel('Training Steps', fontsize=axis_font)
    plt.grid(True, alpha=0.6, linestyle='--')
    plt.legend()

    plt.show()



# Render the loss curve recorded while training the multi-head model.
plot_learning_curve_Multi_Layer(history=history, eval_interval=1000)

In [None]:
class NewMultiHead(nn.Module):
    """Several attention heads run side by side, concatenated and projected.

    Args:
        num_heads: Number of ``Head`` modules to run in parallel.
        head_size: Output width requested from every head.
        feature_dim: Width of the input and of the projected output.
    """

    def __init__(self, num_heads, head_size, feature_dim):
        super().__init__()
        self.head_size = head_size
        self.num_heads = num_heads
        self.feature_dim = feature_dim
        self.heads = nn.ModuleList([Head(feature_dim, head_size) for _ in range(num_heads)])  # Head takes feature_dim first
        # The concatenated head outputs are num_heads * head_size wide
        # (assuming each Head emits head_size features — Head is defined
        # outside this section). That only equals feature_dim when
        # feature_dim is divisible by num_heads, so size the projection from
        # the real concatenated width instead of assuming it.
        self.projection = nn.Linear(num_heads * head_size, feature_dim)

    def forward(self, x):
        """Apply every head to ``x``, join on the last axis, and project."""
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        return self.projection(out)

class NewBlock(nn.Module):
    """Residual block: multi-head attention followed by a feed-forward net.

    Each sub-layer's output is added back onto its own input.
    """

    def __init__(self, feature_dim, num_head):
        super().__init__()
        self.feature_dim = feature_dim
        self.num_head = num_head
        # Split the feature width evenly (integer division) across the heads.
        self.self_attention_head = NewMultiHead(num_head, feature_dim // num_head, feature_dim)
        self.ffwd = NewFeedForward(feature_dim)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual steps."""
        attended = x + self.self_attention_head(x)
        return attended + self.ffwd(attended)

class NewFeedForward(nn.Module):
    """Two-layer MLP: widen to 4x the feature width, ReLU, project back."""

    def __init__(self, feature_dim):
        super().__init__()
        self.feature_dim = feature_dim
        hidden_dim = feature_dim * 4
        widen = nn.Linear(feature_dim, hidden_dim)
        narrow = nn.Linear(hidden_dim, feature_dim)
        self.net = nn.Sequential(widen, nn.ReLU(), narrow)

    def forward(self, x):
        """Run ``x`` through the MLP; the last axis keeps width feature_dim."""
        transformed = self.net(x)
        return transformed

class NonLinearMultiHead(nn.Module):
    """Character-level language model built from three ``NewBlock`` layers.

    Token and positional embeddings are summed, passed through the blocks,
    and projected to one logit per vocabulary entry.

    Args:
        vocab_size: Number of distinct tokens.
        feature_dim: Width of the embeddings and of every block.
    """

    def __init__(self, vocab_size, feature_dim):
        super().__init__()
        self.feature_dim = feature_dim
        self.vocab_size = vocab_size
        self.blocks = nn.Sequential(
            NewBlock(feature_dim, num_head=4),
            NewBlock(feature_dim, num_head=4),
            NewBlock(feature_dim, num_head=4)
        )
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; the notebook-level block_size read
        # here fixes this model's maximum context length.
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)

    def forward(self, indices, targets=None):
        """Return ``(logits, loss)``; ``loss`` is ``None`` without targets.

        Args:
            indices: Integer token IDs of shape (batch, time).
            targets: Optional integer token IDs with the same number of
                elements as ``indices``.
        """
        _, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)
        # Position ids must live on the same device as the input tokens.
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)
        x = token_embedding + positional_embedding
        x = self.blocks(x)
        logits = self.language_model_head(x)

        if targets is None:
            return logits, None

        # Merge batch and time so cross_entropy sees (N, classes) and (N,).
        batch_size, sequence_length, vocab_channels = logits.shape
        logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
        targets_flattened = targets.view(batch_size * sequence_length)
        loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    def generate(self, indices, max_new_tokens):
        """Append ``max_new_tokens`` sampled tokens to every row of ``indices``.

        Returns a tensor of shape (batch, time + max_new_tokens).
        """
        # Use this model's own positional-table size as the context window:
        # the notebook-level block_size is rebound later, and a larger value
        # would index past the table.
        context_length = self.positional_embedding_table.num_embeddings

        # Sampling needs no gradients.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                context = indices[:, -context_length:]
                logits, _ = self(context)
                last_token_logits = logits[:, -1, :]
                probabilities = F.softmax(last_token_logits, dim=-1)
                next_token_index = torch.multinomial(probabilities, num_samples=1)
                indices = torch.cat((indices, next_token_index), dim=1)
        return indices


In [None]:
# Build the three-block NonLinearMultiHead model with 32 features per token.
NLMH = NonLinearMultiHead(vocab_size, feature_dim=32)
# AdamW over all model parameters with a learning rate of 1e-3.
optimizer_NLMH = torch.optim.AdamW(NLMH.parameters(), lr=1e-3)
# Train for 5000 steps; the returned history is plotted in a later cell.
NLMH_history = train_model(NLMH, optimizer_NLMH, training_steps=5000)
# NOTE(review): evaluate_model is defined outside this section; presumably it
# reports the final losses of the trained model — confirm.
NLMH_metrics = evaluate_model(NLMH)

In [None]:
# Sample 500 new tokens from a single-sequence batch seeded with token id 0,
# then decode that sequence back to text and print it.
print(detokenize(NLMH.generate(indices=torch.zeros((1, 1), dtype=torch.long), max_new_tokens=500)[0].tolist()))

In [None]:
def plot_learning_curve_NLMH(history, eval_interval):
    """Plot the NLMH loss history against training steps.

    Args:
        history: Sequence of loss values, one per logging point.
        eval_interval: Number of training steps between consecutive entries.
    """
    plt.figure(figsize=(10, 6))

    # The k-th recorded value sits at training step k * eval_interval.
    num_points = len(history)
    steps = [eval_interval * point for point in range(num_points)]

    plt.plot(steps, history, linewidth=2, color='#2ecc71', label='Training Loss')

    # Chart decoration
    label_kwargs = {'fontsize': 12}
    plt.title('NLMH Model Learning Curve', fontsize=14)
    plt.xlabel('Training Steps', **label_kwargs)
    plt.ylabel('Cross-Entropy Loss', **label_kwargs)
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)

    plt.show()



# Render the loss curve recorded while training the NLMH model.
plot_learning_curve_NLMH(history=NLMH_history, eval_interval=1000)

In [None]:
# now let's add a layernorm to the model
class LayerNorm1d(nn.Module):
    """Layer normalization over the last axis with a learnable scale and shift.

    Args:
        feature_dim: Size of the last axis of the inputs.
        eps: Small constant added to the variance for numerical stability.
    """

    def __init__(self, feature_dim, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(feature_dim))
        self.beta = nn.Parameter(torch.zeros(feature_dim))

    def forward(self, x):
        """Normalize each feature vector to zero mean and unit variance."""
        mean = x.mean(dim=-1, keepdim=True)
        # Standard layer norm uses the biased (population) variance and adds
        # eps inside the square root. The unbiased estimator would also
        # produce NaN for a single-feature input.
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        normalized = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * normalized + self.beta
    

In [None]:
# now we are going to define block with layernorm
class BlockWithLayerNorm(nn.Module):
    """Residual attention + feed-forward block that normalizes each
    sub-layer's input with its own ``LayerNorm1d``."""

    def __init__(self, feature_dim, num_head):
        super().__init__()
        per_head_dim = feature_dim // num_head
        self.self_attention_head = NewMultiHead(
            num_heads=num_head, head_size=per_head_dim, feature_dim=feature_dim
        )
        self.ffwd = NewFeedForward(feature_dim)
        self.ln1 = LayerNorm1d(feature_dim)
        self.ln2 = LayerNorm1d(feature_dim)

    def forward(self, x):
        """Return ``x`` after the normalized attention and MLP residual steps."""
        attended = x + self.self_attention_head(self.ln1(x))
        return attended + self.ffwd(self.ln2(attended))

In [None]:
# now lets build a model and instead of previous block call blockwithlayernorm on it

class NonLinearLayerNorm(nn.Module):
    """Language model made of three ``BlockWithLayerNorm`` layers and a final
    ``LayerNorm1d`` before the vocabulary projection.

    Args:
        vocab_size: Number of distinct tokens.
        feature_dim: Width of the embeddings and of every block.
    """

    def __init__(self, vocab_size, feature_dim):
        super().__init__()
        self.feature_dim = feature_dim
        self.vocab_size = vocab_size
        self.blocks = nn.Sequential(
            BlockWithLayerNorm(feature_dim, num_head=4),
            BlockWithLayerNorm(feature_dim, num_head=4),
            BlockWithLayerNorm(feature_dim, num_head=4),
            LayerNorm1d(feature_dim)
        )
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; the notebook-level block_size read
        # here fixes this model's maximum context length.
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)

    def forward(self, indices, targets=None):
        """Return ``(logits, loss)``; ``loss`` is ``None`` without targets.

        Args:
            indices: Integer token IDs of shape (batch, time).
            targets: Optional integer token IDs with the same number of
                elements as ``indices``.
        """
        _, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)
        # Position ids must live on the same device as the input tokens.
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)
        x = token_embedding + positional_embedding
        x = self.blocks(x)
        logits = self.language_model_head(x)

        if targets is None:
            return logits, None

        # Merge batch and time so cross_entropy sees (N, classes) and (N,).
        batch_size, sequence_length, vocab_channels = logits.shape
        logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
        targets_flattened = targets.view(batch_size * sequence_length)
        loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    def generate(self, indices, max_new_tokens):
        """Append ``max_new_tokens`` sampled tokens to every row of ``indices``.

        Returns a tensor of shape (batch, time + max_new_tokens).
        """
        # Use this model's own positional-table size as the context window:
        # the notebook-level block_size is rebound later, and a larger value
        # would index past the table.
        context_length = self.positional_embedding_table.num_embeddings

        # Sampling needs no gradients.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                context = indices[:, -context_length:]
                logits, _ = self(context)
                last_token_logits = logits[:, -1, :]
                probabilities = F.softmax(last_token_logits, dim=-1)
                next_token_index = torch.multinomial(probabilities, num_samples=1)
                indices = torch.cat((indices, next_token_index), dim=1)
        return indices


In [None]:
# Build the layer-normalized model with 32 features per token.
NLLN = NonLinearLayerNorm(vocab_size, feature_dim=32)
# AdamW over all model parameters with a learning rate of 1e-3.
optimizer_NLLN = torch.optim.AdamW(NLLN.parameters(), lr=1e-3)
# Train for 5000 steps; the returned history is plotted in a later cell.
NLLN_history = train_model(NLLN, optimizer_NLLN, training_steps=5000)
# NOTE(review): evaluate_model is defined outside this section; presumably it
# reports the final losses of the trained model — confirm.
NLLN_metrics = evaluate_model(NLLN)

In [None]:
# Sample 500 new tokens from a single-sequence batch seeded with token id 0,
# then decode that sequence back to text and print it.
print(detokenize(NLLN.generate(indices=torch.zeros((1, 1), dtype=torch.long), max_new_tokens=500)[0].tolist()))

In [None]:
def plot_learning_curve_NLLN(history, eval_interval):
    """Plot the NLLN loss history against training steps.

    Args:
        history: Sequence of loss values, one per logging point.
        eval_interval: Number of training steps between consecutive entries.
    """
    plt.figure(figsize=(10, 6))

    # Recover the step of each entry from its position in the history.
    steps = [position * eval_interval for position in range(len(history))]

    line_options = {'label': 'Training Loss', 'color': '#2ecc71', 'linewidth': 2}
    plt.plot(steps, history, **line_options)

    # Labels, grid and legend
    plt.xlabel('Training Steps', fontsize=12)
    plt.ylabel('Cross-Entropy Loss', fontsize=12)
    plt.title('NLLN Model Learning Curve', fontsize=14)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()

    plt.show()



# Render the loss curve recorded while training the NLLN model.
plot_learning_curve_NLLN(history=NLLN_history, eval_interval=1000)

In [None]:
# now let's add dropout to feedforward and multiheadattention and our block
class FeedForwardWithDropout(nn.Module):
    """Two-layer MLP (4x expansion, ReLU) with dropout on its output.

    Args:
        feature_dim: Width of the input and output features.
        dropout_rate: Probability of zeroing an output element while the
            module is in training mode. Defaults to 0.2, the value that was
            previously hard-coded.
    """

    def __init__(self, feature_dim, dropout_rate=0.2):
        super().__init__()
        self.feature_dim = feature_dim
        self.net = nn.Sequential(
            nn.Linear(feature_dim, feature_dim * 4),
            nn.ReLU(),
            nn.Linear(feature_dim * 4, feature_dim),
            nn.Dropout(dropout_rate)
        )

    def forward(self, x):
        """Run ``x`` through the MLP; the last axis keeps width feature_dim."""
        return self.net(x)
    
class MultiHeadWithDropout(nn.Module):
    """Parallel ``HeadWithDropout`` heads, concatenated, projected, then
    passed through dropout.

    Args:
        num_heads: Number of heads to run in parallel.
        head_size: Output width of every head.
        feature_dim: Width of the input and of the projected output.
    """

    def __init__(self, num_heads, head_size, feature_dim):
        super().__init__()
        self.head_size = head_size
        self.num_heads = num_heads
        self.feature_dim = feature_dim
        self.heads = nn.ModuleList([HeadWithDropout(feature_dim, head_size) for _ in range(num_heads)])  # HeadWithDropout takes feature_dim first
        # The concatenated head outputs are num_heads * head_size wide, which
        # only equals feature_dim when feature_dim is divisible by num_heads;
        # size the projection from the real concatenated width.
        self.projection = nn.Linear(num_heads * head_size, feature_dim)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Apply every head to ``x``, join on the last axis, project, drop out."""
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.projection(out)
        return self.dropout(out)
    

class BlockWithDropout(nn.Module):
    """Residual attention + feed-forward block using the dropout-enabled
    sub-layers, each fed through its own ``LayerNorm1d``."""

    def __init__(self, feature_dim, num_head):
        super().__init__()
        per_head_dim = feature_dim // num_head
        self.self_attention_head = MultiHeadWithDropout(
            num_heads=num_head, head_size=per_head_dim, feature_dim=feature_dim
        )
        self.ffwd = FeedForwardWithDropout(feature_dim)
        self.ln1 = LayerNorm1d(feature_dim)
        self.ln2 = LayerNorm1d(feature_dim)

    def forward(self, x):
        """Return ``x`` after the normalized attention and MLP residual steps."""
        attended = x + self.self_attention_head(self.ln1(x))
        return attended + self.ffwd(self.ln2(attended))
    

# we can also add dropout to Head

class HeadWithDropout(nn.Module):
    """One causal self-attention head with dropout on the attention weights.

    Args:
        feature_dim: Width of the incoming feature vectors.
        head_size: Width of the key, query and value projections.
        context_length: Longest sequence the causal mask must cover. When
            ``None`` (the default) the notebook-level ``block_size`` is used.
        dropout_rate: Probability of zeroing an attention weight while the
            module is in training mode. Defaults to 0.2.
    """

    def __init__(self, feature_dim, head_size, context_length=None, dropout_rate=0.2):
        super().__init__()
        self.feature_dim = feature_dim
        self.head_size = head_size
        self.key = nn.Linear(feature_dim, head_size, bias=False)
        self.query = nn.Linear(feature_dim, head_size, bias=False)
        self.value = nn.Linear(feature_dim, head_size, bias=False)
        if context_length is None:
            context_length = block_size
        # Lower-triangular mask: position t may only attend to positions <= t.
        self.register_buffer('tril', torch.tril(torch.ones(context_length, context_length)))
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Attend causally over ``x`` of shape (batch, time, feature_dim).

        Returns a tensor of shape (batch, time, head_size).
        """
        _, sequence_length, _ = x.shape
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)
        # Scale by the key/query width (head_size), not the embedding width:
        # that is what keeps the score variance near one before the softmax.
        wei = q @ k.transpose(-2, -1) * self.head_size ** -0.5
        # Future positions get -inf so the softmax assigns them zero weight.
        wei = wei.masked_fill(self.tril[:sequence_length, :sequence_length] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        out = wei @ v
        return out
    
# Hyperparameters for the larger dropout model.
# NOTE: these rebind notebook-level names, so any code that reads them as
# globals sees the new values from this point on.

batch_size = 64  # not referenced in this section; presumably read by the batching/training helpers — confirm
block_size = 256  # context length: sizes the positional embedding table and the causal mask
max_iters = 5000  # passed to train_model as training_steps below
eval_interval = 500  # NOTE(review): not referenced in this section; confirm train_model reads it
eval_iters = 200  # NOTE(review): not referenced in this section; presumably used by the loss-estimation helper
learning_rate = 3e-4  # AdamW learning rate for the NLDO model
# NOTE(review): nothing in this section moves a model or tensor to `device`.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
feature_dim = 384  # embedding width passed to NonLinearDropout
num_head = 6  # read as a global by NonLinearDropout when it builds its blocks
n_layers = 6  # number of BlockWithDropout layers NonLinearDropout stacks
dropout = 0.2  # NOTE(review): the dropout layers in this cell use their own 0.2 value rather than reading this name
# Now let's build a model that stacks n_layers BlockWithDropout blocks followed by a final LayerNorm1d

class NonLinearDropout(nn.Module):
    """Language model made of ``n_layers`` ``BlockWithDropout`` layers and a
    final ``LayerNorm1d`` before the vocabulary projection.

    Args:
        vocab_size: Number of distinct tokens.
        feature_dim: Width of the embeddings and of every block.
    """

    def __init__(self, vocab_size, feature_dim):
        super().__init__()
        self.feature_dim = feature_dim
        self.vocab_size = vocab_size
        # num_head and n_layers are read from the notebook-level hyperparameters.
        self.blocks = nn.Sequential(
            *[BlockWithDropout(feature_dim, num_head) for _ in range(n_layers)],
            LayerNorm1d(feature_dim)
        )
        self.token_embedding_table = nn.Embedding(vocab_size, feature_dim)
        self.language_model_head = nn.Linear(feature_dim, vocab_size)
        # One learned vector per position; the notebook-level block_size read
        # here fixes this model's maximum context length.
        self.positional_embedding_table = nn.Embedding(block_size, feature_dim)

    def forward(self, indices, targets=None):
        """Return ``(logits, loss)``; ``loss`` is ``None`` without targets.

        Args:
            indices: Integer token IDs of shape (batch, time).
            targets: Optional integer token IDs with the same number of
                elements as ``indices``.
        """
        _, sequence_length = indices.shape

        token_embedding = self.token_embedding_table(indices)
        # Position ids must live on the same device as the input tokens.
        positions = torch.arange(sequence_length, device=indices.device)
        positional_embedding = self.positional_embedding_table(positions)
        x = token_embedding + positional_embedding
        x = self.blocks(x)
        logits = self.language_model_head(x)

        if targets is None:
            return logits, None

        # Merge batch and time so cross_entropy sees (N, classes) and (N,).
        batch_size, sequence_length, vocab_channels = logits.shape
        logits_flattened = logits.view(batch_size * sequence_length, vocab_channels)
        targets_flattened = targets.view(batch_size * sequence_length)
        loss = F.cross_entropy(logits_flattened, targets_flattened)

        return logits, loss

    def generate(self, indices, max_new_tokens):
        """Append ``max_new_tokens`` sampled tokens to every row of ``indices``.

        Dropout is disabled while sampling and the module's previous
        training/eval mode is restored afterwards.

        Returns a tensor of shape (batch, time + max_new_tokens).
        """
        # Use this model's own positional-table size as the context window
        # rather than whatever block_size currently holds.
        context_length = self.positional_embedding_table.num_embeddings

        was_training = self.training
        self.eval()  # sampling with active dropout would corrupt the predictions
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    context = indices[:, -context_length:]
                    logits, _ = self(context)
                    last_token_logits = logits[:, -1, :]
                    probabilities = F.softmax(last_token_logits, dim=-1)
                    next_token_index = torch.multinomial(probabilities, num_samples=1)
                    indices = torch.cat((indices, next_token_index), dim=1)
        finally:
            self.train(was_training)
        return indices

In [None]:
# Build the dropout model using the feature_dim hyperparameter.
NLDO = NonLinearDropout(vocab_size, feature_dim=feature_dim)
# AdamW over all model parameters with the learning_rate hyperparameter.
optimizer_NLDO = torch.optim.AdamW(NLDO.parameters(), lr=learning_rate)
# Train for max_iters steps; the returned history is plotted in a later cell.
NLDO_history = train_model(NLDO, optimizer_NLDO, training_steps=max_iters)
# NOTE(review): evaluate_model is defined outside this section; presumably it
# reports the final losses of the trained model — confirm.
NLDO_metrics = evaluate_model(NLDO)

In [None]:
# Sample 500 new tokens from a single-sequence batch seeded with token id 0,
# then decode that sequence back to text and print it.
print(detokenize(NLDO.generate(indices=torch.zeros((1, 1), dtype=torch.long), max_new_tokens=500)[0].tolist()))

In [None]:
def plot_learning_curve_NLDO(history, eval_interval):
    """Plot the NLDO loss history against training steps.

    Args:
        history: Sequence of loss values, one per logging point.
        eval_interval: Number of training steps between consecutive entries.
    """
    plt.figure(figsize=(10, 6))

    # Each entry's step is its position in the history times the interval.
    entry_count = len(history)
    steps = [eval_interval * entry for entry in range(entry_count)]

    plt.plot(steps, history, color='#2ecc71', linewidth=2, label='Training Loss')

    # Annotate the figure
    title_size, label_size = 14, 12
    plt.title('NLDO Model Learning Curve', fontsize=title_size)
    plt.xlabel('Training Steps', fontsize=label_size)
    plt.ylabel('Cross-Entropy Loss', fontsize=label_size)
    plt.legend()
    plt.grid(True, alpha=0.6, linestyle='--')

    plt.show()



# Plot the NLDO run with the NLDO-specific helper so the chart title matches
# the model being shown.
# NOTE(review): eval_interval=1000 mirrors the earlier plotting cells; confirm
# it matches how often train_model records an entry in the history.
plot_learning_curve_NLDO(NLDO_history, eval_interval=1000)