In [1]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

### Embedding layer

In [2]:
class InputEmbedding(nn.Module):
    def __init__(self, vocab_size: int, d_model: int):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        return self.embedding(x) * math.sqrt(self.d_model)

In [3]:
embed = InputEmbedding(vocab_size=100, d_model=16)
dummy_input = torch.randint(0, 100, (2, 5))
print("InputEmbedding Output:", embed(dummy_input).shape)

InputEmbedding Output: torch.Size([2, 5, 16])


### Positional Encoding

In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_seq_length: int):
        super().__init__()
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]


In [5]:
pos_enc = PositionalEncoding(d_model=16, max_seq_length=10)
dummy_embed = torch.rand(2, 5, 16)
print("PositionalEncoding Output:", pos_enc(dummy_embed).shape)

PositionalEncoding Output: torch.Size([2, 5, 16])


### MultiHead Attention

In [6]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, nhead: int):
        super().__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.head_dim = d_model // nhead
        assert d_model % nhead == 0, "d_model must be divisible by nhead"
        self.query = nn.Linear(d_model, d_model, bias=False)
        self.key = nn.Linear(d_model, d_model, bias=False)
        self.value = nn.Linear(d_model, d_model, bias=False)
        self.output = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        x = x.reshape(batch_size, -1, self.nhead, self.head_dim)
        return x.permute(0, 2, 1, 3)

    def compute_attention(self, query, key, value, mask=None):
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention_weights = F.softmax(scores, dim=-1)
        return torch.matmul(attention_weights, value)

    def combine_heads(self, x, batch_size):
        x = x.permute(0, 2, 1, 3).contiguous()
        return x.view(batch_size, -1, self.d_model)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        query = self.split_heads(self.query(query), batch_size)
        key = self.split_heads(self.key(key), batch_size)
        value = self.split_heads(self.value(value), batch_size)
        out = self.compute_attention(query, key, value, mask)
        return self.output(self.combine_heads(out, batch_size))

In [7]:
attn = MultiHeadAttention(d_model=16, nhead=4)
x = torch.rand(2, 5, 16)
print("MultiHeadAttention Output:", attn(x, x, x).shape)

MultiHeadAttention Output: torch.Size([2, 5, 16])


### Feed Forward

In [8]:
class FeedForwardSubLayer(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.fc2(F.relu(self.fc1(x)))

In [9]:
ff = FeedForwardSubLayer(d_model=16, d_ff=32)
print("FeedForwardSubLayer Output:", ff(torch.rand(2, 5, 16)).shape)

FeedForwardSubLayer Output: torch.Size([2, 5, 16])


### Encoder Block

In [10]:
class EncodedLayer(nn.Module):  # Fixed typo in name
    def __init__(self, nhead, d_ff, d_model, dropout):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, nhead)
        self.feed_forward = FeedForwardSubLayer(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_out = self.self_attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_out))
        ff_out = self.feed_forward(x)
        return self.norm2(x + self.dropout(ff_out))

In [11]:
layer = EncodedLayer(nhead=4, d_ff=32, d_model=16, dropout=0.1)
print("EncodedLayer Output:", layer(torch.rand(2, 5, 16), None).shape)

EncodedLayer Output: torch.Size([2, 5, 16])


