In [None]:
# Import necessary libraries
import copy
import math

import torch
import torch.nn as nn

## Implementing the Transformer Encoder Layer

In [None]:
# Define the Transformer Encoder Layer
class TransformerEncoderLayer(nn.Module):
    """One post-norm Transformer encoder block.

    The block runs multi-head self-attention and then a two-layer ReLU
    feed-forward network. After each of the two sub-layers the result is passed
    through dropout, added back to the sub-layer input (residual connection)
    and layer-normalised.

    Args:
        d_model: Size of the last (embedding) dimension of the input.
        nhead: Number of attention heads given to ``nn.MultiheadAttention``.
        dim_feedforward: Hidden width of the feed-forward network.
        dropout: Dropout probability used inside attention, inside the
            feed-forward network and on both residual branches.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()

        # Sub-layer 1: multi-head self-attention.
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Sub-layer 2: feed-forward network d_model -> dim_feedforward -> d_model.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # One LayerNorm / Dropout pair per residual branch.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        """Run the block on ``src`` and return a tensor of the same shape.

        Args:
            src: Input tensor whose last dimension is ``d_model``; the test
                cells feed it as (sequence length, batch size, embedding size).
            src_mask: Optional ``attn_mask`` forwarded to self-attention.
            src_key_padding_mask: Optional ``key_padding_mask`` forwarded to
                self-attention.
        """
        # Self-attention branch; the attention weights (second item) are unused.
        attn_out, _ = self.self_attn(
            src, src, src,
            attn_mask=src_mask,
            key_padding_mask=src_key_padding_mask,
        )
        src = self.norm1(src + self.dropout1(attn_out))

        # Feed-forward branch.
        hidden = self.dropout(torch.relu(self.linear1(src)))
        ff_out = self.linear2(hidden)
        return self.norm2(src + self.dropout2(ff_out))

## Testing the Transformer Encoder Layer

In [None]:
# Smoke-test a single TransformerEncoderLayer
# Hyperparameters (later test cells reuse these names)
d_model = 512
nhead = 8
dim_feedforward = 2048
dropout = 0.1

# Create an instance of the TransformerEncoderLayer
encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)

# Dummy input data (sequence length, batch size, embedding size)
seq_length = 10
batch_size = 2
dummy_input = torch.rand(seq_length, batch_size, d_model)

# Forward pass
output = encoder_layer(dummy_input)

# The layer must keep the input layout; assert it instead of relying on
# someone reading the printed shape.
assert output.shape == dummy_input.shape, f"unexpected output shape: {tuple(output.shape)}"

print(f"Output shape: {output.shape}")

Output shape: torch.Size([10, 2, 512])


## Building the Full Transformer Encoder

In [None]:
# Define the Transformer Encoder
class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` independent encoder layers applied in sequence.

    Args:
        encoder_layer: Prototype layer. It is deep-copied once per level of the
            stack, so every level owns its own parameters. The copies start
            from the prototype's current weights; the prototype itself is not
            registered in this module.
        num_layers: Number of layers in the stack.
    """

    def __init__(self, encoder_layer, num_layers):
        super(TransformerEncoder, self).__init__()
        # Listing the same instance N times would make all N levels share a
        # single set of weights (effectively one layer applied N times), so
        # each level gets its own deep copy.
        self.layers = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(num_layers)]
        )
        self.num_layers = num_layers

    def forward(self, src, mask=None, src_key_padding_mask=None):
        """Feed ``src`` through every layer in order and return the result.

        Args:
            src: Input passed to the first layer.
            mask: Optional mask handed to each layer as ``src_mask``.
            src_key_padding_mask: Optional padding mask handed to each layer.
        """
        output = src

        for mod in self.layers:
            output = mod(output, src_mask=mask,
                         src_key_padding_mask=src_key_padding_mask)

        return output

## Testing the Transformer Encoder

In [None]:
# Smoke-test the stacked TransformerEncoder
num_layers = 6

# Initialize the TransformerEncoder from the layer built in the previous test cell
transformer_encoder = TransformerEncoder(encoder_layer, num_layers)

# Dummy input data remains the same
# Forward pass
output = transformer_encoder(dummy_input)

# Stacking layers must not change the tensor layout.
assert output.shape == dummy_input.shape, f"unexpected output shape: {tuple(output.shape)}"

print(f"Output shape after TransformerEncoder: {output.shape}")

Output shape after TransformerEncoder: torch.Size([10, 2, 512])


