# miniLLM

### Imports

In [9]:
import numpy as np
import re
from typing import List, Dict, Tuple
import pickle

## Tokenizer

In [10]:
class Tokenizer:
    """Character-level tokenizer: one integer id per distinct character."""
    def __init__(self):
        # Both lookup tables stay empty until fit() is called.
        self.vocab_size = 0
        self.char_to_idx = {}
        self.idx_to_char = {}

    def fit(self, text: str):
        """Build the vocabulary from the distinct characters of ``text``.

        Ids are assigned in sorted character order, so the mapping is
        deterministic for a given text.
        """
        alphabet = sorted(set(text))
        self.char_to_idx = dict(zip(alphabet, range(len(alphabet))))
        self.idx_to_char = dict(enumerate(alphabet))
        self.vocab_size = len(alphabet)

    def encode(self, text: str) -> List[int]:
        """Map ``text`` to ids; characters outside the vocabulary are skipped."""
        lookup = self.char_to_idx
        return [lookup[symbol] for symbol in text if symbol in lookup]

    def decode(self, indices: List[int]) -> str:
        """Map ids back to a string; an unknown id raises ``KeyError``."""
        table = self.idx_to_char
        return ''.join(table[token_id] for token_id in indices)

## Multi-head Attention
Allows the model to focus on different parts of the input

In [11]:
class MultiHeadAttention:
    """Multi-head scaled dot-product self-attention (forward pass only).

    The ``d_model`` features are split evenly across ``num_heads`` heads of
    width ``d_k = d_model // num_heads``. Each head attends independently and
    the head outputs are concatenated and projected by ``W_o``.
    """
    def __init__(self, d_model: int, num_heads: int):
        """Create the randomly initialised projection matrices.

        Args:
            d_model: Size of the feature dimension of the input.
            num_heads: Number of attention heads; must divide ``d_model``.

        Raises:
            ValueError: If ``num_heads`` is not positive or does not divide
                ``d_model`` evenly. Without this check the head reshape in
                ``forward`` fails later with an opaque shape error.
        """
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive "
                f"num_heads ({num_heads})"
            )

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Initialize weights (small Gaussian; draw order is q, k, v, o)
        self.W_q = np.random.randn(d_model, d_model) * 0.01
        self.W_k = np.random.randn(d_model, d_model) * 0.01
        self.W_v = np.random.randn(d_model, d_model) * 0.01
        self.W_o = np.random.randn(d_model, d_model) * 0.01

        # Cache for backward pass (not populated by forward in this class;
        # kept so the attribute remains available to external code)
        self.cache = {}

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``(softmax(Q K^T / sqrt(d_k) + mask) V, attention_weights)``.

        Args:
            Q, K, V: Arrays whose last two axes are (seq_len, d_k).
            mask: Optional additive mask broadcast onto the score matrix;
                large negative entries suppress the corresponding positions.
        """
        scores = np.matmul(Q, np.swapaxes(K, -1, -2)) / np.sqrt(self.d_k)

        if mask is not None:
            scores = scores + mask

        attention_weights = self.softmax(scores)
        output = np.matmul(attention_weights, V)
        return output, attention_weights

    def softmax(self, x):
        """Softmax over the last axis, shifted by the row max for stability."""
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x`` of shape (batch, seq_len, d_model).

        Args:
            x: Input activations.
            mask: Optional additive attention mask (see
                ``scaled_dot_product_attention``).

        Returns:
            Array with the same shape as ``x``.
        """
        batch_size, seq_len, _ = x.shape

        # Linear projections
        Q = np.matmul(x, self.W_q)
        K = np.matmul(x, self.W_k)
        V = np.matmul(x, self.W_v)

        # Reshape to (batch, heads, seq_len, d_k) so each head attends separately
        head_shape = (batch_size, seq_len, self.num_heads, self.d_k)
        Q = Q.reshape(head_shape).transpose(0, 2, 1, 3)
        K = K.reshape(head_shape).transpose(0, 2, 1, 3)
        V = V.reshape(head_shape).transpose(0, 2, 1, 3)

        # Apply attention
        attn_output, _ = self.scaled_dot_product_attention(Q, K, V, mask)

        # Concatenate heads back to (batch, seq_len, d_model)
        attn_output = attn_output.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, self.d_model)

        # Final linear projection
        return np.matmul(attn_output, self.W_o)

