# NeuroFormer

**NeuroFormer** is a **modular Transformer architecture** built from scratch in PyTorch that supports:

- **Decoder-only** — for language modeling and chatbots  
- **Encoder-only** — for tasks like question answering  
- **Encoder-Decoder** — for machine translation

In [1]:
# Importing Libraries
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
device

'cuda'

## Defining Utility Functions

## Pad Masking

- Creates a padding mask to ignore pad tokens during attention.

In [3]:
def pad_mask(seq, pad_token=0):
  """Build a boolean padding mask that is True wherever a real token sits.

  Args:
    seq: Tensor of token ids, used here as [batch_size, sequence_length].
    pad_token: Id that marks padding positions.

  Returns:
    Boolean tensor with two singleton axes inserted after the batch axis,
    i.e. [B, 1, 1, T] for a [B, T] input, so it can broadcast against
    attention scores.
  """
  is_real_token = seq.ne(pad_token)  # [B, T]
  return is_real_token[:, None, None]  # [B, 1, 1, T]

## Causal Masking

- Prevents a token from attending to future tokens during training, used in **Decoder** blocks.

- Masking is applied in decoder block to enable **auto-regressive** generation

In [4]:
def causal_mask(sequence_length, device):
  """Build a lower-triangular boolean mask for auto-regressive attention.

  Entry (i, j) is True when position i may look at position j, i.e. j <= i.

  Args:
    sequence_length: Number of positions T.
    device: Device on which the mask is created.

  Returns:
    Boolean tensor of shape [1, 1, T, T]; the two leading singleton axes
    broadcast over batch and heads.
  """
  positions = torch.arange(sequence_length, device=device)  # [T]

  # Comparing a row vector of key indices with a column vector of query
  # indices yields the lower triangle (diagonal included).
  allowed = positions.unsqueeze(0) <= positions.unsqueeze(1)  # [T, T]

  return allowed[None, None]  # [1, 1, T, T]

## Cross Masking

- It is a binary attention mask used in cross-attention, which appears in encoder-decoder architectures such as T5.
- It prevents the **decoder** from attending to **pad** tokens in the encoder output.

In [5]:
def cross_mask(src_seq, tgt_seq, src_pad_token=0, tgt_pad_token=0):
  """Build the boolean mask used by decoder-to-encoder (cross) attention.

  A (target, source) pair is allowed only when both tokens are real, so
  padded source positions are never attended to and padded target rows are
  masked out entirely.

  Args:
    src_seq: Source token ids, [batch_size, src_len].
    tgt_seq: Target token ids, [batch_size, tgt_len].
    src_pad_token: Padding id in the source sequence.
    tgt_pad_token: Padding id in the target sequence.

  Returns:
    Boolean tensor of shape [batch_size, 1, tgt_len, src_len].
  """
  # Unpacking the shapes also rejects inputs that are not 2-D.
  _batch, _tgt_len = tgt_seq.shape
  _batch, _src_len = src_seq.shape

  keys_valid = (src_seq != src_pad_token)[:, None, :]     # [B, 1, src_len]
  queries_valid = (tgt_seq != tgt_pad_token)[:, :, None]  # [B, tgt_len, 1]

  # Broadcasting pairs every target position with every source position.
  pair_valid = keys_valid & queries_valid  # [B, tgt_len, src_len]

  # Insert the heads axis.
  return pair_valid[:, None]  # [B, 1, tgt_len, src_len]

In [6]:
def combine_masks(*masks):
  """AND together any number of boolean masks, ignoring ``None`` entries.

  A position stays True only if every supplied mask agrees on it.

  Args:
    *masks: Boolean masks (broadcast-compatible with each other) or None.

  Returns:
    The element-wise AND of all non-None masks, or None when no mask was
    supplied (or all of them were None).
  """
  combined = None
  for mask in masks:
    # None means "no constraint" wherever it appears, including first place.
    if mask is None:
      continue
    combined = mask if combined is None else combined & mask

  return combined

