In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [3]:
## Define the positional encoding. Because the Transformer processes all tokens in parallel rather than sequentially, we need a way to record the position of each token.

In [32]:
class PositionalEncoding(nn.Module):
    """Adds a fixed sinusoidal position table to its input.

    The table has shape (1, max_len, d_model): even feature columns hold
    sin(position * div_term) and odd feature columns hold
    cos(position * div_term).

    Args:
        d_model: Size of the feature dimension of the table.
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        # Build the (max_len, d_model) table locally before registering it.
        encoding = torch.zeros(max_len, d_model, dtype=torch.float)
        encoding[:, 0::2] = torch.sin(position * div_term)  # even columns
        # When d_model is odd there is one fewer odd column than even column,
        # so trim div_term to the number of odd columns to keep shapes aligned.
        encoding[:, 1::2] = torch.cos(position * div_term[: d_model // 2])  # odd columns

        # Register as a buffer so module.to(...)/.cuda()/.double() carry the
        # table along with the module. persistent=False keeps it out of
        # state_dict, so checkpoints look the same as when this was a plain
        # tensor attribute.
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return x plus the first x.size(1) rows of the position table.

        Args:
            x: Tensor whose dim 1 is the sequence axis and whose last dim
               matches d_model (the slice below fixes that layout).
        """
        # .to(x.device) is kept so the module still works when the caller moved
        # only the input and not the module; it is a no-op when devices match.
        return x + self.encoding[:, :x.size(1), :].to(x.device)


