## Simple demo for LLM training

A simplified Transformer-based Language Model using NumPy. 

Note: This implementation uses mocked gradients and does not perform actual backpropagation for all components. 

Implementing true gradients for a Transformer manually is highly complex and not practical; hence, this script serves educational purposes only.

In [1]:
import numpy as np

# Seed for reproducibility
np.random.seed(42)

# -------------------------------
# 1. Dataset Preparation
# -------------------------------

# Define vocabulary
vocab = ['<PAD>', 'A', 'B', 'C', 'D']
vocab_size = len(vocab)
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for idx, word in enumerate(vocab)}

# Sample input sequences: [batch_size, seq_length]
X = np.array([
    [1, 2, 3],  # First sequence: A B C
    [2, 4, 1]   # Second sequence: B D A
])

# Sample target sequences: [batch_size, seq_length]
targets = np.array([
    [2, 3, 4],  # Targets: B C D
    [4, 1, 2]   # Targets: D A B
])

print("Input Sequences (X):\n", X)
print("\nTarget Sequences:\n", targets)

# -------------------------------
# 2. Model Components
# -------------------------------

# Positional Encoding
def get_positional_encoding(seq_length, embed_size):
    PE = np.zeros((seq_length, embed_size))
    for pos in range(seq_length):
        for i in range(0, embed_size, 2):
            PE[pos, i] = np.sin(pos / (10000 ** ((2 * i)/embed_size)))
            if i + 1 < embed_size:
                PE[pos, i + 1] = np.cos(pos / (10000 ** ((2 * (i + 1))/embed_size)))
    return PE

class MultiHeadSelfAttention:
    def __init__(self, embed_size, num_heads):
        assert embed_size % num_heads == 0, "Embedding size must be divisible by number of heads."
        self.embed_size = embed_size
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads
        
        # Initialize weight matrices for Q, K, V
        self.W_Q = np.random.randn(embed_size, embed_size) / np.sqrt(embed_size)
        self.W_K = np.random.randn(embed_size, embed_size) / np.sqrt(embed_size)
        self.W_V = np.random.randn(embed_size, embed_size) / np.sqrt(embed_size)
        
        # Output weight matrix
        self.W_O = np.random.randn(embed_size, embed_size) / np.sqrt(embed_size)
    
    def forward(self, X):
        """
        Forward pass for multi-head self-attention.
        
        Args:
            X: Input embeddings of shape [batch_size, seq_length, embed_size]
        
        Returns:
            Output after attention of shape [batch_size, seq_length, embed_size]
        """
        batch_size, seq_length, embed_size = X.shape
        
        # Compute Q, K, V
        Q = X @ self.W_Q  # [batch_size, seq_length, embed_size]
        K = X @ self.W_K
        V = X @ self.W_V
        
        # Split into heads
        Q = Q.reshape(batch_size, seq_length, self.num_heads, self.head_dim).transpose(0,2,1,3)  # [batch, heads, seq, head_dim]
        K = K.reshape(batch_size, seq_length, self.num_heads, self.head_dim).transpose(0,2,1,3)
        V = V.reshape(batch_size, seq_length, self.num_heads, self.head_dim).transpose(0,2,1,3)
        
        # Scaled Dot-Product Attention
        scores = (Q @ K.transpose(0,1,3,2)) / np.sqrt(self.head_dim)  # [batch, heads, seq, seq]
        attn_weights = softmax(scores, axis=-1)  # [batch, heads, seq, seq]
        attn_output = attn_weights @ V  # [batch, heads, seq, head_dim]
        
        # Concatenate heads
        attn_output = attn_output.transpose(0,2,1,3).reshape(batch_size, seq_length, embed_size)  # [batch, seq, embed_size]
        
        # Final linear layer
        output = attn_output @ self.W_O  # [batch, seq, embed_size]
        
        return output

class FeedForward:
    def __init__(self, embed_size, hidden_dim):
        # Initialize weights and biases
        self.W1 = np.random.randn(embed_size, hidden_dim) / np.sqrt(embed_size)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = np.random.randn(hidden_dim, embed_size) / np.sqrt(hidden_dim)
        self.b2 = np.zeros(embed_size)
    
    def forward(self, X):
        """
        Forward pass for feed-forward network.
        
        Args:
            X: Input of shape [batch_size, seq_length, embed_size]
        
        Returns:
            Output of shape [batch_size, seq_length, embed_size]
        """
        self.X = X
        self.Z1 = X @ self.W1 + self.b1  # [batch, seq, hidden]
        self.A1 = np.maximum(0, self.Z1)  # ReLU
        self.Z2 = self.A1 @ self.W2 + self.b2  # [batch, seq, embed]
        return self.Z2

