In [95]:
import pandas as pd
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

In [96]:
# Load the local parquet shard into a DataFrame; the path is relative to the
# current working directory, so the file must sit next to the notebook.
df = pd.read_parquet("./train-00000-of-00001.parquet")  # If using local file

In [97]:

# Number of leading rows used as the (small) tokenizer-training sample.
NUM_TRAINING_SENTENCES = 100

# The first column of every row holds an object indexable by 'en' and 'de'.
# Slicing that column once avoids materialising a full row Series per iteration;
# the slice simply yields fewer items when the frame has fewer rows.
translation_pairs = df.iloc[:NUM_TRAINING_SENTENCES, 0]
english_sentences = [pair['en'] for pair in translation_pairs]
deutsch_sentences = [pair['de'] for pair in translation_pairs]

In [98]:
# Hyperparameters shared by the tokenizers and the encoder stack.
GLOBALS = {
    # Number of accepted distinct INPUT tokens.
    "INPUT-VOCABULARY-SIZE": 30_000,
    # Number of accepted distinct OUTPUT tokens.
    "OUTPUT-VOCABULARY-SIZE": 30_000,
    # Dimension of each token's embedding vector.
    "D-MODEL": 512,
    # Length the tokenizers pad to; also the number of precomputed positions.
    "INPUT-SEQUENCE-LENGTH": 256,
    # Attention heads per block -- must divide D-MODEL evenly!
    "NUM-HEADS": 4,
    # Hidden width of the position-wise feed-forward network.
    "D-FF": 1024,
    # Number of encoder blocks stacked in the encoder.
    "NUM-ENCODER-BLOCKS": 4,
}

In [99]:
def BPETokenizer(corpus_sentences, vocab_size=30000, max_length=None):
    """
    Train a byte-pair-encoding tokenizer on an in-memory corpus.

    Args:
        corpus_sentences: Iterable of strings used as training text.
        vocab_size (int): Target size of the learned vocabulary.
        max_length (int, optional): Fixed length every encoding is padded or
            truncated to. Defaults to GLOBALS['INPUT-SEQUENCE-LENGTH'].

    Returns:
        Tokenizer: The trained tokenizer with padding and truncation enabled.
    """
    if max_length is None:
        # Resolved at call time so later edits to GLOBALS are honoured.
        max_length = GLOBALS['INPUT-SEQUENCE-LENGTH']

    # The model must be told which token stands in for unknown symbols;
    # listing "<unk>" as a special token in the trainer is not enough.
    tokenizer = Tokenizer(BPE(unk_token="<unk>"))

    # Define a pre-tokenizer to split input text into words
    tokenizer.pre_tokenizer = Whitespace()

    # Set up a trainer with desired vocabulary size
    trainer = BpeTrainer(
        vocab_size=vocab_size,
        min_frequency=2,
        special_tokens=["<unk>", "<pad>", "<bos>", "<eos>"],
    )

    # Tokenizer expects an iterator of strings
    tokenizer.train_from_iterator(corpus_sentences, trainer=trainer)

    # Pad short sentences and cut long ones so every encoding has exactly
    # max_length ids; without truncation a long sentence would exceed the
    # fixed length that padding is meant to guarantee.
    tokenizer.enable_padding(length=max_length, pad_id=tokenizer.token_to_id("<pad>"), pad_token="<pad>")
    tokenizer.enable_truncation(max_length=max_length)

    return tokenizer


# Train one tokenizer per language, each with its own vocabulary budget.
english_encoder = BPETokenizer(
    english_sentences,
    vocab_size=GLOBALS["INPUT-VOCABULARY-SIZE"],
)
deutsch_encoder = BPETokenizer(
    deutsch_sentences,
    vocab_size=GLOBALS["OUTPUT-VOCABULARY-SIZE"],
)


In [100]:
import torch
import torch.nn as nn
import math

import torch.nn as nn

class InputEmbedding(nn.Module):
    """Lookup table that turns token ids into dense vectors of width d_model."""

    def __init__(self, vocab_size, d_model):
        super().__init__()
        # One learnable row of d_model values per vocabulary entry.
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """
        Embed a batch of token ids.

        Args:
            x: Tensor of ids with shape (batch_size, input_context_length).

        Returns:
            Tensor of shape (batch_size, input_context_length, d_model).
        """
        return self.embedding(x)