## Scaled Dot Product Attention

$$
\text{Attention}(Q, K, V) = \text{softmax}\left( \frac{QK^T}{\sqrt{d_k}} \right) V
$$

- $Q$ -> The query vector: what the model is searching for.
- $K$ -> The key vector: one per word in the input sequence, compared against the query.
- $V$ -> The value vector: the actual content (meaning) carried by each word in the sentence.
- $d_k$ -> Dimensionality of the query/key vectors, used for scaling.
- $QK^T$ -> Measures the similarity between the query vectors and the key vectors.






In [7]:
def scaled_dot_product_attention(q, k, v, mask = None):
  """Compute softmax(q @ k^T / sqrt(d_k)) @ v with an optional boolean mask.

  Args:
    q: Queries, expected as [B, num_heads, T_q, head_dim].
    k: Keys, expected as [B, num_heads, T_k, head_dim].
    v: Values, expected as [B, num_heads, T_k, head_dim].
    mask: Optional boolean tensor broadcastable to [B, num_heads, T_q, T_k];
      True marks positions that may be attended to.

  Returns:
    Tuple ``(attention, values)``: the attention weights
    [B, num_heads, T_q, T_k] and the weighted values
    [B, num_heads, T_q, head_dim]. Query rows whose keys are all masked get
    all-zero weights (and therefore zero output) instead of NaN.
  """
  d_k = q.size(-1)  # head_dim
  scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)  # [B, num_heads, T_q, T_k]

  fully_masked = None
  if mask is not None:
    # Disallowed positions get -inf so softmax assigns them zero weight.
    scaled = scaled.masked_fill(~mask, float('-inf'))

    # A row made only of -inf would turn into NaN inside softmax, and that NaN
    # would then spread through later matmuls. Give such rows finite scores
    # here and zero their weights after the softmax.
    fully_masked = ~mask.any(dim=-1, keepdim=True)
    scaled = scaled.masked_fill(fully_masked, 0.0)

  # F.softmax is already numerically stable, so no manual max subtraction.
  attention = F.softmax(scaled, dim=-1)  # [B, num_heads, T_q, T_k]

  if fully_masked is not None:
    attention = attention.masked_fill(fully_masked, 0.0)

  values = torch.matmul(attention, v)  # [B, num_heads, T_q, head_dim]

  return attention, values

## Multi-Head Attention

$$\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \ldots, \text{head}_h)\, W^O$$
$$\text{head}_i = \text{Attention}(Q W_i^Q,\; K W_i^K,\; V W_i^V)$$

### Multi-Head Attention splits the model into multiple **heads** that learn different types of relationships in the data.

### Each head performs its own scaled dot-product attention independently, and because the heads run in parallel they improve training speed.


In [8]:
class MultiHeadAttention(nn.Module):
  """Multi-head self-attention with a single fused query/key/value projection.

  Args:
    d_model: Size of the input embeddings.
    num_heads: Number of attention heads; must divide ``d_model`` evenly.

  Raises:
    ValueError: If ``d_model`` is not divisible by ``num_heads``.
  """

  def __init__(self, d_model, num_heads):
    super().__init__()

    # Fail here with a clear message rather than with a reshape error in forward().
    if d_model % num_heads != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
      )

    self.d_model = d_model                 # Size of the input embeddings
    self.num_heads = num_heads             # Number of attention heads
    self.head_dim = d_model // num_heads   # Embedding slice handled by each head
    self.qkv_layer = nn.Linear(d_model, 3 * d_model)  # Fused projection to query, key and value
    self.linear_layer = nn.Linear(d_model, d_model)   # Output projection applied to the concatenated heads

  def forward(self, x, mask = None):
    """Apply self-attention to ``x``.

    Args:
      x: Input of shape [B, T, d_model].
      mask: Optional boolean mask passed to ``scaled_dot_product_attention``.

    Returns:
      Tensor of shape [B, T, d_model].
    """
    batch_size, sequence_length, d_model = x.size()

    qkv = self.qkv_layer(x)  # [B, T, 3 * d_model]

    # Give every head its own slice holding q, k and v side by side.
    qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
    qkv = qkv.permute(0, 2, 1, 3)  # [B, num_heads, T, 3 * head_dim]

    q, k, v = qkv.chunk(3, dim=-1)  # each [B, num_heads, T, head_dim]
    _, values = scaled_dot_product_attention(q, k, v, mask)

    # Move heads next to head_dim and merge them back into d_model.
    values = values.transpose(1, 2).contiguous()
    values = values.reshape(batch_size, sequence_length, d_model)  # [B, T, d_model]

    return self.linear_layer(values)