In [33]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Compute softmax(Q K^T / sqrt(d_k)) V.

    Args:
        query, key, value: Tensors whose last two dims are multiplied as
            matrices; d_k is taken from the last dim of `query`.
        mask: Optional tensor broadcastable to the score matrix. Positions
            where `mask == 0` are filled with -1e9 before the softmax.

    Returns:
        A tuple (attended values, attention weights).
    """
    d_k = query.size(-1)
    logits = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        blocked = mask == 0
        logits = logits.masked_fill(blocked, -1e9)

    # Normalise over the key axis so each query's weights sum to one.
    weights = F.softmax(logits, dim=-1)
    context = torch.matmul(weights, value)
    return context, weights

In [52]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on `scaled_dot_product_attention`.

    Args:
        d_model: Input/output feature size of every linear projection.
        num_heads: Number of heads; must divide `d_model` evenly.

    Raises:
        AssertionError: If `d_model` is not divisible by `num_heads`.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads  # per-head feature size
        self.num_heads = num_heads

        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge heads and project again.

        Args:
            query, key, value: Tensors whose dim 0 is the batch size and whose
                last dim is d_model (assumes (batch, seq_len, d_model) —
                confirm against callers).
            mask: Optional mask passed through unchanged to
                `scaled_dot_product_attention`.

        Returns:
            The output of `out_linear` applied to the merged heads.
        """
        batch_size = query.size(0)

        # Split the projected features into (num_heads, d_k) and move the head
        # axis in front of the sequence axis: (batch, heads, seq, d_k).
        query = self.query_linear(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        key = self.key_linear(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        value = self.value_linear(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        attention_output, _ = scaled_dot_product_attention(query, key, value, mask)

        # transpose() returns a non-contiguous view, so .contiguous() is needed
        # before .view() can merge the heads back into one feature axis.
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)
        return self.out_linear(attention_output)

In [53]:
class FeedForward(nn.Module):
    """Two-layer feed-forward block: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature size. (The parameter was previously
            misspelled `d_module` while the body read a global `d_model`.)
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        # Must be named `linear2`: forward() looks it up under that name.
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply linear1, ReLU, then linear2 to `x`."""
        return self.linear2(F.relu(self.linear1(x)))

In [54]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to its input
    (residual connection) and is then layer-normalised.

    Args:
        d_model: Feature size used by attention, feed-forward and LayerNorm.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on `x`; `mask` is forwarded to the attention module."""
        # Self-attention: query, key and value are all `x`. The module is
        # stored as `self.attention` in __init__, so that is the name used here.
        attn_output = self.dropout(self.attention(x, x, x, mask))
        x = self.layer_norm1(x + attn_output)
        ff_output = self.dropout(self.feed_forward(x))
        x = self.layer_norm2(x + ff_output)
        return x

In [55]:
class Encoder(nn.Module):
    """Token embedding, positional encoding and a stack of encoder layers.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding / feature size.
        num_layers: How many `EncoderLayer` blocks to stack.
        num_heads: Attention heads per layer.
        d_ff: Feed-forward hidden size per layer.
        dropout: Dropout probability handed to every layer.
    """

    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model)

        stack = []
        for _ in range(num_layers):
            stack.append(EncoderLayer(d_model, num_heads, d_ff, dropout))
        self.layers = nn.ModuleList(stack)

    def forward(self, x, mask=None):
        """Embed the token indices in `x` and run them through every layer.

        `mask` is passed unchanged to each layer.
        """
        hidden = self.pos_encoding(self.embedding(x))
        for block in self.layers:
            hidden = block(hidden, mask)
        return hidden

In [56]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer output goes through dropout, is added to its input
    (residual connection) and is then layer-normalised.

    This class was previously declared as `Decoder` with unused `vocab_size`
    and `num_layers` parameters, although its own `super()` call and the
    `Decoder` stack both refer to it as
    `DecoderLayer(d_model, num_heads, d_ff, dropout)`.

    Args:
        d_model: Feature size used by attention, feed-forward and LayerNorm.
        num_heads: Number of attention heads in both attention modules.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        # Spelled to match the attribute that forward() reads.
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.layer_norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, tgt_mask=None, memory_mask=None):
        """Run the block.

        Args:
            x: Decoder-side input to this layer.
            enc_output: Encoder output, used as key and value in the second
                attention sub-layer.
            tgt_mask: Mask forwarded to the self-attention sub-layer.
            memory_mask: Mask forwarded to the encoder-decoder attention.
        """
        x = self.layer_norm1(x + self.dropout(self.self_attention(x, x, x, tgt_mask)))
        x = self.layer_norm2(x + self.dropout(self.enc_dec_attention(x, enc_output, enc_output, memory_mask)))
        x = self.layer_norm3(x + self.dropout(self.feed_forward(x)))
        return x
        

In [57]:
class Decoder(nn.Module):
    """Token embedding, positional encoding, decoder layers and a vocabulary projection.

    Args:
        vocab_size: Rows of the embedding table and output size of the final
            linear projection.
        d_model: Embedding / feature size.
        num_layers: How many `DecoderLayer` blocks to stack.
        num_heads: Attention heads per layer.
        d_ff: Feed-forward hidden size per layer.
        dropout: Dropout probability handed to every layer.
    """

    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model)

        stack = []
        for _ in range(num_layers):
            stack.append(DecoderLayer(d_model, num_heads, d_ff, dropout))
        self.layers = nn.ModuleList(stack)

        self.linear = nn.Linear(d_model, vocab_size)

    def forward(self, x, enc_output, tgt_mask=None, memory_mask=None):
        """Embed `x`, run every layer against `enc_output`, then project.

        `tgt_mask` and `memory_mask` are passed unchanged to each layer.
        Returns the output of the final linear layer (size `vocab_size` on the
        last axis).
        """
        hidden = self.pos_encoding(self.embedding(x))
        for block in self.layers:
            hidden = block(hidden, enc_output, tgt_mask, memory_mask)
        logits = self.linear(hidden)
        return logits

In [58]:
class Transformer(nn.Module):
    """Encoder-decoder model composed of `Encoder` and `Decoder`.

    Args:
        src_vocab_size: Vocabulary size of the encoder embedding.
        tgt_vocab_size: Vocabulary size of the decoder embedding and of the
            decoder's output projection.
        d_model: Embedding / feature size shared by both stacks.
        num_layers: Number of layers in each stack.
        num_heads: Attention heads per layer.
        d_ff: Feed-forward hidden size per layer.
        dropout: Dropout probability handed to every layer.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_layers, num_heads, d_ff, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, num_layers, num_heads, d_ff, dropout)
        # The decoder must be a Decoder sized by the *target* vocabulary;
        # forward() calls it with (tgt, enc_output, tgt_mask, memory_mask),
        # which only Decoder.forward accepts.
        self.decoder = Decoder(tgt_vocab_size, d_model, num_layers, num_heads, d_ff, dropout)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        """Encode `src`, then decode `tgt` against the encoder output.

        Args:
            src: Input passed to the encoder.
            tgt: Input passed to the decoder.
            src_mask: Mask forwarded to the encoder.
            tgt_mask: Mask forwarded to the decoder as `tgt_mask`.
            memory_mask: Mask forwarded to the decoder as `memory_mask`.

        Returns:
            Whatever the decoder returns.
        """
        enc_output = self.encoder(src, src_mask)
        return self.decoder(tgt, enc_output, tgt_mask, memory_mask)

In [59]:
## Defining the hyperparameters and the optimizer

In [60]:
# Model hyperparameters.
src_vocab_size = 10000  # source vocabulary size
tgt_vocab_size = 10000  # target vocabulary size
d_model = 512           # embedding dimension
num_layers = 6          # layers in the encoder and in the decoder
num_heads = 8           # attention heads per layer
d_ff = 2048             # hidden size of each feed-forward sub-layer
dropout = 0.1           # dropout rate

# Build the model, spelling out which hyperparameter goes where.
transformer = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_layers=num_layers,
    num_heads=num_heads,
    d_ff=d_ff,
    dropout=dropout,
)