class PositionalEncoding(nn.Module):
    """
    Adds fixed sinusoidal position information to token embeddings.

    Args:
        d_model (int): Width of each embedding vector (odd widths are supported).
        seq_length (int): Number of positions to precompute.
    """

    def __init__(self, d_model, seq_length):
        super(PositionalEncoding, self).__init__()
        # Create a matrix of shape (seq_length, d_model) to hold the positional encodings
        pe = torch.zeros(seq_length, d_model)
        # Position indices (0, 1, 2, ..., seq_length-1) as a column vector
        position = torch.arange(0, seq_length, dtype=torch.float).unsqueeze(1)
        # One frequency per even dimension index
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (seq_length, ceil(d_model / 2))
        # Apply sine to even indices and cosine to odd indices
        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd column than even columns, so the
        # surplus frequency is dropped to keep the shapes aligned.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Add a batch dimension and register as a buffer so it is not trained
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """
        Add the positional encodings to the input.

        Args:
            x: Tensor of shape (batch_size, input_context_length, d_model).

        Returns:
            Tensor of the same shape as x.

        Raises:
            ValueError: If the input has more positions than were precomputed.
        """
        input_length = x.size(1)
        available = self.pe.size(1)
        if input_length > available:
            raise ValueError(
                f"input has {input_length} positions but only {available} positional encodings were precomputed"
            )
        return x + self.pe[:, :input_length, :]