## Implementing Positional Encoding

In [None]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to the input, then apply dropout.

    A table ``pe`` of shape (max_len, 1, d_model) is precomputed and stored as
    a buffer. Even feature columns hold ``sin(position * div_term)`` and odd
    columns hold ``cos(position * div_term)``.

    Args:
        d_model: Size of the embedding dimension. Both even and odd sizes are
            supported.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions precomputed, i.e. the longest sequence
            (size of dimension 0 of the input) the table covers.
    """

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so drop the surplus angle column. For an even
        # d_model the slice keeps every column.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # (max_len, d_model) -> (max_len, 1, d_model) so the table broadcasts
        # over dimension 1 of the input.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``.

        Args:
            x: Tensor whose dimension 0 indexes positions and whose last
                dimension is ``d_model``; the test cell uses
                (sequence length, batch size, embedding size).
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

### Test the PositionalEncoding

In [None]:
# Smoke-test the PositionalEncoding module.
# d_model, dropout, seq_length and batch_size come from the earlier test cells.
pos_encoder = PositionalEncoding(d_model, dropout)

# Dummy input embeddings (sequence length, batch size, embedding size)
dummy_embeddings = torch.zeros(seq_length, batch_size, d_model)

# Forward pass
pos_encoded_embeddings = pos_encoder(dummy_embeddings)

# Adding position information must not change the layout of the embeddings.
assert pos_encoded_embeddings.shape == dummy_embeddings.shape, (
    f"unexpected output shape: {tuple(pos_encoded_embeddings.shape)}"
)

print(f"Positional Encoded Embeddings shape: {pos_encoded_embeddings.shape}")

Positional Encoded Embeddings shape: torch.Size([10, 2, 512])


## Assembling the Complete Transformer Model

In [None]:
# Complete Transformer Model
class TransformerModel(nn.Module):
    """Encoder-only Transformer that maps token ids to per-position vocabulary scores.

    Pipeline: embedding scaled by sqrt(d_model) -> positional encoding ->
    encoder stack -> linear projection back to ``ntoken`` outputs.

    Args:
        ntoken: Vocabulary size (embedding rows and output width).
        d_model: Embedding size.
        nhead: Number of attention heads per encoder layer.
        dim_feedforward: Hidden width of each layer's feed-forward network.
        num_layers: Number of encoder layers.
        dropout: Dropout probability for the positional encoding and the layers.
    """

    def __init__(self, ntoken, d_model, nhead, dim_feedforward, num_layers, dropout=0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.src_mask = None

        self.pos_encoder = PositionalEncoding(d_model, dropout)
        layer_prototype = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
        self.transformer_encoder = TransformerEncoder(layer_prototype, num_layers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self):
        """Draw embedding and output weights from U(-0.1, 0.1); zero the output bias."""
        bound = 0.1
        # Embedding first, then the projection, so random numbers are drawn in
        # the same order as before; zeroing the bias draws none.
        for weight in (self.encoder.weight, self.decoder.weight):
            weight.data.uniform_(-bound, bound)
        self.decoder.bias.data.zero_()

    def forward(self, src, src_mask=None):
        """Return the projected encoder output for the token ids in ``src``.

        Args:
            src: Integer token ids; the example cell passes
                (sequence length, batch size).
            src_mask: Optional attention mask handed to the encoder stack.
        """
        scaled_embeddings = self.encoder(src) * math.sqrt(self.d_model)
        encoded = self.transformer_encoder(self.pos_encoder(scaled_embeddings), src_mask)
        return self.decoder(encoded)

## Example usage

In [None]:
# Example usage of the encoder-only TransformerModel
ntokens = 1000  # Size of vocabulary
d_model = 512   # Embedding size
nhead = 8       # Number of attention heads
dim_feedforward = 2048  # Feedforward network hidden layer size
num_layers = 6  # Number of encoder layers
dropout = 0.2   # Dropout rate

model = TransformerModel(ntokens, d_model, nhead, dim_feedforward, num_layers, dropout)

# Dummy input data (sequence length, batch size)
batch_size = 32
seq_length = 35
input_data = torch.randint(0, ntokens, (seq_length, batch_size))

# Forward pass
output = model(input_data)

# One score per vocabulary entry for every (position, batch item) pair.
assert output.shape == (seq_length, batch_size, ntokens), (
    f"unexpected output shape: {tuple(output.shape)}"
)

print(f"Output shape: {output.shape}")

Output shape: torch.Size([35, 32, 1000])


## Implementing the Transformer Decoder Layer

In [None]:
# Define the Transformer Decoder Layer
class TransformerDecoderLayer(nn.Module):
    """One post-norm Transformer decoder block.

    Three sub-layers run in order: self-attention over the target,
    cross-attention from the target to the encoder ``memory``, and a two-layer
    ReLU feed-forward network. After each sub-layer the result goes through
    dropout, is added to the sub-layer input and is layer-normalised.

    Args:
        d_model: Size of the last (embedding) dimension.
        nhead: Number of heads for both attention modules.
        dim_feedforward: Hidden width of the feed-forward network.
        dropout: Dropout probability used in attention, inside the
            feed-forward network and on all three residual branches.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()

        # Attention over the target itself, then over the encoder output.
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Feed-forward network: d_model -> dim_feedforward -> d_model.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # One LayerNorm / Dropout pair per residual branch.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Run the block and return a tensor shaped like ``tgt``.

        Args:
            tgt: Decoder input; query, key and value of the self-attention.
            memory: Encoder output; key and value of the cross-attention.
            tgt_mask: Optional ``attn_mask`` for the self-attention.
            memory_mask: Optional ``attn_mask`` for the cross-attention.
            tgt_key_padding_mask: Optional ``key_padding_mask`` for the
                self-attention.
            memory_key_padding_mask: Optional ``key_padding_mask`` for the
                cross-attention.
        """
        # 1) Self-attention branch (attention weights are discarded).
        self_attn_out, _ = self.self_attn(
            tgt, tgt, tgt,
            attn_mask=tgt_mask,
            key_padding_mask=tgt_key_padding_mask,
        )
        tgt = self.norm1(tgt + self.dropout1(self_attn_out))

        # 2) Cross-attention branch: queries from the decoder, keys and values
        #    from the encoder memory.
        cross_attn_out, _ = self.multihead_attn(
            tgt, memory, memory,
            attn_mask=memory_mask,
            key_padding_mask=memory_key_padding_mask,
        )
        tgt = self.norm2(tgt + self.dropout2(cross_attn_out))

        # 3) Feed-forward branch.
        ff_hidden = self.dropout(torch.relu(self.linear1(tgt)))
        tgt = self.norm3(tgt + self.dropout3(self.linear2(ff_hidden)))
        return tgt

## Implementing the Transformer Decoder Layer (repeated definition)

In [None]:
# Define the Transformer Decoder Layer
class TransformerDecoderLayer(nn.Module):
    """Post-norm Transformer decoder block: self-attention, cross-attention, feed-forward.

    Running this cell rebinds the name ``TransformerDecoderLayer`` in the
    kernel, replacing any definition made earlier under the same name.

    Args:
        d_model: Size of the last (embedding) dimension.
        nhead: Number of heads for both attention modules.
        dim_feedforward: Hidden width of the feed-forward network.
        dropout: Dropout probability used throughout the block.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Feed-forward network: two linear layers with ReLU and dropout in between.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def _self_attention(self, x, attn_mask, key_padding_mask):
        """Attend from ``x`` to itself and return only the attention output."""
        return self.self_attn(x, x, x, attn_mask=attn_mask,
                              key_padding_mask=key_padding_mask)[0]

    def _cross_attention(self, x, memory, attn_mask, key_padding_mask):
        """Attend from ``x`` (query) to ``memory`` (key and value)."""
        return self.multihead_attn(x, memory, memory, attn_mask=attn_mask,
                                   key_padding_mask=key_padding_mask)[0]

    def _feed_forward(self, x):
        """Apply linear1 -> ReLU -> dropout -> linear2."""
        activated = torch.relu(self.linear1(x))
        return self.linear2(self.dropout(activated))

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Apply the three residual sub-layers in order and return the result.

        Args:
            tgt: Decoder input.
            memory: Encoder output used as key and value in cross-attention.
            tgt_mask: Optional ``attn_mask`` for self-attention.
            memory_mask: Optional ``attn_mask`` for cross-attention.
            tgt_key_padding_mask: Optional ``key_padding_mask`` for self-attention.
            memory_key_padding_mask: Optional ``key_padding_mask`` for cross-attention.
        """
        # Each step: sub-layer -> dropout -> residual add -> LayerNorm.
        attended = self._self_attention(tgt, tgt_mask, tgt_key_padding_mask)
        tgt = self.norm1(tgt + self.dropout1(attended))

        attended = self._cross_attention(tgt, memory, memory_mask, memory_key_padding_mask)
        tgt = self.norm2(tgt + self.dropout2(attended))

        transformed = self._feed_forward(tgt)
        tgt = self.norm3(tgt + self.dropout3(transformed))

        return tgt

## Test the TransformerDecoderLayer

In [None]:
# Smoke-test a single TransformerDecoderLayer
# Hyperparameters
d_model = 512
nhead = 8
dim_feedforward = 2048
dropout = 0.1

# Create an instance of the TransformerDecoderLayer
decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout)

# Dummy input data for the decoder (target sequence length, batch size, embedding size)
tgt_seq_length = 12
batch_size = 2
dummy_tgt = torch.rand(tgt_seq_length, batch_size, d_model)

# Dummy memory from the encoder (source sequence length, batch size, embedding size).
# The source length is set here explicitly so this cell does not depend on
# whatever value `seq_length` was last given by an unrelated earlier cell.
src_seq_length = 10
memory = torch.rand(src_seq_length, batch_size, d_model)

# Forward pass
output = decoder_layer(dummy_tgt, memory)

# The decoder output follows the target layout, whatever the memory length is.
assert output.shape == dummy_tgt.shape, f"unexpected output shape: {tuple(output.shape)}"

print(f"Output shape: {output.shape}")

Output shape: torch.Size([12, 2, 512])


## Building the Full Transformer Decoder

In [None]:
# Define the Transformer Decoder
class TransformerDecoder(nn.Module):
    """Stack of ``num_layers`` independent decoder layers applied in sequence.

    Args:
        decoder_layer: Prototype layer. It is deep-copied once per level of the
            stack, so every level owns its own parameters. The copies start
            from the prototype's current weights; the prototype itself is not
            registered in this module.
        num_layers: Number of layers in the stack.
    """

    def __init__(self, decoder_layer, num_layers):
        super(TransformerDecoder, self).__init__()
        # Listing the same instance N times would make all N levels share a
        # single set of weights (effectively one layer applied N times), so
        # each level gets its own deep copy.
        self.layers = nn.ModuleList(
            [copy.deepcopy(decoder_layer) for _ in range(num_layers)]
        )
        self.num_layers = num_layers

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Feed ``tgt`` through every layer in order and return the result.

        ``memory`` and all four optional masks are passed unchanged to each
        layer; only the running output changes from layer to layer.
        """
        output = tgt

        for mod in self.layers:
            output = mod(output, memory, tgt_mask=tgt_mask,
                         memory_mask=memory_mask,
                         tgt_key_padding_mask=tgt_key_padding_mask,
                         memory_key_padding_mask=memory_key_padding_mask)

        return output