## Feed-Forward Network
Position-wise transform applied after attention

In [12]:
class FeedForward:
    """Two-layer position-wise MLP: linear -> ReLU -> linear."""
    def __init__(self, d_model: int, d_ff: int):
        # Weight matrices are drawn first (W1, then W2) as small Gaussians;
        # both bias vectors start at zero.
        self.W1 = 0.01 * np.random.randn(d_model, d_ff)
        self.W2 = 0.01 * np.random.randn(d_ff, d_model)
        self.b1 = np.zeros(d_ff)
        self.b2 = np.zeros(d_model)

    def relu(self, x):
        """Element-wise rectifier: negative entries are clamped to zero."""
        return np.maximum(0, x)

    def forward(self, x):
        """Expand the last axis to ``d_ff``, rectify, project back to ``d_model``."""
        pre_activation = x @ self.W1 + self.b1
        activated = self.relu(pre_activation)
        return activated @ self.W2 + self.b2

## Transformer Block

In [13]:
class TransformerBlock:
    """Single transformer decoder block (post-layer-norm arrangement).

    Each of the two sub-layers (self-attention, feed-forward) is wrapped as
    ``layer_norm(x + dropout(sublayer(x)))``. Dropout is only active when
    ``forward`` is called with ``training=True``.
    """
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        """Build the sub-layers and layer-norm parameters.

        Args:
            d_model: Feature size of the block input and output.
            num_heads: Number of attention heads.
            d_ff: Hidden width of the feed-forward sub-layer.
            dropout: Probability of zeroing a sub-layer output element during
                training; must satisfy ``0 <= dropout < 1``.

        Raises:
            ValueError: If ``dropout`` is outside ``[0, 1)``.
        """
        if not 0.0 <= dropout < 1.0:
            raise ValueError(f"dropout must be in [0, 1), got {dropout}")

        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.dropout = dropout

        # Layer norm parameters (identity transform at initialisation)
        self.gamma1 = np.ones(d_model)
        self.beta1 = np.zeros(d_model)
        self.gamma2 = np.ones(d_model)
        self.beta2 = np.zeros(d_model)

    def layer_norm(self, x, gamma, beta, eps=1e-6):
        """Normalise the last axis to zero mean / unit variance, then scale and shift."""
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        return gamma * (x - mean) / np.sqrt(var + eps) + beta

    def _dropout(self, x, training):
        """Apply inverted dropout to ``x`` when training; otherwise return ``x`` itself."""
        if not training or self.dropout <= 0.0:
            return x
        keep_prob = 1.0 - self.dropout
        # Dividing by keep_prob keeps the expected activation unchanged, so
        # no rescaling is needed at inference time.
        keep_mask = (np.random.rand(*x.shape) < keep_prob) / keep_prob
        return x * keep_mask

    def forward(self, x, mask=None, training=False):
        """Run the block on ``x``.

        Args:
            x: Input activations; passed to the attention sub-layer as-is.
            mask: Optional additive attention mask forwarded to attention.
            training: When True, dropout is applied to both sub-layer outputs.
                With the default False the computation is deterministic.

        Returns:
            Array with the same shape as ``x``.
        """
        # Self-attention with residual connection
        attn_output = self._dropout(self.attention.forward(x, mask), training)
        x = self.layer_norm(x + attn_output, self.gamma1, self.beta1)

        # Feed-forward with residual connection
        ffn_output = self._dropout(self.ffn.forward(x), training)
        x = self.layer_norm(x + ffn_output, self.gamma2, self.beta2)

        return x

