In [8]:
import pandas as pd
import numpy as np
import math
import torch
import torch.nn as nn

## Embedding

In [20]:
# Revised Class for Embedding and Positional Encoding with correct dimension matching
class TokenEmbedding(nn.Module):
    """Token embedding plus fixed sinusoidal positional encoding, followed by dropout.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Width of each embedding vector (odd values are supported).
        max_len: Number of positions for which encodings are precomputed.
        dropout: Dropout probability applied to the summed embeddings.
    """

    def __init__(self, vocab_size, d_model, max_len, dropout):
        super().__init__()

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.dropout = nn.Dropout(dropout)

        # Precompute the sinusoidal table once: (max_len, d_model)
        positional_encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        positional_encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine column than sine column,
        # so trim the angles to the number of odd-indexed columns.
        positional_encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Register the (1, max_len, d_model) table as a buffer so it follows the
        # module through .to()/.cuda()/.double() and is saved in the state_dict.
        self.register_buffer('positional_encoding_buffer', positional_encoding.unsqueeze(0))

    @property
    def positional_encoding(self):
        """Read-only alias of the registered buffer, kept for existing callers."""
        return self.positional_encoding_buffer

    def forward(self, x):
        """Embed token ids and add positional encodings.

        Args:
            x: Integer tensor of token ids; dimension 1 is used as the sequence length.

        Returns:
            Tensor of shape ``x.shape + (d_model,)`` after dropout.

        Raises:
            RuntimeError: If the sequence length exceeds ``max_len``.
        """
        seq_len = x.size(1)
        max_len = self.positional_encoding_buffer.size(1)
        if seq_len > max_len:
            # Without this guard a table with max_len == 1 would silently broadcast.
            raise RuntimeError(
                f"sequence length {seq_len} exceeds max_len {max_len} of the positional encoding"
            )

        # Perform embedding
        embedding = self.embedding(x)

        # Add positional encoding from the buffer (same device/dtype as the module)
        embedding = embedding + self.positional_encoding_buffer[:, :seq_len, :]

        return self.dropout(embedding)

# Smoke-test the revised class.
# NOTE: the names below are notebook-level globals; `embedding` here is the
# TokenEmbedding module instance, not a tensor.
vocab_size = 3000
d_model = 512
max_len = 100
dropout = 0.1
embedding = TokenEmbedding(vocab_size, d_model, max_len, dropout)

# Forward pass on random token ids of shape (16, 100); the sequence length (100) equals max_len
sample_input = torch.randint(0, vocab_size, (16, 100))
output = embedding(sample_input)

# Check the output shape [batch_size, seq_len, d_model]
output.shape

torch.Size([16, 100, 512])

In [9]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to the input, then apply dropout.

    NOTE(review): a class with this same name is defined again in a later cell of
    this notebook; on a top-to-bottom run the later definition replaces this one.

    Args:
        d_model: Size of the last dimension of the inputs (odd values are supported).
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout, max_len):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd-indexed column than
        # even-indexed column, so drop the surplus frequency for the cosines.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.size(1)])``.

        Raises:
            RuntimeError: If ``x.size(1)`` exceeds ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            # Without this guard a table with max_len == 1 would silently broadcast.
            raise RuntimeError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)} of the positional encoding"
            )
        x = x + self.pe[:, :seq_len].requires_grad_(False)
        return self.dropout(x)

class Embeddings(nn.Module):
    """Embedding lookup whose output is multiplied by ``sqrt(d_model)``.

    Args:
        d_model: Width of each embedding vector; also determines the scale factor.
        vocab_size: Number of rows in the embedding table.
        max_len: Accepted but not used by this class.
        dropout: Accepted but not used by this class.
    """

    def __init__(self, d_model, vocab_size, max_len, dropout):
        super(Embeddings, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
        self.d_model = d_model

    def forward(self, x):
        """Look up the ids in ``x`` and scale the vectors by ``sqrt(d_model)``."""
        looked_up = self.embedding(x)
        scale = math.sqrt(self.d_model)
        return looked_up * scale

## Positional Encoding

In [10]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to the input, then apply dropout.

    NOTE(review): a class with this same name is also defined in an earlier cell
    of this notebook; on a top-to-bottom run this definition replaces it.

    Args:
        d_model: Size of the last dimension of the inputs (odd values are supported).
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout, max_len):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd-indexed column than
        # even-indexed column, so drop the surplus frequency for the cosines.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.size(1)])``.

        Raises:
            RuntimeError: If ``x.size(1)`` exceeds ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            # Without this guard a table with max_len == 1 would silently broadcast.
            raise RuntimeError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)} of the positional encoding"
            )
        x = x + self.pe[:, :seq_len].requires_grad_(False)
        return self.dropout(x)