## Layer Normalization
$$
\text{LayerNorm}(x) = \gamma \cdot \left( \frac{x - \mu}{\sqrt{\sigma^2 + \varepsilon}} \right) + \beta
$$
Layer Normalization is a technique used to **stabilize** and **accelerate training** by normalizing the inputs across the features of each sample.
**Applies per sample** across **features** (unlike BatchNorm which operates across the batch).

In [9]:
class LayerNormalization(nn.Module):
  """Layer normalization over the last dimension with a learnable scale and shift.

  Args:
    d_model: Size of the last (feature) dimension.
    eps: Small constant added to the variance so the division never hits zero.
  """

  def __init__(self, d_model, eps=1e-5):
    super().__init__()
    self.eps = eps
    self.d_model = d_model
    self.gamma = nn.Parameter(torch.ones(d_model))   # Learnable scale
    self.beta = nn.Parameter(torch.zeros(d_model))   # Learnable shift

  def forward(self, x):
    """Normalize ``x`` along its last dimension, then scale and shift it."""
    centered = x - x.mean(dim=-1, keepdim=True)
    variance = centered.pow(2).mean(dim=-1, keepdim=True)  # Biased (population) variance
    normalized = centered / torch.sqrt(variance + self.eps)
    return self.gamma * normalized + self.beta

## Position-wise Feed Forward
$$\text{FFN}(x) = \max(0,\; x W_1 + b_1)\, W_2 + b_2$$
- In Transformers, each token’s representation is passed through a feed-forward neural network independently and identically.

- This is known as a Positionwise Feed-Forward Network (FFN) because it operates separately on each position (token) in the sequence.

In [10]:
class PositionWiseFeedForward(nn.Module):
  """Two-layer feed-forward network applied to every position independently.

  Args:
    d_model: Input and output feature size.
    hidden: Width of the inner layer.
    drop_prob: Dropout probability applied after the activation.
  """

  def __init__(self, d_model, hidden, drop_prob=0.1):
    super().__init__()
    self.linear1 = nn.Linear(d_model, hidden)   # Expansion: d_model -> hidden
    self.linear2 = nn.Linear(hidden, d_model)   # Projection: hidden -> d_model
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop_prob)

  def forward(self, x):
    """Return linear2(dropout(relu(linear1(x))))."""
    activated = self.relu(self.linear1(x))
    return self.linear2(self.dropout(activated))

## Multi-Head Cross Attention
- Cross-attention is a key component of the encoder-decoder architecture used in models like T5 and Transformer for machine translation.

- Query -> Decoder Input
- Key -> Encoder Output
- Value -> Encoder Output