# Adam over every model parameter.
optimizer = torch.optim.Adam(
    params=transformer.parameters(),
    lr=0.0001,
    betas=(0.9, 0.98),
    eps=1e-9,
)

In [62]:
# Targets equal to `padding_idx` are excluded from the loss.
padding_idx = 0
loss_fn = nn.CrossEntropyLoss(
    ignore_index=padding_idx,
    reduction="mean",  # the default, written out explicitly
)

In [64]:
## define the number of epochs
num_epochs = 5

# Pick the device once and move the model to it so that inputs and weights
# live on the same device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer.to(device)
transformer.train()  # enable dropout for training

# NOTE(review): `train_loader` is not defined anywhere in this notebook. It must
# be created in an earlier cell and yield (inputs, targets) pairs of token-id
# tensors — assumed shape (batch, seq_len), with targets carrying start/end
# tokens; confirm against the dataset.
for epoch in range(num_epochs):
    running_loss, num_batches = 0.0, 0

    for batch in train_loader:
        inputs, targets = batch
        # Move to device (CPU, GPU)
        inputs, targets = inputs.to(device), targets.to(device)

        # Teacher forcing: the decoder sees tokens [0 .. T-2] and is trained to
        # predict tokens [1 .. T-1].
        decoder_input = targets[:, :-1]
        labels = targets[:, 1:]

        # Masks follow the `mask == 0 -> blocked` convention of
        # scaled_dot_product_attention.
        # Padding mask, shaped (batch, 1, 1, src_len) to broadcast over heads
        # and query positions.
        src_mask = (inputs != padding_idx).unsqueeze(1).unsqueeze(2)
        # Causal mask: position i may only attend to positions <= i.
        tgt_len = decoder_input.size(1)
        tgt_mask = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=device))

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass: the model needs both the source and the decoder input.
        outputs = transformer(
            inputs,
            decoder_input,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=src_mask,
        )

        # Calculate loss. reshape() is used because the shifted labels are a
        # non-contiguous slice, which .view() cannot flatten.
        loss = loss_fn(outputs.reshape(-1, tgt_vocab_size), labels.reshape(-1))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # The original loop called `scheduler.step()`, but no scheduler is created
    # anywhere in the notebook; build one next to the optimizer and step it
    # here if learning-rate scheduling is wanted.
    print(f"epoch {epoch + 1}/{num_epochs} - mean train loss: {running_loss / max(num_batches, 1):.4f}")

NameError: name 'train_loader' is not defined

In [None]:
# Pick the device here as well so this cell does not depend on another cell
# having defined it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer.to(device)
transformer.eval()  # Set model to evaluation mode (disables dropout)

# NOTE(review): `val_loader` is not defined anywhere in this notebook. It must
# be created in an earlier cell and yield (inputs, targets) pairs of token-id
# tensors — assumed shape (batch, seq_len); confirm against the dataset.
total_loss, num_batches = 0.0, 0
with torch.no_grad():
    for batch in val_loader:
        inputs, targets = batch
        inputs, targets = inputs.to(device), targets.to(device)

        # Same teacher-forcing shift as in training: feed tokens [0 .. T-2],
        # score against tokens [1 .. T-1].
        decoder_input = targets[:, :-1]
        labels = targets[:, 1:]

        # Masks follow the `mask == 0 -> blocked` convention of
        # scaled_dot_product_attention.
        src_mask = (inputs != padding_idx).unsqueeze(1).unsqueeze(2)
        tgt_len = decoder_input.size(1)
        tgt_mask = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=device))

        outputs = transformer(
            inputs,
            decoder_input,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=src_mask,
        )
        # reshape() is used because the shifted labels are a non-contiguous slice.
        loss = loss_fn(outputs.reshape(-1, tgt_vocab_size), labels.reshape(-1))

        total_loss += loss.item()
        num_batches += 1
        # Calculate metrics if needed (e.g., BLEU for translation tasks)

val_loss = total_loss / max(num_batches, 1)
print(f"mean validation loss: {val_loss:.4f}")