<a href="https://colab.research.google.com/github/Vishal-113/NLP4-/blob/main/Mini_Transformer_Encoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import sys

# Seed NumPy's global RNG up front so every randomly initialised weight
# created later in the file is identical from run to run.
np.random.seed(42)

# --- Hyperparameters and Data Setup ---

# Reserved token ids: index 0 pads short sequences, index 1 stands in for
# words that are missing from the vocabulary.
PAD_IDX = 0
UNK_IDX = 1

# Model geometry
D_MODEL = 32    # width of every token embedding
NUM_HEADS = 4   # parallel attention heads
D_K = D_MODEL // NUM_HEADS  # per-head width of the Q/K/V projections
D_FF = 128      # hidden width of the position-wise feed-forward network
NUM_LAYERS = 1  # how many encoder layers are stacked
MAX_LEN = 15    # every sequence is padded (or cut) to this many tokens

# Toy corpus: ten short English sentences.
sentences = [
    "The quick brown fox jumps over the lazy dog.",
    "A journey of a thousand miles begins with a single step.",
    "To be or not to be that is the question.",
    "All that glitters is not gold.",
    "Where there is a will there is a way.",
    "The early bird catches the worm.",
    "An apple a day keeps the doctor away.",
    "Practice makes perfect.",
    "Life is what happens when you're busy making other plans.",
    "If you want to live a happy life tie it to a goal not to people or things.",
]

# --- Tokenization and Embedding ---

def build_vocab(sentences):
    """Build forward and reverse vocabulary mappings from raw sentences.

    Each sentence is lower-cased, stripped of periods and commas, and split
    on whitespace. Words receive consecutive ids starting at 2 in order of
    first appearance; ids ``PAD_IDX`` and ``UNK_IDX`` are then assigned to
    the special tokens ``'<pad>'`` and ``'<unk>'``.

    Returns:
        A ``(word_to_idx, idx_to_word)`` pair of dictionaries.
    """
    vocab = {}
    for text in sentences:
        cleaned = text.lower().replace('.', '').replace(',', '').strip()
        for token in cleaned.split():
            # setdefault only inserts unseen words, so ids stay stable;
            # the +2 offset leaves room for the two special tokens.
            vocab.setdefault(token, len(vocab) + 2)

    # Special tokens are registered last, exactly like the reserved ids imply.
    vocab['<pad>'] = PAD_IDX
    vocab['<unk>'] = UNK_IDX

    reverse_vocab = dict(zip(vocab.values(), vocab.keys()))
    return vocab, reverse_vocab

def tokenize_batch(sentences, word_to_idx, max_len):
    """Turn sentences into a NumPy array of fixed-length token-id rows.

    Sentences are cleaned the same way as in vocabulary construction
    (lower-case, periods/commas removed, whitespace split). Unknown words
    map to ``UNK_IDX``; rows longer than ``max_len`` are cut, shorter rows
    are right-padded with ``PAD_IDX``.
    """
    rows = []
    for text in sentences:
        words = text.lower().replace('.', '').replace(',', '').strip().split()
        # Truncate first, then top up with padding to reach max_len.
        ids = [word_to_idx.get(word, UNK_IDX) for word in words][:max_len]
        ids.extend([PAD_IDX] * (max_len - len(ids)))
        rows.append(ids)

    return np.array(rows)

# Build vocab and tokenize data.
# word_to_idx / idx_to_word include the '<pad>' and '<unk>' entries that
# build_vocab registers, so VOCAB_SIZE counts those two special tokens too.
word_to_idx, idx_to_word = build_vocab(sentences)
VOCAB_SIZE = len(word_to_idx)
# One row per sentence, each padded or truncated to MAX_LEN token ids.
token_batch = tokenize_batch(sentences, word_to_idx, MAX_LEN)

# Embedding Layer Weights (randomly initialized): one D_MODEL-wide row per
# vocabulary id, drawn from a standard normal and scaled down by 0.01.
# NOTE: this is the first draw from the seeded global RNG, so moving this
# line relative to other np.random calls changes every later weight.
embedding_weights = np.random.randn(VOCAB_SIZE, D_MODEL) * 0.01

