# We will implement LLM from scratch

## Import the libraries

We will use PyTorch and NumPy for the numerical (AI/ML) computations
and Matplotlib for the visualizations.

In [1]:
from transformers import AutoTokenizer
import torch
import torch.nn as nn
from torch.nn import functional as F
import matplotlib.pyplot as plt
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


# Params
Here are the configurations for the LLM.

In [2]:
# Model hyper-parameters shared by every cell below.
embeddings_dimension = 8 # Also known as "hidden_size" in Hugging Face; the inner MLP dimension ("intermediate_size") is 4 * embeddings_dimension = 32
num_attention_heads = 2
attention_head_size = embeddings_dimension // num_attention_heads # This is the dimension every attention head will have
num_transformer_blocks = 2
max_context_length = 128 # also known as "max_position_embeddings" in Hugging Face; the maximum number of tokens the LLM can process
vocab_size = 0 # Placeholder; overwritten once the tokenizer has been built
eos_token = None # Placeholder; the EOS token id is defined later, after tokenization

### This is only for creating the Plot to visualize the embeddings

In [None]:
def createPlot(nums, xlabel="Embedding Dimensions", title="Embeddings Visualization"):
    """Draw one annotated heat map per batch element of a 3-D tensor.

    Args:
        nums: Tensor indexed as (batch, row, column). Every cell is drawn and
            annotated with its value formatted to two decimals.
        xlabel: Label for the x axis of every heat map.
        title: Title prefix; " - Batch <n>" (1-based) is appended per heat map.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # Detach from autograd and copy to host memory so NumPy can read the values.
    values = nums.detach().cpu().numpy()
    n_batches, n_rows, n_cols = values.shape

    # squeeze=False keeps `axes` two-dimensional even for a single batch element.
    fig, axes = plt.subplots(n_batches, 1, figsize=(25, 5 * n_batches), squeeze=False)
    fig.subplots_adjust(hspace=0.5)

    for batch_idx in range(n_batches):
        ax = axes[batch_idx, 0]
        image = ax.matshow(values[batch_idx], aspect='auto', cmap='viridis')

        # Add color bar for reference
        fig.colorbar(image, ax=ax)

        # One tick (and label) per column and per row
        ax.set_xticks(np.arange(n_cols))
        ax.set_yticks(np.arange(n_rows))
        ax.set_xticklabels([f'Dim {i}' for i in range(n_cols)])
        ax.set_yticklabels([f'Token {i}' for i in range(n_rows)])

        # Rotate the labels on *this* axes. plt.xticks() acts on pyplot's
        # "current" axes, which is not necessarily `ax` when there are
        # several subplots.
        ax.tick_params(axis='x', labelrotation=90)
        ax.set_xlabel(xlabel)
        ax.set_ylabel("Tokens")

        # Write the numerical value into every cell
        for row in range(n_rows):
            for col in range(n_cols):
                ax.text(col, row, f'{values[batch_idx, row, col]:.2f}', ha='center', va='center', color='white')

        ax.set_title(f"{title} - Batch {batch_idx + 1}")

    plt.show()


def createLossPlot(epochs, losses, title="training"):
    """Show a line chart of `losses` (y axis) against `epochs` (x axis).

    Args:
        epochs: x values, one per recorded loss.
        losses: y values to plot.
        title: Title of the chart.
    """
    # High DPI so the thin line stays crisp when the figure is enlarged.
    _, ax = plt.subplots(dpi=500)
    ax.plot(epochs, losses, linewidth=1)
    ax.set_xlabel("Epochs")
    ax.set_ylabel("losses")
    ax.set_title(title)
    plt.show()

# Tokenisation

Here we will create a simple Character-Level Tokenizer and the input/training text

Where "S" is the Start of sequence SOS Token
Where "E" is the End of sequence EOS Token

In [None]:
# Training text. The tokenizer works on single characters, so the
# start-of-sequence marker is the letter "S" and the end-of-sequence marker is "E".
input_text = "SThis is a input textE"

# Vocabulary: every distinct character of the text, in sorted order
chars = sorted(set(input_text))
vocab_size = len(chars)

# Lookup tables: character -> token id and token id -> character
stoi = dict(zip(chars, range(vocab_size)))
itos = dict(enumerate(chars))


def tokenize(s):
    """Tokenizer: take a string, return the list of its character token ids."""
    return [stoi[c] for c in s]


def detokenize(l):
    """Detokenizer: take a list of token ids, return the decoded string."""
    return ''.join(itos[i] for i in l)

In [None]:
tokenized_input = tokenize(input_text)

# Convert tokenized input to a tensor and add batch dimension
tokenized_input_tensor = torch.tensor(tokenized_input).unsqueeze(0)
# The text starts with the SOS character and ends with the EOS character,
# so their ids are simply the first and last token.
eos_token = tokenized_input[-1]
sos_token = tokenized_input[0]

print(f"New updated vocab size: {vocab_size}")
print(f"sos_token: {sos_token}")
print(f"eos_token: {eos_token}")
print(f"The tokenized Text (as a normal array): {tokenized_input}")
print(f"The context size of the current input text is: {len(tokenized_input)}")
print(f"The detokenized Tensor: \"{detokenize(tokenized_input)}\"")
# Tokenize an actual space instead of indexing a fixed position:
# position 4 of the text holds the character "s", not the space.
print(f"As you can see the space token is the number. {tokenize(' ')[0]}")

## Layer 1: Embeddings

This layer outputs multidimensional embeddings (vectors of numbers) that represent the given tokens.

In [None]:
# NOTE: padding_idx=eos_token makes the EOS row of the embedding table all zeros
# and excludes it from gradient updates (see the nn.Embedding documentation).
embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embeddings_dimension, padding_idx=eos_token)
# tokenized_input_tensor already is a tensor; re-wrapping it in torch.tensor()
# only produces a copy-construct warning.
token_embeddings = embedding(tokenized_input_tensor)

print(f"The shape is: {token_embeddings.shape}")
print(token_embeddings)

# Index [0, 0] = first batch element, first token (the SOS token);
# token_embeddings[0] alone would be the embeddings of the whole sequence.
print(f"\nFor example the Embeddings that represent the SOS token are: {token_embeddings[0, 0].tolist()}")

## Create the Plot for the Embeddings

In [None]:
# Draw the raw token embeddings as an annotated heat map (one per batch element)
createPlot(token_embeddings)

# Layer 2: LayerNorm

In [None]:
# NOTE: despite its name, this LayerNorm is applied to the token embeddings
# *before* they are passed to the attention layer in the cells below.
post_attention_layernorm = nn.LayerNorm(embeddings_dimension)
normalized_token_embeddings = post_attention_layernorm(token_embeddings)

# LayerNorm keeps the shape; only the values along the last dimension are normalized
print(normalized_token_embeddings.shape)
createPlot(normalized_token_embeddings)

# Layer 3: Multihead Attention

In [None]:
class AttentionHead(nn.Module):
    """One head of causal self-attention that plots every intermediate tensor.

    Sizes come from the notebook globals `embeddings_dimension`,
    `attention_head_size` and `max_context_length`.
    """

    def __init__(self):
        super().__init__()
        projection_shape = (embeddings_dimension, attention_head_size)
        self.key_proj = nn.Linear(*projection_shape, bias=False)
        self.query_proj = nn.Linear(*projection_shape, bias=False)
        self.value_proj = nn.Linear(*projection_shape, bias=False)
        # Lower-triangular matrix of ones; stored as a buffer, not a parameter
        lower_triangle = torch.ones(max_context_length, max_context_length).tril()
        self.register_buffer('tril', lower_triangle)

        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        """Map x of shape (batch, length, dimensions) to (batch, length, head size)."""
        _, seq_len, _ = x.shape

        def show(message, tensor, xlabel, title):
            # Announce the step on stdout, then draw the tensor
            print(message)
            createPlot(tensor, xlabel=xlabel, title=title)

        keys = self.key_proj(x)  # (B, L, hs)
        show("keys beggining", keys, "Created Key Projection weights", "key_proj")
        queries = self.query_proj(x)  # (B, L, hs)
        show("queries beggining", queries, "Created Query Projection weights", "query_proj")
        values = self.value_proj(x)  # (B, L, hs)
        show("values beggining", values, "Created Value Projection weights", "value_proj")

        # ATTENTION START
        # Scaled dot-product scores: (B, L, hs) @ (B, hs, L) -> (B, L, L)
        scale = keys.shape[-1] ** -0.5
        scores = (queries @ keys.transpose(-2, -1)) * scale
        show("keys and queries multiplied", scores, "Attention Beginn", "queries @ keys.T")

        # Positions above the diagonal (later tokens) are set to -inf
        hidden = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(hidden, float('-inf'))  # (B, L, L)
        show("Masked", scores, "Masked Attention Scores", "Casual Attention")

        weights = F.softmax(scores, dim=-1)  # (B, L, L)
        show("Softmaxed", weights, "Mased Scored Softmax Probabilities", "Casual Softmax Probabilities")
        weights = self.dropout(weights)
        show("droped out", weights, "Attention Scores Droped out", "Attention Dropout")

        # Weighted sum of the values: (B, L, L) @ (B, L, hs) -> (B, L, hs)
        attended = weights @ values
        show("Attentino head output", attended, "Final Attenion output weights", "Final Attention output")
        # ATTENTION END

        return attended

class MultiHeadAttention(nn.Module):
    """Several `AttentionHead`s run on the same input; plots its intermediate results.

    The head outputs are concatenated along the last dimension and projected
    back to `embeddings_dimension` by `out_proj`.
    """

    def __init__(self):
        super().__init__()
        heads = [AttentionHead() for _ in range(num_attention_heads)]
        self.heads = nn.ModuleList(heads)
        concatenated_size = attention_head_size * num_attention_heads
        self.out_proj = nn.Linear(concatenated_size, embeddings_dimension, bias=False)
        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        """Run every head on x, concatenate, project, apply dropout."""
        head_outputs = []
        for head in self.heads:
            head_outputs.append(head(x))
        merged = torch.cat(head_outputs, dim=-1)
        print("Concatinated attention outputs")
        createPlot(merged, xlabel="Concatinated Attention Heads output weights", title="Concatinated Attention Heads")

        print("Final output of multihead attention")
        projected = self.out_proj(merged)
        createPlot(projected, xlabel="Final Multi-Head-Attenion output", title="out_proj")
        return self.dropout(projected)

In [None]:
# Instantiate the multi-head attention layer and print its sub-modules
mha = MultiHeadAttention()
print(mha)

In [None]:
# Run the layer-normalized embeddings through the attention layer;
# the forward pass also draws every intermediate plot.
mha_output = mha(normalized_token_embeddings)
print(mha_output.shape)
print(mha_output)

# Layer 4: MLP

In [None]:
class MLP(nn.Module):
    """Feed-forward block with gate/up/down projections; plots every step.

    The hidden width is 4 * `embeddings_dimension`. The activation (SELU) is
    applied to the element-wise product ``gate * up``.
    NOTE(review): the reference Llama MLP activates only the gate branch
    (``act(gate) * up``) - confirm this ordering is intended.
    """

    def __init__(self):
        super().__init__()
        hidden_size = 4 * embeddings_dimension

        self.gate_proj = nn.Linear(embeddings_dimension, hidden_size, bias=False)
        self.up_proj = nn.Linear(embeddings_dimension, hidden_size, bias=False)

        self.down_proj = nn.Linear(hidden_size, embeddings_dimension, bias=False)

        self.act_fn = nn.SELU()

    def forward(self, x):
        """Return down_proj(act_fn(gate_proj(x) * up_proj(x))), plotting each stage."""

        def show(message, tensor, xlabel, title):
            # Announce the step on stdout, then draw the tensor
            print(message)
            createPlot(tensor, xlabel=xlabel, title=title)

        gate = self.gate_proj(x)
        show("gate_proj", gate, "created weights in the gate_proj", "gate_proj")

        up = self.up_proj(x)
        show("up_proj", up, "created weights in the up_proj", "up_proj")

        activated = self.act_fn(gate * up)
        show("multiplied and activated", activated, "Matrix Multiplied and Activated weights", "self.act_fn(gate * up)")

        down = self.down_proj(activated)
        show("down_proj", down, "Final MLP output, and created weights in the down_proj", "down_proj")
        return down

In [None]:
# Instantiate the MLP block and print its layers
mlp = MLP()
print(mlp)

In [None]:
# Feed the attention output through the MLP; the forward pass draws every intermediate plot
mlp_output = mlp(mha_output)

print(mlp_output.shape)
print(mlp_output)

## And we're done, let's create the Blocks and everything else

In [None]:
class TransformerBlock(nn.Module):
    """Pre-norm transformer block that plots its intermediate results.

    Computes ``x + attn(ln1(x))`` followed by ``x + mlp(ln2(x))``.
    """

    def __init__(self):
        super().__init__()
        self.attn = MultiHeadAttention()
        self.mlp = MLP()

        self.ln1 = nn.LayerNorm(embeddings_dimension)
        self.ln2 = nn.LayerNorm(embeddings_dimension)

    def forward(self, x):
        """Apply attention and MLP, each wrapped in a residual connection."""

        def show(message, tensor, xlabel, title):
            # Announce the step on stdout, then draw the tensor
            print(message)
            createPlot(tensor, xlabel=xlabel, title=title)

        attention_out = self.attn(self.ln1(x))
        show("Transformer Block attn output:", attention_out,
             "Transformer Block attn output scores", "self.attn(self.ln1(x))")

        after_attention = x + attention_out
        show("Transformer Block first Residual Conection:", after_attention,
             "Transformer Block first Residual Conection scores", "x + attn_o")

        mlp_out = self.mlp(self.ln2(after_attention))
        show("Transformer Block mlp output:", mlp_out,
             "Transformer Block mlp output scores", "self.mlp(self.ln2(x))")

        result = after_attention + mlp_out
        show("Transformer Block second Residual Conection and final Output:", result,
             "ransformer Block second Residual Conection and final output scores", "x + mlp_o")
        return result

In [None]:
# Instantiate one transformer block and print its sub-modules
transformer_block = TransformerBlock()
print(transformer_block)

In [None]:
# Pass the raw token embeddings through the block (it applies its own LayerNorms, ln1 and ln2).
# The returned tensor is displayed as the cell output.
transformer_block(token_embeddings)

# Creating the full model

In [None]:
# Sinusoidal positional encoding
def get_sinusoidal_positional_encoding(seq_len, d_model):
    """Build a sinusoidal positional-encoding table.

    Even columns hold ``sin(position * freq)`` and odd columns hold
    ``cos(position * freq)``, where ``freq = 10000 ** (-2k / d_model)`` for the
    k-th sin/cos pair.

    Args:
        seq_len: Number of positions (rows) to encode.
        d_model: Width of each encoding vector; may be even or odd.

    Returns:
        Float tensor of shape (1, seq_len, d_model); the leading dimension lets
        it broadcast over a batch.
    """
    pe = torch.zeros(seq_len, d_model)
    position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
    angles = position * div_term  # (seq_len, ceil(d_model / 2))
    pe[:, 0::2] = torch.sin(angles)
    # For odd d_model there is one cosine column fewer than sine columns,
    # so drop the surplus frequency instead of failing on a shape mismatch.
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
    return pe.unsqueeze(0)

In [None]:
class LlamaModel(nn.Module):
    """Visualizing version of the model: takes embeddings and plots every stage.

    Unlike the later "all in one" version, this class has no embedding table;
    `forward` expects already-embedded tokens of shape
    (batch, length, embeddings_dimension).
    """

    def __init__(self):
        super().__init__()
        # Registered as a buffer so it follows the module across devices;
        # persistent=False keeps it out of state_dict(), so checkpoints keep
        # the same keys as before.
        self.register_buffer(
            'positional_encoding',
            get_sinusoidal_positional_encoding(max_context_length, embeddings_dimension),
            persistent=False,
        )
        self.transformer_blocks = nn.Sequential(*[TransformerBlock() for _ in range(num_transformer_blocks)])
        self.ln_f = nn.LayerNorm(embeddings_dimension)
        self.lm_head = nn.Linear(embeddings_dimension, vocab_size)

    def forward(self, x):
        """Return logits of shape (batch, length, vocab_size) for embedded input x."""
        # Add the encoding of the first x.size(1) positions
        x = x + self.positional_encoding[:, :x.size(1), :]
        print("Positionaly Encoded Embeddings:")
        createPlot(x, xlabel="Positional Encodings", title="embeddings.float() + self.positional_encoding[:, :x.size(1), :]")

        x = self.transformer_blocks(x) # (B,T,C)
        print("output transformer blocks:")
        createPlot(x, xlabel="outputs of the transformer blocks", title="self.transformer_blocks(x)")

        x = self.ln_f(x) # (B,T,C)
        print("layer norm:")
        createPlot(x, xlabel="layer norm of the attention scores from the blocks", title="self.ln_f(x)")

        logits = self.lm_head(x) # (B,T,vocab_size)
        print("output lm heads (logits):")
        # Plot the logits themselves, not the layer-norm output that produced them
        createPlot(logits, xlabel="Final output logits of the LLM", title="self.lm_head(x)")

        return logits

In [None]:
# Instantiate the visualizing model and print its layers
model = LlamaModel()
print(model)

## Forward pass the created embeddings through the Llama model

In [None]:
logits = model(token_embeddings)
# With the settings above the input has shape (1, 22, 8): batch, tokens, embedding dimensions.
# The output has shape (1, 22, 14): for every input position, one logit per vocabulary entry.
# The logits at the last position are the prediction for the next token.

In [None]:
# Inspect the shape and raw values of the logits
print(logits.shape)
print(logits)

# Inference

In [None]:
# focus only on the last time step, i.e. the logits predicted after the final input token
last_logits = logits[:, -1, :] # becomes (B, vocab_size)

# These are logits (unnormalized scores, one per vocabulary entry), not embeddings
print(f"The LLM basically outputs one logit (score) per vocabulary entry for the next predicted Token:\n{last_logits.tolist()}")

# apply softmax to get probabilities
probs = F.softmax(last_logits, dim=-1) # (B, vocab_size)

# sample from the distribution
next_token = torch.multinomial(probs, num_samples=1) # (B, 1)

print(next_token)

# Detokenize the generated output Token

In [None]:
# .item() requires a single-element tensor, so this works for a batch of one sampled token
the_generated_text = detokenize([next_token.item()])

print(f"The next generated Token is: \"{the_generated_text}\"")

# All in one

In [None]:
class AttentionHead(nn.Module):
    """One head of causal self-attention (plain version, no plotting).

    Sizes come from the notebook globals `embeddings_dimension`,
    `attention_head_size` and `max_context_length`.
    """

    def __init__(self):
        super().__init__()
        projection_shape = (embeddings_dimension, attention_head_size)
        self.key_proj = nn.Linear(*projection_shape, bias=False)
        self.query_proj = nn.Linear(*projection_shape, bias=False)
        self.value_proj = nn.Linear(*projection_shape, bias=False)
        # Lower-triangular matrix of ones; stored as a buffer, not a parameter
        lower_triangle = torch.ones(max_context_length, max_context_length).tril()
        self.register_buffer('tril', lower_triangle)

        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        """Map x of shape (batch, length, dimensions) to (batch, length, head size)."""
        _, seq_len, _ = x.shape

        keys = self.key_proj(x)        # (B, L, hs)
        queries = self.query_proj(x)   # (B, L, hs)
        values = self.value_proj(x)    # (B, L, hs)

        # Scaled dot-product scores: (B, L, hs) @ (B, hs, L) -> (B, L, L)
        scale = keys.shape[-1] ** -0.5
        scores = (queries @ keys.transpose(-2, -1)) * scale

        # Later positions (above the diagonal) are set to -inf before the softmax
        hidden = self.tril[:seq_len, :seq_len] == 0
        weights = F.softmax(scores.masked_fill(hidden, float('-inf')), dim=-1)
        weights = self.dropout(weights)

        # Weighted sum of the values: (B, L, L) @ (B, L, hs) -> (B, L, hs)
        return weights @ values

class MultiHeadAttention(nn.Module):
    """Several `AttentionHead`s run on the same input (plain version).

    The head outputs are concatenated along the last dimension and projected
    back to `embeddings_dimension` by `out_proj`.
    """

    def __init__(self):
        super().__init__()
        heads = [AttentionHead() for _ in range(num_attention_heads)]
        self.heads = nn.ModuleList(heads)
        concatenated_size = attention_head_size * num_attention_heads
        self.out_proj = nn.Linear(concatenated_size, embeddings_dimension, bias=False)
        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        """Run every head on x, concatenate, project, apply dropout."""
        head_outputs = []
        for head in self.heads:
            head_outputs.append(head(x))
        merged = torch.cat(head_outputs, dim=-1)
        projected = self.out_proj(merged)
        return self.dropout(projected)
    

class MLP(nn.Module):
    """Feed-forward block with gate/up/down projections (plain version).

    The hidden width is 4 * `embeddings_dimension`. The activation (SELU) is
    applied to the element-wise product of the gate and up projections.
    NOTE(review): the reference Llama MLP activates only the gate branch
    (``act(gate) * up``) - confirm this ordering is intended.
    """

    def __init__(self):
        super().__init__()
        hidden_size = 4 * embeddings_dimension

        # Creation order is kept (gate, down, up) so seeded initialisation is unchanged
        self.gate_proj = nn.Linear(embeddings_dimension, hidden_size, bias=False)
        self.down_proj = nn.Linear(hidden_size, embeddings_dimension, bias=False)
        self.up_proj = nn.Linear(embeddings_dimension, hidden_size, bias=False)
        self.act_fn = nn.SELU()

    def forward(self, x):
        """Return down_proj(act_fn(gate_proj(x) * up_proj(x)))."""
        gated = self.gate_proj(x) * self.up_proj(x)
        activated = self.act_fn(gated)
        return self.down_proj(activated)
    
class TransformerBlock(nn.Module):
    """Pre-norm transformer block (plain version).

    Computes ``x + attn(ln1(x))`` followed by ``x + mlp(ln2(x))``.
    """

    def __init__(self):
        super().__init__()
        self.attn = MultiHeadAttention()
        self.mlp = MLP()

        self.ln1 = nn.LayerNorm(embeddings_dimension)
        self.ln2 = nn.LayerNorm(embeddings_dimension)

    def forward(self, x):
        """Apply attention and MLP, each wrapped in a residual connection."""
        # Out-of-place additions: the caller's tensor is never modified
        after_attention = x + self.attn(self.ln1(x))
        return after_attention + self.mlp(self.ln2(after_attention))
    
class LlamaModel(nn.Module):
    """Complete language model: token embedding, positional encoding,
    transformer blocks, final LayerNorm and a linear head over the vocabulary.

    Sizes come from the notebook globals (`vocab_size`, `embeddings_dimension`,
    `max_context_length`, `num_transformer_blocks`, `eos_token`).
    """

    def __init__(self):
        super().__init__()
        # padding_idx=eos_token: per the nn.Embedding docs, that row is all
        # zeros and receives no gradient updates.
        self.embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embeddings_dimension, padding_idx=eos_token)
        # Registered as a buffer so it follows the module across devices;
        # persistent=False keeps it out of state_dict(), so saved checkpoints
        # keep exactly the same keys as before.
        self.register_buffer(
            'positional_encoding',
            get_sinusoidal_positional_encoding(max_context_length, embeddings_dimension),
            persistent=False,
        )
        self.transformer_blocks = nn.Sequential(*[TransformerBlock() for _ in range(num_transformer_blocks)])
        self.ln_f = nn.LayerNorm(embeddings_dimension)
        self.lm_head = nn.Linear(embeddings_dimension, vocab_size)

    def forward(self, x, targets=None):
        """Compute logits, and the loss when targets are given.

        Args:
            x: Token ids of shape (batch, length).
            targets: Optional token ids with the same number of elements as x.

        Returns:
            (logits, loss). Without targets, logits has shape
            (batch, length, vocab_size) and loss is None. With targets, logits
            is flattened to (batch * length, vocab_size) and loss is the
            cross-entropy against the flattened targets.
        """
        x = self.embeddings(x) + self.positional_encoding[:, :x.size(1), :]
        x = self.transformer_blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, classes) logits and (N,) targets
            B, L, D = logits.shape
            logits = logits.view(B*L, D)
            targets = targets.view(B*L)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()  # sampling needs no gradients; avoids building autograd graphs
    def generate(self, idx, max_new_tokens):
        """Autoregressively sample `max_new_tokens` tokens after the prompt `idx`.

        Args:
            idx: Token ids of shape (batch, length) used as the prompt.
            max_new_tokens: Number of tokens to append.

        Returns:
            Tensor of shape (batch, length + max_new_tokens): the prompt
            followed by the sampled tokens.
        """
        for _ in range(max_new_tokens):
            # The model can attend to at most max_context_length tokens
            idx_cond = idx[:, -max_context_length:]
            new_logits, _ = self(idx_cond)
            # Only the prediction at the last position is needed
            new_logits = new_logits[:, -1, :]
            probs = F.softmax(new_logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

In [None]:
# Instantiate the full model, then report its parameter count (in millions) and its layers
llama = LlamaModel()
print(sum(p.numel() for p in llama.parameters())/1e6, 'M parameters')
print(llama)

In [None]:
# Sample 50 tokens after the full input text. When the notebook is run top to bottom
# the model has not been trained yet at this point.
output = llama.generate(tokenized_input_tensor, max_new_tokens=50)
# First (and only) batch element -> plain list of token ids -> string
out = output[0].tolist()
generated_text = detokenize(out)
print(generated_text)

# Let's train it

In [None]:
# Histories filled by the training loop and plotted afterwards
epochs = []
val_losses = []
train_losses = []
batch_size = 16 # how many independent sequences will we process in parallel?
max_epochs = 100 # number of optimizer steps the training loop is configured for
eval_interval = 10 # evaluate the loss every this many steps
eval_epochs = 200 # number of batches averaged per split in estimate_loss()

# Pad the token list with EOS tokens up to max_context_length - 1 entries.
# NOTE: `+=` extends the existing list in place, so every later cell that reads
# `tokenized_input` sees the padded version.
if len(tokenized_input) < max_context_length:
    tokenized_input += [eos_token] * (max_context_length - len(tokenized_input) - 1)

# Train and validation split: first 90% of the tokens for training, the rest for validation
data = torch.tensor(tokenized_input, dtype=torch.long)
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

# After that the training dataset looks like this "SThis is a input textEEEEEEEEEEEEEEEEEEE......"

# data loading
def get_batch(split):
    """Sample a batch of (input, target) windows from the train or validation data.

    Each target row is its input row shifted one token to the left. Data with
    fewer than max_context_length + 1 tokens is right-padded with `eos_token`
    so that inputs and targets always have the same, full window length.

    Args:
        split: 'train' selects `train_data`; any other value selects `val_data`.

    Returns:
        (x, y): two long tensors of shape (batch_size, max_context_length) on the CPU.
    """
    source = train_data if split == 'train' else val_data
    window = max_context_length

    # A full window plus its shifted target needs window + 1 tokens.
    missing = window + 1 - len(source)
    if missing > 0:
        padding = torch.full((missing,), eos_token, dtype=source.dtype, device=source.device)
        source = torch.cat([source, padding])

    # Valid start offsets are 0 .. len(source) - window - 1 (upper bound is exclusive)
    starts = torch.randint(0, len(source) - window, (batch_size,))
    x = torch.stack([source[i:i + window] for i in starts])
    y = torch.stack([source[i + 1:i + window + 1] for i in starts])

    return x.to("cpu"), y.to("cpu")


@torch.no_grad()
def estimate_loss():
    """Average the model loss over `eval_epochs` random batches per split.

    The model is put into eval mode for the measurement and back into train
    mode afterwards.

    Returns:
        Dict with keys 'train' and 'val', each a scalar tensor holding the
        mean loss of that split.
    """
    averages = {}
    llama.eval()
    for split in ('train', 'val'):
        batch_losses = []
        for _ in range(eval_epochs):
            inputs, targets = get_batch(split)
            _, batch_loss = llama(inputs, targets=targets)
            batch_losses.append(batch_loss.item())
        averages[split] = torch.tensor(batch_losses).mean()
    llama.train()
    return averages

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(llama.parameters(), lr=0.0005)

# Run exactly max_epochs steps. With range(max_epochs - 1) the
# `epoch == max_epochs - 1` check below could never be true, so the final
# evaluation was skipped.
for epoch in range(max_epochs):
    # every once in a while (and on the very last step) evaluate the loss on train and val sets
    if epoch % eval_interval == 0 or epoch == max_epochs - 1:
        losses = estimate_loss()
        epochs.append(epoch)
        # Store plain floats so the histories can be plotted directly
        train_losses.append(float(losses['train']))
        val_losses.append(float(losses['val']))
        print(f"step {epoch}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss and take one optimizer step
    logits, loss = llama(xb, targets=yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# Persist the trained weights, then plot both loss histories
torch.save(llama.state_dict(), "pytorch_model.pth")
createLossPlot(epochs, train_losses)
createLossPlot(epochs, val_losses, title="Validation")

# Let's test our trained model

The start input is "SThis" and we want the model to complete it to the training text.

So the expected Output should be "SThis is a input textEEEEEEEEEEEEEEEEEEE......"

In [None]:
# Restore the weights saved by the training cell
llama.load_state_dict(torch.load("pytorch_model.pth"))

test_input_text = "SThis"
# Tokenize the test prompt itself; previously the full training text
# (`input_text` / `tokenized_input`) was fed to the model instead.
test_tokenized_input = tokenize(test_input_text)
test_tokenized_input_tensor = torch.tensor(test_tokenized_input).unsqueeze(0)
output = llama.generate(test_tokenized_input_tensor, max_new_tokens=50)
# First (and only) batch element -> plain list of token ids -> string
out = output[0].tolist()
generated_text = detokenize(out)
print(generated_text)