class LayerNorm:
    def __init__(self, embed_size, epsilon=1e-6):
        self.epsilon = epsilon
        # Initialize scale (gamma) and shift (beta) parameters
        self.gamma = np.ones((1, 1, embed_size))
        self.beta = np.zeros((1, 1, embed_size))
    
    def forward(self, X):
        """
        Forward pass for layer normalization.
        
        Args:
            X: Input of shape [batch_size, seq_length, embed_size]
        
        Returns:
            Normalized output of same shape as X
        """
        self.mean = np.mean(X, axis=-1, keepdims=True)
        self.variance = np.var(X, axis=-1, keepdims=True)
        self.X_norm = (X - self.mean) / np.sqrt(self.variance + self.epsilon)
        out = self.gamma * self.X_norm + self.beta
        return out

class TransformerEncoderLayer:
    def __init__(self, embed_size, num_heads, hidden_dim):
        self.attention = MultiHeadSelfAttention(embed_size, num_heads)
        self.attn_layer_norm = LayerNorm(embed_size)
        
        self.ffn = FeedForward(embed_size, hidden_dim)
        self.ffn_layer_norm = LayerNorm(embed_size)
    
    def forward(self, X):
        """
        Forward pass for a single Transformer encoder layer.
        
        Args:
            X: Input of shape [batch_size, seq_length, embed_size]
        
        Returns:
            Output of same shape as X
        """
        # Multi-Head Self-Attention with residual connection and layer normalization
        attn_output = self.attention.forward(X)  # [batch, seq, embed]
        X = self.attn_layer_norm.forward(X + attn_output)  # [batch, seq, embed]
        
        # Feed-Forward Network with residual connection and layer normalization
        ffn_output = self.ffn.forward(X)  # [batch, seq, embed]
        X = self.ffn_layer_norm.forward(X + ffn_output)  # [batch, seq, embed]
        
        return X

# Initialize Output Layer weights and biases
# [embed_size, vocab_size]
W_out = np.random.randn(8, vocab_size) / np.sqrt(8)
b_out = np.zeros(vocab_size)

def softmax(x, axis=-1):
    """
    Applies the softmax function to the input array along the specified axis.
    
    Args:
        x: Input array.
        axis: Axis along which to apply softmax.
    
    Returns:
        Softmax applied array.
    """
    shifted_x = x - np.max(x, axis=axis, keepdims=True)
    exp_x = np.exp(shifted_x)
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

def cross_entropy_loss(probs, targets):
    """
    Computes the average cross-entropy loss over the batch.
    
    Args:
        probs: Predicted probabilities of shape [batch_size, seq_length, vocab_size]
        targets: True class indices of shape [batch_size, seq_length]
    
    Returns:
        float: Average cross-entropy loss
    """
    batch_size, seq_length, vocab_size = probs.shape
    n = batch_size * seq_length
    
    # Ensure probabilities are clipped to avoid log(0)
    epsilon = 1e-12
    probs = np.clip(probs, epsilon, 1. - epsilon)
    
    # Flatten probabilities and targets
    probs_flat = probs.reshape(n, vocab_size)
    targets_flat = targets.flatten()
    
    # Compute log probabilities of the correct classes
    log_probs = -np.log(probs_flat[np.arange(n), targets_flat])
    
    # Compute average loss
    loss = np.sum(log_probs) / n
    return loss

def compute_accuracy(probs, targets):
    """
    Computes the accuracy over the batch.
    
    Args:
        probs: Predicted probabilities of shape [batch_size, seq_length, vocab_size]
        targets: True class indices of shape [batch_size, seq_length]
    
    Returns:
        float: Accuracy (between 0 and 1)
    """
    predictions = np.argmax(probs, axis=-1)  # [batch_size, seq_length]
    correct = (predictions == targets).astype(float)
    accuracy = np.mean(correct)
    return accuracy

class TransformerLanguageModel_NumPy:
    def __init__(self, vocab_size, embed_size, num_heads, hidden_dim, num_layers, seq_length):
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.seq_length = seq_length
        
        # Initialize Embedding Matrix
        self.E = np.random.randn(vocab_size, embed_size) / np.sqrt(vocab_size)
        
        # Initialize Positional Encoding
        self.positional_encoding = get_positional_encoding(seq_length, embed_size)  # [seq_length, embed_size]
        
        # Initialize Transformer Encoder Layers
        self.encoder_layers = [TransformerEncoderLayer(embed_size, num_heads, hidden_dim) for _ in range(num_layers)]
        
        # Initialize Output Layer
        self.W_out = np.random.randn(embed_size, vocab_size) / np.sqrt(embed_size)
        self.b_out = np.zeros(vocab_size)
    
    def forward(self, X):
        """
        Forward pass through the entire Transformer model.
        
        Args:
            X: Input sequences of shape [batch_size, seq_length]
        
        Returns:
            logits: [batch_size, seq_length, vocab_size]
            probs: [batch_size, seq_length, vocab_size]
        """
        batch_size, seq_length = X.shape
        
        # Token Embedding
        X_emb = self.E[X]  # [batch_size, seq_length, embed_size]
        
        # Add Positional Encoding
        X_emb += self.positional_encoding  # Broadcasting [seq_length, embed_size]
        
        # Pass through Transformer Encoder Layers
        for layer in self.encoder_layers:
            X_emb = layer.forward(X_emb)  # [batch_size, seq_length, embed_size]
        
        # Output Layer
        logits = X_emb @ self.W_out + self.b_out  # [batch_size, seq_length, vocab_size]
        
        # Apply softmax to get probabilities
        probs = softmax(logits, axis=-1)  # [batch_size, seq_length, vocab_size]
        
        return logits, probs
    