def embed_tokens(token_ids):
    """Return the embedding rows selected by ``token_ids``.

    Indexes the module-level ``embedding_weights`` matrix with the given
    ids, so the result gains one trailing axis holding the embedding values.
    """
    vectors = embedding_weights[token_ids]
    return vectors

# --- 2. Sinusoidal Positional Encoding ---

def positional_encoding(max_len, d_model):
    """Generate the sinusoidal positional-encoding table.

    Even feature columns hold ``sin(pos / 10000^(2i/d_model))`` and odd
    columns hold the matching ``cos`` term.

    Args:
        max_len: Number of positions (rows) to generate.
        d_model: Embedding width (columns). May be odd; in that case the
            last column is a sine term without a cosine partner.

    Returns:
        Array of shape ``(1, max_len, d_model)``; the leading axis lets the
        table broadcast over a batch.
    """
    pe = np.zeros((max_len, d_model))
    position = np.arange(0, max_len)[:, np.newaxis]

    # Division term: 1 / 10000^(2i/d_model), one entry per even column.
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    angles = position * div_term

    # Sine goes to even indices (2i).
    pe[:, 0::2] = np.sin(angles)
    # Cosine goes to odd indices (2i + 1). With an odd d_model there is one
    # fewer odd column than even columns, so trim the angle table to fit
    # instead of failing with a shape mismatch.
    pe[:, 1::2] = np.cos(angles[:, : d_model // 2])

    # Add batch dimension for broadcasting: (1, max_len, d_model)
    return pe[np.newaxis, :, :]

# --- 3. Implement Add & Norm (Layer Normalization) ---

class LayerNorm:
    """Layer normalisation over the last axis with a learnable affine map.

    Each feature vector is shifted to zero mean and divided by its standard
    deviation plus ``eps``, then scaled by ``gamma`` and offset by ``beta``.
    """

    def __init__(self, d_model, eps=1e-6):
        # Small constant added to the standard deviation so a constant
        # feature vector does not trigger a division by zero.
        self.eps = eps
        # Affine parameters start as the identity transform.
        self.gamma = np.ones(d_model)
        self.beta = np.zeros(d_model)

    def forward(self, x):
        """Normalise ``x`` along its last axis and apply scale and shift."""
        centre = np.mean(x, axis=-1, keepdims=True)
        spread = np.std(x, axis=-1, keepdims=True)
        x_hat = (x - centre) / (spread + self.eps)
        return x_hat * self.gamma + self.beta

# --- 4. Implement Self-Attention ---

def scaled_dot_product_attention(Q, K, V, mask=None):
    """Compute scaled dot-product attention.

    Args:
        Q, K, V: Arrays shaped ``(..., seq_len, d_k)``. The multi-head
            caller passes ``(batch_size, num_heads, seq_len, d_k)``, but any
            number of leading dimensions is accepted.
        mask: Optional array broadcastable to the score matrix
            ``(..., seq_len_q, seq_len_k)``, holding 1.0 (or True) at key
            positions that must not be attended to.

    Returns:
        ``(output, attention_weights)`` where ``output`` has the shape of
        ``Q`` with V's last axis, and ``attention_weights`` has shape
        ``(..., seq_len_q, seq_len_k)`` with every row summing to 1.
    """
    d_k = Q.shape[-1]

    # 1. Scores: Q @ K^T / sqrt(d_k). Swapping only the last two axes keeps
    #    this independent of how many leading (batch/head) axes exist.
    scores = np.matmul(Q, np.swapaxes(K, -1, -2)) / np.sqrt(d_k)

    # 2. Push masked positions towards -inf so softmax gives them ~0 weight.
    if mask is not None:
        scores = scores + mask * -1e9

    # 3. Softmax over the key axis; subtracting the row maximum first keeps
    #    the exponentials from overflowing.
    exp_scores = np.exp(scores - np.max(scores, axis=-1, keepdims=True))
    attention_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)

    # 4. Weighted sum of the values.
    output = np.matmul(attention_weights, V)

    return output, attention_weights

# --- 4. Implement Multi-Head Attention ---

class MultiHeadAttention:
    """Multi-head attention with randomly initialised projection matrices.

    The inputs are projected with ``W_Q``/``W_K``/``W_V``, split into
    ``num_heads`` slices of width ``d_k = d_model // num_heads``, attended
    per head, concatenated, and projected once more with ``W_O``.
    """

    def __init__(self, d_model, num_heads):
        """Create the four ``(d_model, d_model)`` projection matrices.

        Raises:
            ValueError: If ``d_model`` is not divisible by ``num_heads``;
                the per-head reshape would otherwise fail later with a much
                less helpful message.
        """
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Linear projections for Q, K, V, and Output (randomly initialized).
        # The draw order matters for reproducibility under a fixed seed.
        self.W_Q = np.random.randn(d_model, d_model) * 0.01
        self.W_K = np.random.randn(d_model, d_model) * 0.01
        self.W_V = np.random.randn(d_model, d_model) * 0.01
        self.W_O = np.random.randn(d_model, d_model) * 0.01

    def split_heads(self, x):
        """Reshape ``(batch, seq_len, d_model)`` to ``(batch, num_heads, seq_len, d_k)``."""
        batch_size, seq_len = x.shape[:2]
        x = x.reshape(batch_size, seq_len, self.num_heads, self.d_k)
        # Move the head axis in front of the sequence axis.
        return x.transpose(0, 2, 1, 3)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: back to ``(batch, seq_len, d_model)``."""
        batch_size, _, seq_len, _ = x.shape
        # Put the sequence axis back in front of the head axis, then merge
        # the (num_heads, d_k) pair into a single d_model axis.
        x = x.transpose(0, 2, 1, 3)
        return x.reshape(batch_size, seq_len, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Run multi-head attention.

        Args:
            Q, K, V: Arrays of shape ``(batch, seq_len, d_model)``.
            mask: Optional mask forwarded unchanged to
                ``scaled_dot_product_attention``.

        Returns:
            ``(output, attn_weights)`` where ``output`` is
            ``(batch, seq_len, d_model)`` and ``attn_weights`` is
            ``(batch, num_heads, seq_len, seq_len)``.
        """
        # 1. Linear projections
        Q_proj = np.matmul(Q, self.W_Q)
        K_proj = np.matmul(K, self.W_K)
        V_proj = np.matmul(V, self.W_V)

        # 2. Split into multiple heads
        Q_split = self.split_heads(Q_proj)
        K_split = self.split_heads(K_proj)
        V_split = self.split_heads(V_proj)

        # 3. Scaled Dot-Product Attention, evaluated for all heads at once
        attn_output, attn_weights = scaled_dot_product_attention(Q_split, K_split, V_split, mask)

        # 4. Concatenate heads
        concat_output = self.combine_heads(attn_output)

        # 5. Final linear projection
        output = np.matmul(concat_output, self.W_O)

        return output, attn_weights

# --- 4. Implement Feed-Forward Network (FFN) ---

class FeedForward:
    """Position-wise feed-forward network: Linear -> GELU -> Linear."""

    def __init__(self, d_model, d_ff):
        # Expansion layer (d_model -> d_ff) followed by the contraction
        # layer (d_ff -> d_model); biases start at zero.
        self.W1 = np.random.randn(d_model, d_ff) * 0.01
        self.b1 = np.zeros(d_ff)
        self.W2 = np.random.randn(d_ff, d_model) * 0.01
        self.b2 = np.zeros(d_model)

    @staticmethod
    def _gelu(z):
        """Tanh approximation of the GELU activation."""
        inner = np.sqrt(2 / np.pi) * (z + 0.044715 * z**3)
        return 0.5 * z * (1 + np.tanh(inner))

    def forward(self, x):
        """Apply the two-layer network to the last axis of ``x``."""
        hidden = self._gelu(np.matmul(x, self.W1) + self.b1)
        return np.matmul(hidden, self.W2) + self.b2

# --- 4. Transformer Encoder Layer ---

class EncoderLayer:
    """One encoder block: self-attention and feed-forward, each post-normed.

    Both sublayers are wrapped in a residual connection followed by layer
    normalisation.
    """

    def __init__(self, d_model, num_heads, d_ff):
        # Sublayers are created attention-first so the random weight draws
        # happen in a fixed order.
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)

        # One normalisation per sublayer.
        self.norm1 = LayerNorm(d_model)
        self.norm2 = LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Return ``(encoded, attn_weights)`` for the input sequence ``x``."""
        # Self-attention: the same tensor serves as query, key and value.
        attended, attn_weights = self.attn.forward(Q=x, K=x, V=x, mask=mask)
        hidden = self.norm1.forward(x + attended)

        # Feed-forward sublayer with its own residual + norm.
        encoded = self.norm2.forward(hidden + self.ffn.forward(hidden))

        return encoded, attn_weights

# --- Full Mini Transformer Encoder ---

class TransformerEncoder:
    """Embedding lookup + positional encoding + a stack of encoder layers."""

    def __init__(self, num_layers, d_model, num_heads, d_ff, max_len, vocab_size, embedding_weights):
        """Build the encoder.

        Args:
            num_layers: Number of ``EncoderLayer`` instances to stack.
            d_model: Embedding width.
            num_heads: Attention heads per layer.
            d_ff: Hidden width of each feed-forward sublayer.
            max_len: Longest sequence the positional table covers.
            vocab_size: Accepted for interface compatibility; the vocabulary
                size is implied by ``embedding_weights`` and not used here.
            embedding_weights: Embedding matrix indexed by token id
                (the caller in this file passes ``(vocab_size, d_model)``).
        """
        self.embedding_weights = embedding_weights
        self.pe = positional_encoding(max_len, d_model)
        self.layers = [EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)]

    def create_padding_mask(self, token_ids):
        """Creates a broadcastable mask (1.0 at padding positions).

        Returns a float32 array of shape ``(batch_size, 1, 1, seq_len)`` so
        it broadcasts over heads and query positions and masks the key
        columns of the attention matrix.
        """
        is_padding = (np.asarray(token_ids) == PAD_IDX)  # (batch_size, seq_len)
        mask = is_padding[:, np.newaxis, np.newaxis, :]
        return mask.astype(np.float32)

    def forward(self, token_ids):
        """Encode a batch of token ids.

        Args:
            token_ids: Integer array of shape ``(batch_size, seq_len)``.

        Returns:
            ``(x, all_attn_weights)``: the final contextual embeddings of
            shape ``(batch_size, seq_len, d_model)`` and a list with one
            attention-weight array per layer.

        Raises:
            ValueError: If ``seq_len`` exceeds the positional-encoding table.
        """
        token_ids = np.asarray(token_ids)
        seq_len = token_ids.shape[1]
        if seq_len > self.pe.shape[1]:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.shape[1]}"
            )

        # 1. Embedding + Positional Encoding. Use the matrix handed to this
        #    instance rather than a module-level global, so the encoder
        #    honours whatever weights it was constructed with. Integer-array
        #    indexing returns a copy, so the stored weights are not modified.
        x = self.embedding_weights[token_ids] + self.pe[:, :seq_len, :]

        # 2. Padding Mask
        padding_mask = self.create_padding_mask(token_ids)

        # 3. Run the layer stack, keeping each layer's attention weights.
        all_attn_weights = []
        for layer in self.layers:
            x, attn_weights = layer.forward(x, mask=padding_mask)
            all_attn_weights.append(attn_weights)

        # x: (batch_size, seq_len, d_model) - Final contextual embeddings
        return x, all_attn_weights

# --- Execution and Demonstration (Step 5) ---

# Initialize the Encoder
encoder = TransformerEncoder(
    num_layers=NUM_LAYERS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    d_ff=D_FF,
    max_len=MAX_LEN,
    vocab_size=VOCAB_SIZE,
    embedding_weights=embedding_weights
)

# How many leading tokens of the first sentence are previewed in section 2.
# A single constant keeps the printed label and the slice in agreement.
NUM_PREVIEW_TOKENS = 5

# 1. Input Tokens
print("-" * 50)
print("1. Input Tokens (First Sentence):")
print("-" * 50)

# Recover the first sentence's tokens and ids by dropping the trailing padding
first_sentence_tokens = [idx_to_word[i] for i in token_batch[0] if i != PAD_IDX]
first_sentence_ids = token_batch[0][:len(first_sentence_tokens)]

print("Raw Sentence:   ", sentences[0])
print("Token IDs:      ", first_sentence_ids)
print("Token Sequence: ", first_sentence_tokens)
print("Padding to Max_Len:", token_batch[0])


# Run the forward pass over the whole batch
final_embeddings, all_attn_weights = encoder.forward(token_batch)


# 2. Final Contextual Embeddings
print("\n" + "=" * 50)
print(f"2. Final Contextual Embeddings (First {NUM_PREVIEW_TOKENS} Tokens of First Sentence)")
print("Shape: (Batch Size, Seq Length, D_MODEL) = ", final_embeddings.shape)
print("=" * 50)

# Contextual embeddings of the first sentence's leading tokens
first_sentence_embeddings = final_embeddings[0, :NUM_PREVIEW_TOKENS, :]

for token, vector in zip(first_sentence_tokens[:NUM_PREVIEW_TOKENS], first_sentence_embeddings):
    # Show only the first four components of each embedding.
    embedding_snippet = ', '.join(f'{x:.4f}' for x in vector[:4]) + '...'
    print(f"Token '{token:<5}': [{embedding_snippet}]")


# 3. Attention Heatmap (Matrix Printout)
print("\n" + "#" * 50)
print("3. Attention Heatmap (Head 1, First Sentence)")
print("#" * 50)

# Attn weights shape: (batch_size, num_heads, seq_len, seq_len)
# We use the first layer, first head, first sentence.
attn_matrix = all_attn_weights[0][0, 0, :, :]

# Slice the matrix to show only the real tokens, ignoring padding rows/columns
N = len(first_sentence_tokens)
attn_matrix_visible = attn_matrix[:N, :N]

# Print header row (key tokens across the top)
header = "{:<10}".format("Query\\Key") + "".join(f"{t:<10}" for t in first_sentence_tokens)
print(header)
print("-" * len(header))

# Print body rows: one row per query token, one column per key token
for query_token, weights in zip(first_sentence_tokens, attn_matrix_visible):
    print(f"{query_token:<10}" + "".join(f"{w:<10.4f}" for w in weights))

# Interpretation
print("\nNote: Each row shows how much the 'Query' word (row token) attended to the 'Key' words (column tokens).")
print("High values (e.g., 0.1000) indicate strong connections for this specific attention head.")

--------------------------------------------------
1. Input Tokens (First Sentence):
--------------------------------------------------
Raw Sentence:    The quick brown fox jumps over the lazy dog.
Token IDs:       [2 3 4 5 6 7 2 8 9]
Token Sequence:  ['the', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog']
Padding to Max_Len: [2 3 4 5 6 7 2 8 9 0 0 0 0 0 0]

2. Final Contextual Embeddings (First 3 Tokens of First Sentence)
Shape: (Batch Size, Seq Length, D_MODEL) =  (10, 15, 32)
Token 'the  ': [-0.9876, 1.0276, -0.9908, 1.0199...]
Token 'quick': [0.6570, 0.0034, -0.0151, 0.6649...]
Token 'brown': [0.8049, -2.0130, 0.7705, -0.2031...]
Token 'fox  ': [-0.6835, -2.8093, 1.0087, -1.1653...]
Token 'jumps': [-2.1849, -2.0096, 0.6118, -1.9376...]

##################################################
3. Attention Heatmap (Head 1, First Sentence)
##################################################
Query\Key the       quick     brown     fox       jumps     over      the       lazy 