class MultiHeadSelfAttention(nn.Module):
    """
    Multi-head scaled dot-product self-attention.

    Args:
        d_model (int): Width of the input and output vectors.
        num_heads (int): Number of attention heads; must divide d_model.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_model // num_heads  # width of each individual head

        # Learnable weight matrices for Q, K, V
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Output linear transformation
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """
        Attend every position of x to every (unmasked) position of x.

        Args:
            x: Tensor of shape (batch_size, seq_length, d_model).
            mask (optional): Tensor whose zero/False entries mark keys that must
                not be attended to. Accepted shapes are (batch_size, seq_length)
                for a per-key padding mask, (batch_size, seq_length, seq_length),
                or anything broadcastable to
                (batch_size, num_heads, seq_length, seq_length).
                When omitted, all positions are attended to.

        Returns:
            Tensor of shape (batch_size, seq_length, d_model).
        """
        batch_size, seq_length, _ = x.size()

        def split_heads(projected):
            # (batch_size, seq_length, d_model) -> (batch_size, num_heads, seq_length, d_k)
            return projected.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

        Q = split_heads(self.W_q(x))
        K = split_heads(self.W_k(x))
        V = split_heads(self.W_v(x))

        # Scaled dot-product attention: (batch_size, num_heads, seq_length, seq_length)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            if mask.dim() == 2:
                # Per-key mask: share it across heads and query positions.
                mask = mask[:, None, None, :]
            elif mask.dim() == 3:
                # Per-(query, key) mask: share it across heads.
                mask = mask[:, None, :, :]
            # A large finite negative (instead of -inf) keeps a fully masked
            # row from turning into NaN after the softmax.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        attention_weights = torch.nn.functional.softmax(scores, dim=-1)
        attention_output = torch.matmul(attention_weights, V)  # (batch_size, num_heads, seq_length, d_k)

        # Concatenate heads and apply final linear transformation
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        return self.W_o(attention_output)  # (batch_size, seq_length, d_model)
    
class AddNorm(nn.Module):
    """
    Residual connection followed by layer normalization ("Add & Norm").

    Args:
        d_model (int): Width of the vectors being normalized.
        dropout (float): Dropout rate applied to the sublayer output.
    """

    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        self.layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer_output):
        """Return LayerNorm(x + Dropout(sublayer_output))."""
        # Dropout touches only the sublayer branch; the residual path stays intact.
        residual_sum = x + self.dropout(sublayer_output)
        return self.layer_norm(residual_sum)
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.0):
        """
        Initializes the PositionwiseFeedForward layer.

        Args:
            d_model (int): Dimensionality of the input and output features.
            d_ff (int): Dimensionality of the hidden layer.
            dropout (float): Dropout rate applied to the hidden activations.
                The default of 0.0 disables it.
        """
        super(PositionwiseFeedForward, self).__init__()

        # First linear transformation: projects from d_model to d_ff
        self.linear1 = nn.Linear(d_model, d_ff)

        # ReLU activation function introduces non-linearity
        self.relu = nn.ReLU()

        # Optional regularization between the two projections
        self.dropout = nn.Dropout(dropout)

        # Second linear transformation: projects back from d_ff to d_model
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """
        Defines the forward pass of the PositionwiseFeedForward layer.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_length, d_model).

        Returns:
            torch.Tensor: Output tensor of the same shape as input.
        """
        # Expand to d_ff, apply the non-linearity (and dropout), project back.
        hidden = self.dropout(self.relu(self.linear1(x)))
        return self.linear2(hidden)


In [101]:
class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        """
        Initializes the Transformer Encoder Layer.

        Args:
            d_model (int): Dimensionality of the input embeddings.
            num_heads (int): Number of attention heads.
            d_ff (int): Dimensionality of the feed-forward network's hidden layer.
            dropout (float): Dropout rate used by both Add & Norm layers.
        """
        super(EncoderBlock, self).__init__()

        # Multi-Head Self-Attention mechanism
        self.self_attention = MultiHeadSelfAttention(d_model, num_heads)

        # Add & Norm layer after self-attention
        self.add_norm1 = AddNorm(d_model, dropout=dropout)

        # Position-wise Feed-Forward Network
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff)

        # Add & Norm layer after feed-forward network
        self.add_norm2 = AddNorm(d_model, dropout=dropout)

    def forward(self, x):
        """
        Defines the forward pass of the Transformer Encoder Layer.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_length, d_model).

        Returns:
            torch.Tensor: Output tensor of the same shape as input.
        """
        # Sublayer 1: self-attention wrapped in a residual connection + norm
        x = self.add_norm1(x, self.self_attention(x))

        # Sublayer 2: feed-forward network wrapped the same way
        x = self.add_norm2(x, self.feed_forward(x))

        return x

class Encoder(nn.Module):
    """A stack of EncoderBlock layers followed by one final LayerNorm."""

    def __init__(self, num_layers, d_model, num_heads, d_ff):
        """
        Build the encoder stack.

        Args:
            num_layers (int): How many encoder blocks to stack.
            d_model (int): Width of the token vectors flowing through the stack.
            num_heads (int): Attention heads in every block.
            d_ff (int): Hidden width of every block's feed-forward network.
        """
        super().__init__()

        # Blocks are created in order so they run first-to-last in forward().
        blocks = [EncoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)]
        self.layers = nn.ModuleList(blocks)

        # Normalization applied once, after the last block.
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """
        Run the input through every block, then normalize.

        Args:
            x (torch.Tensor): Tensor of shape (batch_size, seq_length, d_model).

        Returns:
            torch.Tensor: Tensor with the same shape as x.
        """
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return self.layer_norm(hidden)


In [102]:

import torch

# Build the input pipeline and the encoder stack from the shared hyperparameters.
input_embedding_layer = InputEmbedding(GLOBALS['INPUT-VOCABULARY-SIZE'], GLOBALS['D-MODEL'])
input_positional_encoding_layer = PositionalEncoding(GLOBALS['D-MODEL'], GLOBALS['INPUT-SEQUENCE-LENGTH'])

encoder = Encoder(GLOBALS['NUM-ENCODER-BLOCKS'], GLOBALS['D-MODEL'], GLOBALS['NUM-HEADS'], GLOBALS["D-FF"])

# Smoke test: push only the first sentence pair through the pipeline and report
# the output shape. Modules are invoked by calling them (not via .forward()) so
# that any registered hooks run as usual.
for english_sentence, deutsch_sentence in zip(english_sentences, deutsch_sentences):

    token_ids = english_encoder.encode(english_sentence).ids

    # Add a leading batch dimension: (batch, input_context_len)
    english_encoding = torch.tensor(token_ids, dtype=torch.long).unsqueeze(0)
    x = input_embedding_layer(english_encoding)  # (batch_size, input_context_length, d_model)
    x = input_positional_encoding_layer(x)  # (batch_size, input_context_length, d_model)

    x = encoder(x)
    print(x.shape)
    break  # one sentence is enough to verify the shapes

torch.Size([1, 256, 512])