In [11]:
class MultiHeadCrossAttention(nn.Module):
  """Multi-head cross-attention: queries from the decoder, keys/values from the encoder.

  Args:
    d_model: Size of the embeddings on both the encoder and decoder side.
    num_heads: Number of attention heads; must divide ``d_model`` evenly.

  Raises:
    ValueError: If ``d_model`` is not divisible by ``num_heads``.
  """

  def __init__(self, d_model, num_heads):
    super().__init__()

    # Fail here with a clear message rather than with a reshape error in forward().
    if d_model % num_heads != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
      )

    self.d_model = d_model                 # Size of the input embeddings
    self.num_heads = num_heads             # Number of attention heads
    self.head_dim = d_model // num_heads   # Embedding slice handled by each head
    self.kv_layer = nn.Linear(d_model, 2 * d_model)  # Fused key/value projection of the encoder output
    self.q_layer = nn.Linear(d_model, d_model)       # Query projection of the decoder input
    self.linear_layer = nn.Linear(d_model, d_model)  # Output projection applied to the concatenated heads

  def forward(self, x, y, cross_mask = None):
    """Attend from decoder states ``y`` to encoder states ``x``.

    Args:
      x: Encoder output, [B, T_src, d_model].
      y: Decoder input, [B, T_tgt, d_model].
      cross_mask: Optional boolean mask passed to
        ``scaled_dot_product_attention``.

    Returns:
      Tensor of shape [B, T_tgt, d_model].
    """
    batch_size, tgt_length, d_model = y.size()
    src_length = x.size(1)

    kv = self.kv_layer(x)  # [B, T_src, 2 * d_model]
    q = self.q_layer(y)    # [B, T_tgt, d_model]

    # Split the feature axis into heads; each kv slice holds k and v side by side.
    kv = kv.reshape(batch_size, src_length, self.num_heads, 2 * self.head_dim)
    q = q.reshape(batch_size, tgt_length, self.num_heads, self.head_dim)

    kv = kv.permute(0, 2, 1, 3)  # [B, num_heads, T_src, 2 * head_dim]
    q = q.permute(0, 2, 1, 3)    # [B, num_heads, T_tgt, head_dim]

    k, v = kv.chunk(2, dim=-1)   # each [B, num_heads, T_src, head_dim]
    _, values = scaled_dot_product_attention(q, k, v, cross_mask)

    # Move heads next to head_dim and merge them back into d_model.
    values = values.transpose(1, 2).contiguous()
    values = values.reshape(batch_size, tgt_length, d_model)  # [B, T_tgt, d_model]

    return self.linear_layer(values)

## Positional Encoding

- The attention mechanism receives the whole input at once (it is **non auto-regressive** with respect to its input), so by itself it has no notion of token order.
- Positional encoding gives the model a sense of word order; without it, "I Love You" and "You Love I" would be treated as the **same**.

In [12]:
class PositionalEncoding(nn.Module):
  """Learned positional embeddings added to token embeddings, followed by dropout.

  Args:
    sequence_length: Maximum number of positions the embedding table holds.
    d_model: Embedding size.
    drop_prob: Dropout probability applied after the addition.
  """

  def __init__(self, sequence_length, d_model, drop_prob=0.1):
    super().__init__()
    self.pos_embedding = nn.Embedding(sequence_length, d_model)  # [max_T, d_model]
    self.dropout = nn.Dropout(p = drop_prob)

  def forward(self, x):
    """Add position embeddings to ``x``.

    Args:
      x: Token embeddings of shape [B, T, d_model].

    Returns:
      Tensor of shape [B, T, d_model].

    Raises:
      ValueError: If T exceeds the number of positions in the table.
    """
    _, sequence_length, _ = x.size()

    # Report over-long inputs explicitly; otherwise the lookup fails with an
    # opaque index error.
    max_length = self.pos_embedding.num_embeddings
    if sequence_length > max_length:
      raise ValueError(
          f"Input sequence length {sequence_length} exceeds the maximum "
          f"supported length {max_length}"
      )

    positions = torch.arange(sequence_length, device=x.device).unsqueeze(0)  # [1, T]
    pos_emb = self.pos_embedding(positions)  # [1, T, d_model]

    # The position embeddings broadcast over the batch axis.
    return self.dropout(x + pos_emb)

## Building Blocks: Encoder and Decoder Blocks