## Multi-Head Attention

In [11]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with bias-free Q/K/V/output projections.

    NOTE(review): a placeholder class with this same name is defined in another
    cell further down the notebook; on a top-to-bottom run that placeholder
    replaces this implementation.

    Args:
        d_model: Embedding vector size; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model: int, n_heads: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model # Embedding vector size
        self.n_heads = n_heads # Number of heads
        # Make sure d_model is divisible by n_heads
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        self.d_k = d_model // n_heads # Dimension of vector seen by each head
        self.w_q = nn.Linear(d_model, d_model, bias=False) # Wq
        self.w_k = nn.Linear(d_model, d_model, bias=False) # Wk
        self.w_v = nn.Linear(d_model, d_model, bias=False) # Wv
        self.w_o = nn.Linear(d_model, d_model, bias=False) # Wo
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Scaled dot-product attention.

        Declared as a static method because it takes no ``self``; it can be
        called on the class or on an instance.

        Args:
            query, key, value: Tensors whose last dimension is the per-head size.
            mask: Optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are suppressed. ``None`` disables masking.
            dropout: Optional dropout module applied to the softmax weights.

        Returns:
            Tuple ``(weights @ value, weights)``.
        """
        d_k = query.shape[-1]
        # Just apply the formula from the paper
        # (batch, n_heads, seq_len, d_k) --> (batch, n_heads, seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # Write a very low value (indicating -inf) to the positions where mask == 0
            attention_scores.masked_fill_(mask == 0, -1e9)
        attention_scores = attention_scores.softmax(dim=-1) # (batch, n_heads, seq_len, seq_len) # Apply softmax
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        # (batch, n_heads, seq_len, seq_len) --> (batch, n_heads, seq_len, d_k)
        # return attention scores which can be used for visualization
        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask=None):
        """Project the inputs, attend per head, merge the heads and apply ``w_o``.

        The attention weights of the last call are stored on
        ``self.attention_scores``.
        """
        query = self.w_q(q) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        key = self.w_k(k) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        value = self.w_v(v) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)

        # (batch, seq_len, d_model) --> (batch, seq_len, n_heads, d_k) --> (batch, n_heads, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.n_heads, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.n_heads, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.n_heads, self.d_k).transpose(1, 2)

        # Calculate attention (resolved through the instance, so no class name is hard-coded)
        x, self.attention_scores = self.attention(query, key, value, mask, self.dropout)

        # Combine all the heads together
        # (batch, n_heads, seq_len, d_k) --> (batch, seq_len, n_heads, d_k) --> (batch, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.n_heads * self.d_k)

        # Multiply by Wo
        # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        return self.w_o(x)

In [None]:
# Redefining the Transformer class to include hyperparameters similar to torch.nn.Transformer
class Transformer(nn.Module):
    """Encoder-decoder skeleton that records its hyperparameters and wires sub-modules.

    NOTE(review): the sub-modules are built with no arguments, so the stored
    hyperparameters are not forwarded to them. The ``Encoder`` defined elsewhere
    in this notebook requires constructor arguments, and ``Decoder``,
    ``Embedding`` and ``Generator`` are not defined in the visible part of the
    notebook -- confirm before instantiating this class.
    """

    def __init__(self, d_model=512, n_head=8, n_encoder_layers=6, n_decoder_layers=6,
                 d_feedforward=2048, dropout=0.1):
        super().__init__()

        # Hyperparameters are only recorded on the instance.
        self.d_model, self.n_head = d_model, n_head
        self.n_encoder_layers, self.n_decoder_layers = n_encoder_layers, n_decoder_layers
        self.d_feedforward, self.dropout = d_feedforward, dropout

        # Placeholder components for now
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.src_embed = Embedding()
        self.tgt_embed = Embedding()
        self.generator = Generator()  # built here but not applied in forward()

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Encode ``src``, then decode ``tgt`` against the encoder output."""
        src_embedded = self.src_embed(src)
        memory = self.encoder(src_embedded, src_mask)
        tgt_embedded = self.tgt_embed(tgt)
        return self.decoder(tgt_embedded, memory, src_mask, tgt_mask)