In [12]:
class EncoderBlock(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, nheads, d_ff, dropout, max_seq_len):
        super().__init__()
        self.embedding = InputEmbedding(vocab_size, d_model)
        self.position = PositionalEncoding(d_model, max_seq_len)
        self.layers = nn.ModuleList([
            EncodedLayer(nheads, d_ff, d_model, dropout) for _ in range(num_layers)
        ])

    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.position(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [13]:
enc = EncoderBlock(100, 16, 2, 4, 32, 0.1, 10)
print("EncoderBlock Output:", enc(torch.randint(0, 100, (2, 5)), None).shape)

EncoderBlock Output: torch.Size([2, 5, 16])


### Encoder Transformers Heads

In [14]:
class ClassificationHead(nn.Module):
    def __init__(self, d_model, num_classes):
        super().__init__()
        self.classification_layer = nn.Linear(d_model, num_classes)

    def forward(self, x):
        return F.log_softmax(self.classification_layer(x), dim=-1)

In [15]:
clf = ClassificationHead(d_model=16, num_classes=5)
print("ClassificationHead Output:", clf(torch.rand(2, 5, 16)).shape)

ClassificationHead Output: torch.Size([2, 5, 5])


In [16]:
class RegressionHead(nn.Module):
    def __init__(self, d_model, out_dim):
        super().__init__()
        self.regression_layer = nn.Linear(d_model, out_dim)

    def forward(self, x):
        return self.regression_layer(x)

In [17]:
reg = RegressionHead(d_model=16, out_dim=1)
print("RegressionHead Output:", reg(torch.rand(2, 5, 16)).shape)

RegressionHead Output: torch.Size([2, 5, 1])


### Encoder Transformer Example

In [18]:
vocab_size = 100
seq_len = 5
d_model = 16
nhead = 4
d_ff = 32
num_layers = 2
dropout = 0.1
num_classes = 5

x = torch.randint(0, vocab_size, (2, seq_len))
mask = None

encoder = EncoderBlock(vocab_size, d_model, num_layers, nhead, d_ff, dropout, max_seq_len=10)
classification_head = ClassificationHead(d_model, num_classes)

encoded = encoder(x, mask)
output = classification_head(encoded)

print("Final Transformer Output (classification):", output.shape)

Final Transformer Output (classification): torch.Size([2, 5, 5])


### Decoder Blcok

In [19]:
seq_length = 3
mask = (1 - torch.triu(
    torch.ones(1, seq_length, seq_length), diagonal=1)
    ).bool()

note that:
- decoder layer is the same as encoder layer but the mask is passed
- The decoder Head is implemented within the Decoder Block

In [20]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ff_sublayer = FeedForwardSubLayer(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_out, tgt_mask, cross_mask):
        x = self.norm1(x + self.dropout(self.self_attn(x, x, x, tgt_mask)))
        x = self.norm2(x + self.dropout(self.cross_attn(x, encoder_out, encoder_out, cross_mask)))
        x = self.norm3(x + self.dropout(self.ff_sublayer(x)))
        return x

In [21]:
layer = DecoderLayer(d_model=16, num_heads=4, d_ff=32, dropout=0.1)
x = torch.rand(2, 5, 16)
enc = torch.rand(2, 6, 16)
print("DecoderLayer Output:", layer(x, enc, None, None).shape)

DecoderLayer Output: torch.Size([2, 5, 16])


In [22]:
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, nhead, d_ff, dropout, max_seq_length):
        super().__init__()
        self.embedding = InputEmbedding(vocab_size, d_model)
        self.position = PositionalEncoding(d_model, max_seq_length)
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, nhead, d_ff, dropout) for _ in range(num_layers)
        ])
        self.output_fc = nn.Linear(d_model, vocab_size)

    def forward(self, x, encoder_out, tgt_mask, cross_mask):
        x = self.embedding(x)
        x = self.position(x)
        for layer in self.layers:
            x = layer(x, encoder_out, tgt_mask, cross_mask)
        return F.log_softmax(self.output_fc(x), dim=-1)

In [23]:
decoder = TransformerDecoder(100, 16, 2, 4, 32, 0.1, 10)
x = torch.randint(0, 100, (2, 5))
enc_out = torch.rand(2, 6, 16)
print("TransformerDecoder Output:", decoder(x, enc_out, None, None).shape)

TransformerDecoder Output: torch.Size([2, 5, 100])


### Full Transformer

In [24]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, nheads, d_ff, max_seq_len, dropout):
        super().__init__()
        self.encoder = EncoderBlock(vocab_size, d_model, num_layers, nheads, d_ff, dropout, max_seq_len)
        self.decoder = TransformerDecoder(vocab_size, d_model, num_layers, nheads, d_ff, dropout, max_seq_len)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, cross_mask=None):
        encoder_output = self.encoder(src, src_mask)
        decoder_output = self.decoder(tgt, encoder_output, tgt_mask, cross_mask)
        return decoder_output

In [25]:
model = Transformer(vocab_size=100, d_model=16, num_layers=2, nheads=4, d_ff=32, max_seq_len=10, dropout=0.1)
src = torch.randint(0, 100, (2, 6))
tgt = torch.randint(0, 100, (2, 5))
print("Transformer Output:", model(src, tgt).shape)

Transformer Output: torch.Size([2, 5, 100])