# Initialize model
model = TransformerLanguageModel_NumPy(
    vocab_size=vocab_size,
    embed_size=8,
    num_heads=2,
    hidden_dim=16,
    num_layers=1,
    seq_length=3
)

# Training parameters
learning_rate = 0.01
epochs = 1000

for epoch in range(1, epochs + 1):
    # Forward Pass
    logits, probs = model.forward(X)  # [batch_size, seq_length, vocab_size]
    
    # Compute Loss
    loss = cross_entropy_loss(probs, targets)
    
    # Compute Accuracy
    accuracy = compute_accuracy(probs, targets)
    
    # Backward Pass (Mocked Gradients)
    # Proper backpropagation is not implemented; thus, parameters are not effectively updated
    # This mock serves only for structural demonstration
    # To implement real training, gradients for all parameters must be computed accurately
    
    # Mocked gradient for output layer
    grad_logits = probs.copy()  # [batch_size, seq_length, vocab_size]
    grad_logits[np.arange(X.shape[0])[:, None], 
                np.arange(X.shape[1]), 
                targets] -= 1
    grad_logits /= (X.shape[0] * X.shape[1])  # Normalize gradients
    
    # Compute gradients for output layer
    X_emb = model.E[X] + model.positional_encoding  # [batch_size, seq_length, embed_size]
    for layer in model.encoder_layers:
        X_emb = layer.forward(X_emb)  # [batch_size, seq_length, embed_size]
    grad_W_out = X_emb.reshape(-1, model.embed_size).T @ grad_logits.reshape(-1, vocab_size)  # [embed_size, vocab_size]
    grad_b_out = np.sum(grad_logits, axis=(0,1))  # [vocab_size]
    
    # Update Output Layer Parameters
    model.W_out -= learning_rate * grad_W_out
    model.b_out -= learning_rate * grad_b_out
    
    # (No gradient computations for embedding or encoder layers)
    
    # Logging
    if epoch % 100 == 0 or epoch == 1:
        print(f"Epoch {epoch:4d}: Loss = {loss:.4f}, Accuracy = {accuracy*100:.2f}%")

def decode_predictions(logits, idx_to_word):
    """
    Decodes logits to predicted token indices.
    
    Args:
        logits: [batch_size, seq_length, vocab_size]
        idx_to_word: Dictionary mapping indices to words
    
    Returns:
        List of predicted token sequences.
    """
    preds = np.argmax(logits, axis=-1)  # [batch_size, seq_length]
    predicted_sequences = []
    for seq in preds:
        predicted_seq = [idx_to_word[idx] for idx in seq]
        predicted_sequences.append(predicted_seq)
    return predicted_sequences

# Get predictions
predicted_sequences = decode_predictions(logits, idx_to_word)

for i in range(len(X)):
    input_seq = [idx_to_word[idx] for idx in X[i]]
    target_seq = [idx_to_word[idx] for idx in targets[i]]
    predicted_seq = predicted_sequences[i]
    print(f"\nSequence {i+1}:")
    print(f"Input:    {' '.join(input_seq)}")
    print(f"Target:   {' '.join(target_seq)}")
    print(f"Predicted:{' '.join(predicted_seq)}")



Input Sequences (X):
 [[1 2 3]
 [2 4 1]]

Target Sequences:
 [[2 3 4]
 [4 1 2]]
Epoch    1: Loss = 1.8555, Accuracy = 33.33%
Epoch  100: Loss = 1.1927, Accuracy = 50.00%
Epoch  200: Loss = 0.8908, Accuracy = 83.33%
Epoch  300: Loss = 0.7225, Accuracy = 100.00%
Epoch  400: Loss = 0.6155, Accuracy = 100.00%
Epoch  500: Loss = 0.5397, Accuracy = 100.00%
Epoch  600: Loss = 0.4822, Accuracy = 100.00%
Epoch  700: Loss = 0.4366, Accuracy = 100.00%
Epoch  800: Loss = 0.3991, Accuracy = 100.00%
Epoch  900: Loss = 0.3677, Accuracy = 100.00%
Epoch 1000: Loss = 0.3408, Accuracy = 100.00%

Sequence 1:
Input:    A B C
Target:   B C D
Predicted:B C D

Sequence 2:
Input:    B D A
Target:   D A B
Predicted:D A B