## Test the TransformerDecoder

In [None]:
# Smoke-test the stacked TransformerDecoder
num_layers = 6

# Initialize the TransformerDecoder from the layer built in the previous test cell
transformer_decoder = TransformerDecoder(decoder_layer, num_layers)

# Dummy input data remains the same
# Forward pass
output = transformer_decoder(dummy_tgt, memory)

# Stacking layers must not change the layout of the target tensor.
assert output.shape == dummy_tgt.shape, f"unexpected output shape: {tuple(output.shape)}"

print(f"Output shape after TransformerDecoder: {output.shape}")

Output shape after TransformerDecoder: torch.Size([12, 2, 512])


## Updating the Transformer Model with Decoder

In [None]:
# Complete Transformer Model with Encoder and Decoder
class TransformerModel(nn.Module):
    """Encoder-decoder Transformer producing target-vocabulary scores.

    Source token ids are embedded, scaled by sqrt(d_model), position-encoded
    and run through the encoder stack to obtain ``memory``. Target token ids go
    through their own embedding and positional encoding; the decoder stack then
    attends to ``memory`` and a final linear layer maps its output to
    ``tgt_ntoken`` scores per position.

    Args:
        src_ntoken: Source vocabulary size.
        tgt_ntoken: Target vocabulary size (also the output width).
        d_model: Embedding size.
        nhead: Number of attention heads per layer.
        dim_feedforward: Hidden width of each layer's feed-forward network.
        num_layers: Number of layers in both the encoder and the decoder.
        dropout: Dropout probability for positional encodings and layers.
    """

    def __init__(self, src_ntoken, tgt_ntoken, d_model, nhead, dim_feedforward, num_layers, dropout=0.5):
        super(TransformerModel, self).__init__()
        self.model_type = 'Transformer'
        # Not read anywhere in this class; kept so existing attribute access
        # on the model keeps working.
        self.src_mask = None
        self.tgt_mask = None

        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.pos_decoder = PositionalEncoding(d_model, dropout)

        encoder_layers = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers)

        decoder_layers = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout)
        self.transformer_decoder = TransformerDecoder(decoder_layers, num_layers)

        self.src_encoder = nn.Embedding(src_ntoken, d_model)
        self.tgt_encoder = nn.Embedding(tgt_ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, tgt_ntoken)

        self.init_weights()

    def init_weights(self):
        """Draw both embeddings and the output weights from U(-0.1, 0.1); zero the output bias."""
        initrange = 0.1
        self.src_encoder.weight.data.uniform_(-initrange, initrange)
        self.tgt_encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def generate_square_subsequent_mask(self, sz, device=None):
        """Build an additive causal mask of shape (sz, sz).

        Entry (i, j) is ``-inf`` when j > i (a future position) and ``0.0``
        otherwise, so position i can only attend to positions up to i.

        Args:
            sz: Sequence length.
            device: Optional device for the mask; defaults to PyTorch's default
                device, as before.
        """
        future = torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()
        return torch.zeros(sz, sz, device=device).masked_fill(future, float('-inf'))

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None,
                src_key_padding_mask=None, tgt_key_padding_mask=None,
                memory_key_padding_mask=None):
        """Encode ``src``, decode ``tgt`` against it and return the output scores.

        Args:
            src: Source token ids; the example cell passes
                (source length, batch size).
            tgt: Target token ids; the example cell passes
                (target length, batch size).
            src_mask: Optional attention mask for the encoder stack.
            tgt_mask: Optional attention mask for decoder self-attention,
                e.g. from ``generate_square_subsequent_mask``.
            memory_mask: Optional attention mask for decoder cross-attention.
            src_key_padding_mask: Optional padding mask forwarded to the
                encoder stack.
            tgt_key_padding_mask: Optional padding mask forwarded to decoder
                self-attention.
            memory_key_padding_mask: Optional padding mask forwarded to decoder
                cross-attention.

        Returns:
            Decoder output projected to ``tgt_ntoken`` scores per position.
        """
        src_emb = self.src_encoder(src) * math.sqrt(self.d_model)
        src_emb = self.pos_encoder(src_emb)
        memory = self.transformer_encoder(src_emb, src_mask,
                                          src_key_padding_mask=src_key_padding_mask)

        tgt_emb = self.tgt_encoder(tgt) * math.sqrt(self.d_model)
        tgt_emb = self.pos_decoder(tgt_emb)
        output = self.transformer_decoder(tgt_emb, memory, tgt_mask=tgt_mask,
                                          memory_mask=memory_mask,
                                          tgt_key_padding_mask=tgt_key_padding_mask,
                                          memory_key_padding_mask=memory_key_padding_mask)
        output = self.decoder(output)
        return output