# Create a Transformer model with hyperparameters
# model = Transformer()

# Forward pass with dummy input tensors
# output = model(src, tgt, src_mask, tgt_mask)


In [7]:
# Define the Multi-Head Attention layer
class MultiHeadAttention(nn.Module):
    """Stand-in attention layer: accepts the attention signature but does no computation.

    NOTE(review): this placeholder shares its name with the full implementation
    in the "Multi-Head Attention" cell; whichever cell runs last wins.
    """

    def __init__(self, d_model, n_head, dropout):
        # Arguments are accepted for signature compatibility and are not stored.
        super().__init__()

    def forward(self, query, key, value, mask=None):
        # Identity on the query; key, value and mask are ignored.
        passthrough = query
        return passthrough

# Define the Feed-Forward Network
class FeedForward(nn.Module):
    """Stand-in feed-forward block: accepts the final signature but returns its input."""

    def __init__(self, d_model, d_feedforward, dropout):
        # Arguments are accepted for signature compatibility and are not stored.
        super().__init__()

    def forward(self, x):
        # Identity mapping until a real implementation is filled in.
        unchanged = x
        return unchanged

# Define a single Encoder layer with one Multi-Head Attention and one Feed-Forward Network
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward, each followed by add-and-norm.

    Args:
        d_model: Feature size passed to both sub-layers and both LayerNorms.
        n_head: Passed through to ``MultiHeadAttention``.
        d_feedforward: Passed through to ``FeedForward``.
        dropout: Passed to both sub-layers and used for the residual dropout.
    """

    def __init__(self, d_model, n_head, d_feedforward, dropout):
        super().__init__()
        # Sub-layers (construction order kept so module registration is unchanged).
        self.multi_head_attention = MultiHeadAttention(d_model, n_head, dropout)
        self.feed_forward = FeedForward(d_model, d_feedforward, dropout)
        # One LayerNorm per residual branch; a single dropout module serves both.
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Apply the attention branch, then the feed-forward branch, to ``x``."""
        # Self-attention branch: residual add, then normalise.
        attended = self.multi_head_attention(x, x, x, mask)
        after_attention = self.layer_norm1(x + self.dropout(attended))

        # Feed-forward branch: residual add, then normalise.
        transformed = self.feed_forward(after_attention)
        return self.layer_norm2(after_attention + self.dropout(transformed))

# Define the Encoder that stacks multiple Encoder layers
class Encoder(nn.Module):
    """Stack of ``num_layers`` encoder layers applied one after another.

    Args:
        d_model: Feature size passed to every ``EncoderLayer``.
        n_head: Number of heads passed to every ``EncoderLayer``.
        num_layers: How many layers to stack; ``0`` yields an identity encoder.
        d_feedforward: Feed-forward width passed to every ``EncoderLayer``.
        dropout: Dropout probability passed to every ``EncoderLayer``. Defaults
            to 0.1 so callers that omit it (as the test cell in this notebook
            does) no longer fail with a ``TypeError``.
    """

    def __init__(self, d_model, n_head, num_layers, d_feedforward, dropout=0.1):
        super(Encoder, self).__init__()
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_head, d_feedforward, dropout) for _ in range(num_layers)])

    def forward(self, x, mask=None):
        """Run ``x`` through every layer in order, passing ``mask`` to each."""
        for layer in self.layers:
            x = layer(x, mask)
        return x

# Test the new Encoder implementation.
# `dropout` is passed explicitly: Encoder.__init__ declares it without a default,
# so omitting it raises a TypeError.
encoder_test = Encoder(d_model=512, n_head=8, num_layers=6, d_feedforward=2048, dropout=0.1)
output_test = encoder_test(torch.rand(10, 32, 512))  # Batch size: 10, Sequence length: 32, Feature size: 512
output_test.shape

torch.Size([10, 32, 512])