In [13]:
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention and a feed-forward network, each wrapped
  in dropout, a residual connection and layer normalization (post-norm).

  Args:
    d_model: Embedding size.
    num_heads: Number of attention heads.
    hidden: Inner width of the feed-forward network.
    drop_prob: Dropout probability used throughout the block.
  """

  def __init__(self, d_model, num_heads, hidden, drop_prob):
    super().__init__()

    # Attention sub-layer
    self.self_attention = MultiHeadAttention(d_model, num_heads)
    self.norm1 = LayerNormalization(d_model)
    self.dropout1 = nn.Dropout(p=drop_prob)

    # Feed-forward sub-layer
    self.ffn = PositionWiseFeedForward(d_model, hidden, drop_prob)
    self.norm2 = LayerNormalization(d_model)
    self.dropout2 = nn.Dropout(p=drop_prob)

  def forward(self, x, pad_mask = None):
    """Run the block on ``x`` using ``pad_mask`` as the self-attention mask."""
    # Residual around self-attention, normalized afterwards.
    attended = self.dropout1(self.self_attention(x, pad_mask))
    x = self.norm1(x + attended)

    # Residual around the feed-forward network, normalized afterwards.
    transformed = self.dropout2(self.ffn(x))
    return self.norm2(x + transformed)

In [14]:
class DecoderBlock(nn.Module):
  """One decoder layer: masked self-attention, optional cross-attention and a
  feed-forward network, each followed by dropout, a residual connection and
  layer normalization.

  Args:
    d_model: Embedding size.
    num_heads: Number of attention heads.
    hidden: Inner width of the feed-forward network.
    drop_prob: Dropout probability used throughout the block.
  """

  def __init__(self, d_model, num_heads, hidden, drop_prob):
    super().__init__()

    # Masked self-attention sub-layer
    self.self_attention = MultiHeadAttention(d_model, num_heads)
    self.norm1 = LayerNormalization(d_model)
    self.dropout1 = nn.Dropout(p=drop_prob)

    # Cross-attention sub-layer
    self.cross_attention = MultiHeadCrossAttention(d_model, num_heads)
    self.norm2 = LayerNormalization(d_model)
    self.dropout2 = nn.Dropout(p=drop_prob)

    # Feed-forward sub-layer
    self.ffn = PositionWiseFeedForward(d_model, hidden, drop_prob)
    self.norm3 = LayerNormalization(d_model)
    self.dropout3 = nn.Dropout(p=drop_prob)

  def forward(self, y, encoder_output=None, self_mask=None, cross_mask=None):
    """Run the block on decoder states ``y``.

    Args:
      y: Decoder input.
      encoder_output: Encoder states; when None the cross-attention sub-layer
        is skipped (decoder-only use).
      self_mask: Mask for the self-attention sub-layer.
      cross_mask: Mask for the cross-attention sub-layer.

    Returns:
      The updated decoder states.
    """
    # Masked self-attention with residual connection
    self_attended = self.dropout1(self.self_attention(y, self_mask))
    y = self.norm1(y + self_attended)

    # Cross-attention runs only when encoder states are available
    if encoder_output is not None:
      cross_attended = self.dropout2(self.cross_attention(encoder_output, y, cross_mask))
      y = self.norm2(y + cross_attended)

    # Feed-forward network with residual connection
    transformed = self.dropout3(self.ffn(y))
    return self.norm3(y + transformed)

## Models: Encoder, Decoder, Encoder-Decoder

In [15]:
class Encoder(nn.Module):
  """Encoder-only model: embeddings, a stack of encoder blocks and a
  two-output head producing start/end logits per token.

  Args:
    vocab_size: Number of token ids.
    sequence_length: Maximum number of positions.
    d_model: Embedding size.
    num_heads: Number of attention heads per block.
    hidden: Inner width of each feed-forward network.
    num_layers: Number of encoder blocks.
    drop_prob: Dropout probability.
    pad_token: Id of the padding token.
  """

  def __init__(self, vocab_size, sequence_length, d_model, num_heads, hidden, num_layers, drop_prob=0.1, pad_token=0):
    super().__init__()

    self.pad_token = pad_token
    self.d_model = d_model

    # Token and position embeddings
    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_enc = PositionalEncoding(sequence_length, d_model, drop_prob)

    # Stack of encoder blocks
    blocks = [EncoderBlock(d_model, num_heads, hidden, drop_prob) for _ in range(num_layers)]
    self.layers = nn.ModuleList(blocks)

    # Two logits per token: span start and span end
    self.qa_head = nn.Linear(d_model, 2)

  def forward(self, input_ids, return_embeddings=False):
    """Encode ``input_ids``.

    Args:
      input_ids: Token ids, [B, T].
      return_embeddings: When True, return the final hidden states instead of
        the start/end logits.

    Returns:
      The hidden states [B, T, d_model] when ``return_embeddings`` is True,
      otherwise a tuple ``(start_logits, end_logits)``, each [B, T].
    """
    # Token embeddings are scaled by sqrt(d_model) before positions are added.
    hidden_states = self.pos_enc(self.embedding(input_ids) * math.sqrt(self.d_model))

    # Padding positions are hidden from attention in every block.
    pad_mask_tensor = pad_mask(input_ids, self.pad_token)
    for block in self.layers:
      hidden_states = block(hidden_states, pad_mask_tensor)

    if return_embeddings:
      return hidden_states

    logits = self.qa_head(hidden_states)  # [B, T, 2]
    return logits[:, :, 0], logits[:, :, 1]

In [16]:
class Decoder(nn.Module):
  """Decoder-only language model: embeddings, a stack of decoder blocks used
  without cross-attention, and a projection to vocabulary logits.

  Args:
    vocab_size: Number of token ids.
    sequence_length: Maximum number of positions.
    d_model: Embedding size.
    num_heads: Number of attention heads per block.
    hidden: Inner width of each feed-forward network.
    num_layers: Number of decoder blocks.
    drop_prob: Dropout probability.
    pad_token: Id of the padding token.
  """

  def __init__(self, vocab_size, sequence_length, d_model, num_heads, hidden, num_layers, drop_prob=0.1, pad_token=0):
    super().__init__()

    self.pad_token = pad_token
    self.d_model = d_model

    # Token and position embeddings
    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_enc = PositionalEncoding(sequence_length, d_model, drop_prob)

    # Stack of decoder blocks
    blocks = [DecoderBlock(d_model, num_heads, hidden, drop_prob) for _ in range(num_layers)]
    self.layers = nn.ModuleList(blocks)

    # Projection from hidden states to vocabulary logits
    self.out = nn.Linear(d_model, vocab_size)

  def forward(self, input_ids):
    """Return next-token logits [B, T, vocab_size] for ``input_ids`` [B, T]."""
    seq_len = input_ids.size(1)

    # A (query, key) pair is allowed only when both tokens are real ...
    key_valid = pad_mask(input_ids, self.pad_token)   # [B, 1, 1, T]
    query_valid = key_valid.transpose(-1, -2)         # [B, 1, T, 1]
    padding_self_attn = key_valid & query_valid       # broadcasts to [B, 1, T, T]

    # ... and the key does not lie in the query's future.
    causal = causal_mask(seq_len, input_ids.device)
    combined_mask = combine_masks(causal, padding_self_attn)

    # Token embeddings are scaled by sqrt(d_model) before positions are added.
    hidden_states = self.pos_enc(self.embedding(input_ids) * math.sqrt(self.d_model))

    # No encoder output is supplied, so each block skips cross-attention.
    for block in self.layers:
      hidden_states = block(hidden_states, encoder_output=None, self_mask=combined_mask)

    return self.out(hidden_states)

In [17]:
class EncoderDecoder(nn.Module):
  """Sequence-to-sequence Transformer with separate encoder and decoder stacks.

  Args:
    vocab_size: Number of token ids (shared by source and target).
    sequence_length: Maximum number of positions.
    d_model: Embedding size.
    num_heads: Number of attention heads per block.
    hidden: Inner width of each feed-forward network.
    num_encoder_layers: Number of encoder blocks.
    num_decoder_layers: Number of decoder blocks.
    drop_prob: Dropout probability.
    pad_token: Id of the padding token (used for source and target).
  """

  def __init__(self, vocab_size, sequence_length, d_model, num_heads, hidden, num_encoder_layers, num_decoder_layers, drop_prob, pad_token=0):
    super().__init__()

    self.pad_token = pad_token
    self.d_model = d_model

    # Separate token embeddings for the two sides
    self.encoder_embedding = nn.Embedding(vocab_size, d_model)
    self.decoder_embedding = nn.Embedding(vocab_size, d_model)

    # Separate positional encodings for the two sides
    self.encoder_pos_enc = PositionalEncoding(sequence_length, d_model, drop_prob)
    self.decoder_pos_enc = PositionalEncoding(sequence_length, d_model, drop_prob)

    encoder_blocks = [EncoderBlock(d_model, num_heads, hidden, drop_prob) for _ in range(num_encoder_layers)]
    self.encoder_layers = nn.ModuleList(encoder_blocks)

    decoder_blocks = [DecoderBlock(d_model, num_heads, hidden, drop_prob) for _ in range(num_decoder_layers)]
    self.decoder_layers = nn.ModuleList(decoder_blocks)

    # Projection from decoder states to vocabulary logits
    self.out = nn.Linear(d_model, vocab_size)

  def encode(self, src_ids):
    """Encode source token ids [B, T_src] into hidden states [B, T_src, d_model]."""
    src_pad_mask = pad_mask(src_ids, self.pad_token)

    # Token embeddings are scaled by sqrt(d_model) before positions are added.
    memory = self.encoder_pos_enc(self.encoder_embedding(src_ids) * math.sqrt(self.d_model))

    for block in self.encoder_layers:
      memory = block(memory, src_pad_mask)

    return memory

  def decode(self, tgt_ids, encoder_output, src_ids):
    """Decode target ids against the encoder output.

    Args:
      tgt_ids: Target token ids, [B, T_tgt].
      encoder_output: Result of ``encode`` for the matching source batch.
      src_ids: Source token ids, needed to mask source padding.

    Returns:
      Logits of shape [batch_size, tgt_len, vocab_size].
    """
    tgt_len = tgt_ids.size(1)

    # Self-attention mask: both tokens real and the key not in the future.
    key_valid = pad_mask(tgt_ids, self.pad_token)   # [B, 1, 1, T_tgt]
    query_valid = key_valid.transpose(-1, -2)       # [B, 1, T_tgt, 1]
    tgt_padding_self_attn = key_valid & query_valid # broadcasts to [B, 1, T_tgt, T_tgt]
    self_mask = combine_masks(causal_mask(tgt_len, tgt_ids.device), tgt_padding_self_attn)

    # Cross-attention mask between target queries and source keys.
    cross_attn_mask = cross_mask(src_ids, tgt_ids, self.pad_token, self.pad_token)

    # Token embeddings are scaled by sqrt(d_model) before positions are added.
    hidden_states = self.decoder_pos_enc(self.decoder_embedding(tgt_ids) * math.sqrt(self.d_model))

    for block in self.decoder_layers:
      hidden_states = block(hidden_states, encoder_output, self_mask=self_mask, cross_mask=cross_attn_mask)

    return self.out(hidden_states)

  def forward(self, src_ids, tgt_ids):
    """Encode ``src_ids`` and return decoder logits for ``tgt_ids``."""
    return self.decode(tgt_ids, self.encode(src_ids), src_ids)

## NeuroFormer

In [18]:
class NeuroFormer(nn.Module):
  """Single entry point that builds one of the three Transformer variants.

  Args:
    mode: One of 'encoder_only', 'decoder_only' or 'encoder_decoder'.
    vocab_size: Number of token ids.
    sequence_length: Maximum number of positions.
    d_model: Embedding size.
    num_heads: Number of attention heads per block.
    hidden: Inner width of each feed-forward network.
    num_layers: Number of blocks; in 'encoder_decoder' mode it is the fallback
      for whichever of the two layer counts below is not given.
    drop_prob: Dropout probability.
    num_encoder_layers: Encoder depth for 'encoder_decoder' mode; a falsy
      value (None or 0) falls back to ``num_layers``.
    num_decoder_layers: Decoder depth for 'encoder_decoder' mode; a falsy
      value (None or 0) falls back to ``num_layers``.
    pad_token: Id of the padding token, forwarded to the wrapped model.

  Raises:
    ValueError: If ``mode`` is not one of the supported values.
  """

  _MODES = ('encoder_only', 'decoder_only', 'encoder_decoder')

  def __init__(self, mode, vocab_size, sequence_length, d_model, num_heads, hidden, num_layers, drop_prob=0.1, num_encoder_layers=None, num_decoder_layers=None, pad_token=0):
    super().__init__()

    # Reject unknown modes here; otherwise self.model would never be set and
    # the failure would only show up later inside forward().
    if mode not in self._MODES:
      raise ValueError(
          f"Unknown mode {mode!r}; expected one of {', '.join(self._MODES)}"
      )

    self.mode = mode

    if mode == 'encoder_only':
      self.model = Encoder(vocab_size, sequence_length, d_model, num_heads, hidden, num_layers, drop_prob, pad_token)

    elif mode == 'decoder_only':
      self.model = Decoder(vocab_size, sequence_length, d_model, num_heads, hidden, num_layers, drop_prob, pad_token)

    else:  # 'encoder_decoder'
      enc_layers = num_encoder_layers or num_layers
      dec_layers = num_decoder_layers or num_layers
      self.model = EncoderDecoder(vocab_size, sequence_length, d_model, num_heads, hidden, enc_layers, dec_layers, drop_prob, pad_token)

  def forward(self, *args, **kwargs):
    """Delegate to the wrapped model; arguments depend on the chosen mode."""
    return self.model(*args, **kwargs)

In [19]:
def create_neuroformer_models():
    """Build one example model per NeuroFormer mode.

    All three models share the same hyper-parameters; the encoder-decoder
    variant additionally uses 2 encoder and 2 decoder layers.

    Returns:
        Tuple ``(bert, gpt, t5)``: the encoder-only, decoder-only and
        encoder-decoder models, in that order.
    """
    # Hyper-parameters common to every variant
    shared = dict(
        vocab_size=10000,
        sequence_length=256,
        d_model=256,
        num_heads=8,
        hidden=1024,
        num_layers=6,
    )

    # Encoder only - for QA
    bert = NeuroFormer(mode='encoder_only', **shared)

    # Decoder only - for language modeling and chat
    gpt = NeuroFormer(mode='decoder_only', **shared)

    # Encoder-decoder - for translation
    t5 = NeuroFormer(
        mode='encoder_decoder',
        num_encoder_layers=2,
        num_decoder_layers=2,
        **shared,
    )

    return bert, gpt, t5

In [20]:
# Example usage: build the three example models and report their sizes.
if __name__ == "__main__":
    # One model per mode: encoder-only, decoder-only, encoder-decoder
    bert_model, gpt_model, t5_model = create_neuroformer_models()

    # Example input dimensions
    batch_size = 4
    seq_len = 256
    vocab_size = 10000

    # Random token ids of shape (batch_size, seq_len); they are created here
    # but not passed to any model in this cell
    input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))

    # Report the total number of parameters of each model
    print("NeuroFormer Models Created Successfully!")
    print(f"BERT-like parameters: {sum(p.numel() for p in bert_model.parameters()):,}")
    print(f"GPT-like parameters: {sum(p.numel() for p in gpt_model.parameters()):,}")
    print(f"T5-like parameters: {sum(p.numel() for p in t5_model.parameters()):,}")

NeuroFormer Models Created Successfully!
BERT-like parameters: 7,364,610
GPT-like parameters: 11,516,176
T5-like parameters: 11,507,472