## Testing the Complete Transformer Model with Decoder

In [None]:
# Example usage of the encoder-decoder TransformerModel
src_ntokens = 1000  # Size of source vocabulary
tgt_ntokens = 1000  # Size of target vocabulary
d_model = 512       # Embedding size
nhead = 8           # Number of attention heads
dim_feedforward = 2048  # Feedforward network hidden layer size
num_layers = 6      # Number of encoder and decoder layers
dropout = 0.2       # Dropout rate

model = TransformerModel(src_ntokens, tgt_ntokens, d_model, nhead, dim_feedforward, num_layers, dropout)

# Dummy input data (sequence length, batch size)
batch_size = 32
src_seq_length = 35
tgt_seq_length = 30
src_input = torch.randint(0, src_ntokens, (src_seq_length, batch_size))
tgt_input = torch.randint(0, tgt_ntokens, (tgt_seq_length, batch_size))

# Causal target mask: stops the decoder from attending to future positions.
tgt_mask = model.generate_square_subsequent_mask(tgt_seq_length)

# Forward pass
output = model(src_input, tgt_input, tgt_mask=tgt_mask)

# One score per target-vocabulary entry for every (target position, batch item) pair.
assert output.shape == (tgt_seq_length, batch_size, tgt_ntokens), (
    f"unexpected output shape: {tuple(output.shape)}"
)

print(f"Output shape: {output.shape}")

Output shape: torch.Size([30, 32, 1000])