## MiniLLM

In [14]:
class TransformerLM:
    """Decoder-only transformer language model (forward pass and sampling).

    Token ids are embedded, summed with fixed sinusoidal positional
    encodings, passed through ``num_layers`` transformer blocks under a
    causal mask, and projected to vocabulary logits.
    """
    def __init__(self, vocab_size: int, d_model: int = 128, num_heads: int = 4,
                 num_layers: int = 4, d_ff: int = 512, max_seq_len: int = 256):
        """Create all weights.

        Args:
            vocab_size: Number of distinct token ids.
            d_model: Embedding / hidden feature size.
            num_heads: Attention heads per block.
            num_layers: Number of stacked transformer blocks.
            d_ff: Hidden width of each feed-forward sub-layer.
            max_seq_len: Longest sequence ``forward`` accepts; also the
                context window used by ``generate``.
        """
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.max_seq_len = max_seq_len

        # Token embeddings
        self.token_embedding = np.random.randn(vocab_size, d_model) * 0.01

        # Positional encoding (fixed, not part of self.params)
        self.pos_encoding = self._create_positional_encoding(max_seq_len, d_model)

        # Transformer blocks
        self.blocks = [TransformerBlock(d_model, num_heads, d_ff)
                       for _ in range(num_layers)]

        # Output projection
        self.output_proj = np.random.randn(d_model, vocab_size) * 0.01

        # Store all parameters for optimization
        self.params = []
        self._collect_parameters()

    def _create_positional_encoding(self, max_len: int, d_model: int):
        """Return a (max_len, d_model) table of sinusoidal position encodings.

        Even columns hold sines and odd columns hold cosines of the same
        angles. For odd ``d_model`` there is one more sine column than cosine
        column, so the cosine angles are truncated to match.
        """
        pos_enc = np.zeros((max_len, d_model))
        position = np.arange(0, max_len)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
        angles = position * div_term

        pos_enc[:, 0::2] = np.sin(angles)
        # Only d_model // 2 odd columns exist; slicing keeps odd d_model valid.
        pos_enc[:, 1::2] = np.cos(angles[:, : d_model // 2])
        return pos_enc

    def _collect_parameters(self):
        """Collect all trainable parameters"""
        # The list holds references to the arrays themselves, so in-place
        # updates made through self.params modify the model weights.
        self.params = [self.token_embedding, self.output_proj]
        for block in self.blocks:
            self.params.extend([
                block.attention.W_q, block.attention.W_k,
                block.attention.W_v, block.attention.W_o,
                block.ffn.W1, block.ffn.b1, block.ffn.W2, block.ffn.b2,
                block.gamma1, block.beta1, block.gamma2, block.beta2
            ])

    def _create_causal_mask(self, seq_len: int):
        """Return a (seq_len, seq_len) additive mask: -1e9 above the diagonal, 0 elsewhere."""
        mask = np.triu(np.ones((seq_len, seq_len)), k=1)
        mask = mask * -1e9
        return mask

    def softmax(self, x):
        """Softmax over the last axis, shifted by the max for stability."""
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward(self, x, training=False):
        """Compute next-token logits.

        Args:
            x: Integer token ids of shape (batch, seq_len).
            training: Forwarded to every transformer block.

        Returns:
            Logits of shape (batch, seq_len, vocab_size).

        Raises:
            ValueError: If ``seq_len`` exceeds ``max_seq_len`` (there are no
                positional encodings beyond that length).
        """
        x = np.asarray(x)
        _, seq_len = x.shape
        if seq_len > self.max_seq_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}"
            )

        # Embed tokens
        x_embed = self.token_embedding[x]

        # Add positional encoding
        x_embed = x_embed + self.pos_encoding[:seq_len]

        # Create causal mask
        mask = self._create_causal_mask(seq_len)

        # Pass through transformer blocks
        for block in self.blocks:
            x_embed = block.forward(x_embed, mask, training)

        # Project to vocabulary
        return np.matmul(x_embed, self.output_proj)

    def generate(self, start_tokens: List[int], max_new_tokens: int = 100,
                 temperature: float = 1.0) -> List[int]:
        """Sample a continuation of ``start_tokens``.

        Args:
            start_tokens: Prompt token ids; the input is not modified.
            max_new_tokens: Number of tokens to append.
            temperature: Positive divisor applied to the logits before the
                softmax; lower values sharpen the distribution.

        Returns:
            A new list of plain ``int`` ids: the prompt followed by the
            sampled tokens.

        Raises:
            ValueError: If ``temperature`` is not positive, or if tokens are
                requested from an empty prompt.
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature}")

        tokens = list(start_tokens)
        if not tokens and max_new_tokens > 0:
            raise ValueError("start_tokens must contain at least one token")

        for _ in range(max_new_tokens):
            # Get context (last max_seq_len tokens)
            context = tokens[-self.max_seq_len:]
            x = np.array([context])

            # Forward pass
            logits = self.forward(x, training=False)

            # Get logits for last token
            next_token_logits = logits[0, -1, :] / temperature

            # Apply softmax
            probs = self.softmax(next_token_logits)

            # Sample from distribution; cast so the result stays List[int]
            next_token = int(np.random.choice(self.vocab_size, p=probs))
            tokens.append(next_token)

        return tokens

## Trainer Class

In [15]:
class Trainer:
    """Training loop using sampled finite-difference gradients and Adam.

    No analytic backward pass is implemented: gradients are estimated by
    perturbing a random subset of entries in each parameter, so every update
    is a noisy, sparse approximation intended for demonstration only.
    """
    def __init__(self, model: "TransformerLM", learning_rate: float = 0.001):
        """Attach to ``model`` and allocate Adam state for each of its parameters."""
        self.model = model
        self.lr = learning_rate

        # Adam optimizer parameters
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.eps = 1e-8

        # Initialize momentum and velocity for each parameter
        self.m = [np.zeros_like(p) for p in model.params]
        self.v = [np.zeros_like(p) for p in model.params]
        self.t = 0

    def compute_loss(self, logits, targets):
        """Mean cross-entropy between ``logits`` and integer ``targets``.

        Args:
            logits: Array of shape (batch, seq_len, vocab_size).
            targets: Integer array with batch * seq_len elements.

        Returns:
            ``(loss, probs)`` where ``probs`` has shape
            (batch * seq_len, vocab_size).
        """
        _, _, vocab_size = logits.shape

        # Flatten logits and targets
        logits_flat = logits.reshape(-1, vocab_size)
        targets_flat = targets.reshape(-1)

        # Compute softmax
        probs = self.softmax(logits_flat)

        # Cross-entropy loss; the 1e-10 guards against log(0)
        picked = probs[np.arange(len(targets_flat)), targets_flat]
        loss = -np.mean(np.log(picked + 1e-10))
        return loss, probs

    def softmax(self, x):
        """Softmax over the last axis, shifted by the row max for stability."""
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def compute_gradients_numerical(self, x, y, epsilon=1e-4):
        """Estimate gradients by forward finite differences on sampled entries.

        For each parameter at most 100 randomly chosen entries are perturbed
        by ``epsilon``; all other gradient entries are left at zero.

        Returns:
            A list of arrays, one per entry of ``model.params``, each with
            the shape of the corresponding parameter.
        """
        gradients = []
        original_loss, _ = self.compute_loss(self.model.forward(x), y)

        for param in self.model.params:
            grad = np.zeros_like(param)

            # Sample a few random indices for efficiency
            indices = np.random.choice(param.size, min(100, param.size), replace=False)

            for idx in indices:
                idx_tuple = np.unravel_index(idx, param.shape)

                # Save the exact value: "+= eps" followed by "-= eps" does not
                # always round-trip in floating point and would let the
                # weights drift a little on every call.
                saved_value = param[idx_tuple]
                param[idx_tuple] = saved_value + epsilon
                try:
                    loss_plus, _ = self.compute_loss(self.model.forward(x), y)
                finally:
                    # Restore even if the forward pass raises.
                    param[idx_tuple] = saved_value

                grad[idx_tuple] = (loss_plus - original_loss) / epsilon

            gradients.append(grad)

        return gradients

    def update_parameters(self, gradients):
        """Adam optimizer update"""
        self.t += 1

        for i, (param, grad) in enumerate(zip(self.model.params, gradients)):
            # Update biased first moment estimate
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad

            # Update biased second moment estimate
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grad ** 2)

            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # In-place update so the arrays owned by the model change
            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

    def prepare_batches(self, encoded_text: List[int], batch_size: int, seq_len: int):
        """Cut ``encoded_text`` into non-overlapping (input, target) windows.

        Each target window is the input window shifted right by one token.
        ``batch_size`` caps the number of windows returned; ``train`` feeds
        each window to the model on its own.

        Returns:
            A list of ``(x, y)`` pairs of 1-D arrays of length ``seq_len``.
        """
        batches = []

        # A window starting at i needs tokens up to index i + seq_len, so the
        # last valid start is len - seq_len - 1 (inclusive).
        for i in range(0, len(encoded_text) - seq_len, seq_len):
            if len(batches) >= batch_size:
                break

            x = encoded_text[i:i + seq_len]
            y = encoded_text[i + 1:i + seq_len + 1]
            batches.append((np.array(x), np.array(y)))

        return batches

    def train(self, text: str, tokenizer: "Tokenizer", epochs: int = 10,
              batch_size: int = 4, seq_len: int = 32):
        """Main training loop.

        The loss is evaluated on every window each epoch, but parameters are
        only updated from the first window to keep the numerical-gradient
        cost manageable. A sample is printed every fifth epoch.
        """
        encoded = tokenizer.encode(text)
        print(f"Training on {len(encoded)} tokens for {epochs} epochs\n")

        # The windows depend only on the fixed inputs, so build them once.
        batches = self.prepare_batches(encoded, batch_size, seq_len)

        for epoch in range(epochs):
            if not batches:
                print("Not enough data for batches!")
                break

            epoch_loss = 0.0

            for batch_idx, (x, y) in enumerate(batches):
                # Add batch dimension
                x_batch = x.reshape(1, -1)
                y_batch = y.reshape(1, -1)

                # Forward pass
                logits = self.model.forward(x_batch)

                # Compute loss
                loss, _ = self.compute_loss(logits, y_batch)
                epoch_loss += loss

                # Backward pass (simplified numerical gradients)
                if batch_idx == 0:  # Only update on first batch for efficiency
                    gradients = self.compute_gradients_numerical(x_batch, y_batch)
                    self.update_parameters(gradients)

            avg_loss = epoch_loss / len(batches)
            print(f"Epoch {epoch + 1}/{epochs} - Loss: {avg_loss:.4f}")

            # Generate sample text every few epochs
            if (epoch + 1) % 5 == 0:
                self.generate_sample(tokenizer)

    def generate_sample(self, tokenizer: "Tokenizer"):
        """Generate and print a sample"""
        start_text = "Hello"
        start_tokens = tokenizer.encode(start_text)
        if not start_tokens:
            # None of the prompt characters are in the vocabulary; seed the
            # generation with the first vocabulary entry instead of crashing.
            start_tokens = [0]
        generated = self.model.generate(start_tokens, max_new_tokens=50, temperature=0.8)
        text = tokenizer.decode(generated)
        print(f"\nGenerated sample:\n{text}\n")

## Demonstration

In [19]:
# End-to-end demo: fit a character tokenizer, build a small model, train it
# with the numerical-gradient Trainer, then sample at several temperatures.

# Sample training text (use more data for better results)
text = """Hello world! This is a simple transformer language model built from scratch.
It can learn to generate text by predicting the next character.
The model uses multi-head attention and feed-forward networks.
With enough training data, it can learn patterns in language.
Hello world! Machine learning is fascinating and powerful.
Neural networks can learn complex patterns from data.
The transformer architecture revolutionized natural language processing.
Attention mechanisms allow models to focus on relevant information.
Deep learning has achieved remarkable results in many domains.
""" * 5  # Repeat for more training data

# Initialize tokenizer; the vocabulary is the set of characters in `text`
tokenizer = Tokenizer()
tokenizer.fit(text)

print(f"Vocabulary size: {tokenizer.vocab_size}")
print(f"Training text length: {len(text)} characters")

# Initialize model (smaller for faster training)
# d_model must be divisible by num_heads (32 / 2 = 16 features per head)
model = TransformerLM(
    vocab_size=tokenizer.vocab_size,
    d_model=32,
    num_heads=2,
    num_layers=2,
    d_ff=64,
    max_seq_len=64
)

print(f"\nModel initialized")
print("=" * 60)

# Initialize trainer
trainer = Trainer(model, learning_rate=0.01)

# Train the model
# NOTE: Trainer.prepare_batches uses batch_size as a cap on the number of
# seq_len-long windows taken from the start of the text, and Trainer.train
# only updates parameters from the first window of each epoch.
print("\nStarting training...")
print("=" * 60)
trainer.train(
    text=text,
    tokenizer=tokenizer,
    epochs=50,
    batch_size=16,
    seq_len=32
)

print("\n" + "=" * 60)
print("Training complete!")
print("=" * 60)

# Generate final samples; the same "Hello" prompt is sampled at each
# temperature (the logits are divided by the temperature before softmax)
print("\nFinal generation samples:\n")
for temp in [0.5, 0.8, 1.0]:
    start_text = "Hello"
    start_tokens = tokenizer.encode(start_text)
    generated = model.generate(start_tokens, max_new_tokens=100, temperature=temp)
    text_out = tokenizer.decode(generated)
    print(f"Temperature {temp}:")
    print(f"{text_out}\n")

Vocabulary size: 38
Training text length: 2915 characters

Model initialized

Starting training...
Training on 2915 tokens for 50 epochs

Epoch 1/50 - Loss: 3.6239
Epoch 2/50 - Loss: 3.6069
Epoch 3/50 - Loss: 3.5896
Epoch 4/50 - Loss: 3.5710
Epoch 5/50 - Loss: 3.5506

Generated sample:
HelloH!rgdkv,ppadco
!HdT!tH,sf!o-zgzH!bDtTD!.M-kri arsW

Epoch 6/50 - Loss: 3.5274
Epoch 7/50 - Loss: 3.5037
Epoch 8/50 - Loss: 3.4788
Epoch 9/50 - Loss: 3.4536
Epoch 10/50 - Loss: 3.4292

Generated sample:
HelloirWtdtiiMceueDaoediv looa
rTuN!HTrriw Nicibn lTdvz

Epoch 11/50 - Loss: 3.4065
Epoch 12/50 - Loss: 3.3860
Epoch 13/50 - Loss: 3.3696
Epoch 14/50 - Loss: 3.3576
Epoch 15/50 - Loss: 3.3479

Generated sample:
HellowwDtTadThiwI.osl m  tl.tlits  Tw MroInsrItdIrvamva

Epoch 16/50 - Loss: 3.3432
Epoch 17/50 - Loss: 3.3413
Epoch 18/50 - Loss: 3.3438
Epoch 19/50 - Loss: 3.3511
Epoch 20/50 - Loss: 3.3624

Generated sample:
Helloaoelsei! sheTtr !eiW sk,rtNgie ta r iarrs se s!io 

Epoch 21/50 - Loss: 3.3769


## Thoughts
This LLM doesn't perform very well, but that is to be expected from a model with such a small training sample.
I did learn a lot from this, especially about transformers.