<a href="https://colab.research.google.com/github/bdtranter/testColab/blob/main/Ben_Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Implement nn.MultiheadAttention in PyTorch (40 points)

In [1]:
# Reference example: calling PyTorch's built-in nn.MultiheadAttention directly.

# Random query, key and value tensors, each laid out as
# (sequence_length, batch_size, embed_dim).
query, key, value = (torch.randn(10, 32, 128) for _ in range(3))

mha = nn.MultiheadAttention(embed_dim=128, num_heads=8)

# The attention weights are not needed here, so the second return value is
# discarded.
attn_output, _ = mha(query, key, value, need_weights=False)

# Expected layout: (sequence_length, batch_size, embed_dim)
print("Attention Output Shape:", attn_output.shape)

NameError: name 'torch' is not defined

In [None]:
# You are required to complete this cell
#
class MultiHeadAttention(nn.Module):
    """
    Multi-head scaled dot-product attention for sequence-first inputs.

    Args:
        embed_dim: total embedding size; must be divisible by ``num_heads``.
        num_heads: number of attention heads. Each head works on a slice of
            ``embed_dim // num_heads`` features.
    """

    def __init__(self, embed_dim, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.query_proj = nn.Linear(embed_dim, embed_dim)
        self.key_proj = nn.Linear(embed_dim, embed_dim)
        self.value_proj = nn.Linear(embed_dim, embed_dim)
        self.output_proj = nn.Linear(embed_dim, embed_dim)

    def _split_heads(self, x, batch_size):
        """Reshape a projected (len, N, E) tensor to (N, num_heads, len, head_dim)."""
        x = x.view(x.size(0), batch_size, self.num_heads, self.head_dim)
        # Batch and head become the leading (broadcast) axes so that the
        # matmuls in forward() contract over sequence positions, not over
        # batch elements.
        return x.permute(1, 2, 0, 3)

    def forward(self, query, key, value, mask=None):
        """
        Compute multi-head attention of ``query`` over ``key``/``value``.

        Args:
            query: tensor of shape (L, N, E).
            key: tensor of shape (S, N, E).
            value: tensor of shape (S, N, E).
            mask: optional tensor broadcastable to (N, num_heads, L, S), for
                example (L, S). Positions where ``mask == 0`` are filled with
                -1e9 before the softmax, so they receive ~0 attention weight.

        Returns:
            Tensor of shape (L, N, E).

        Here L is the query length, S the key/value length, N the batch size
        and E ``embed_dim``.
        """
        tgt_len = query.size(0)
        batch_size = query.size(1)

        # Linear projections
        query = self.query_proj(query)
        key = self.key_proj(key)
        value = self.value_proj(value)

        # Split into multiple heads: (len, N, E) -> (N, H, len, head_dim)
        query = self._split_heads(query, batch_size)
        key = self._split_heads(key, batch_size)
        value = self._split_heads(value, batch_size)

        # Scaled dot-product scores between every query position and every
        # key position: (N, H, L, S)
        scores = torch.matmul(query, key.transpose(-2, -1)) / (self.head_dim ** 0.5)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Normalize over the key positions (last axis)
        attention_weights = F.softmax(scores, dim=-1)

        # Weighted sum of values: (N, H, L, head_dim)
        context = torch.matmul(attention_weights, value)

        # Concatenate heads and restore the sequence-first layout:
        # (N, H, L, head_dim) -> (L, N, H, head_dim) -> (L, N, E)
        context = context.permute(2, 0, 1, 3).contiguous().view(tgt_len, batch_size, self.embed_dim)
        output = self.output_proj(context)

        return output

# Example usage of the MultiHeadAttention module defined in this cell.

# Random query, key and value tensors, each laid out as
# (sequence_length, batch_size, embed_dim).
query, key, value = (torch.randn(10, 32, 128) for _ in range(3))

mha = MultiHeadAttention(embed_dim=128, num_heads=8)
attn_output = mha(query, key, value)

# Expected layout: (sequence_length, batch_size, embed_dim)
print("Attention Output Shape:", attn_output.shape)

Attention Output Shape: torch.Size([10, 32, 128])


# Implement nn.TransformerEncoderLayer and nn.TransformerDecoderLayer in PyTorch (30 points)

**TransformerEncoderLayer:**

1. **Self-Attention:** Applies multi-head self-attention to the input sequence (`src`).
2. **Add & Norm:** Adds the output of self-attention to the original input and applies layer normalization.
3. **Feedforward:** Passes the normalized output through a feedforward network (two linear layers with an activation function in between).
4. **Add & Norm:** Adds the output of the feedforward network to the input of the feedforward layer and applies layer normalization.

**TransformerDecoderLayer:**

1. **Masked Self-Attention:** Applies masked multi-head self-attention to the decoder input (`tgt`). The mask prevents the decoder from attending to future positions in the sequence.
2. **Add & Norm:** Adds the output of masked self-attention to the original decoder input and applies layer normalization.
3. **Encoder-Decoder Attention:** Applies multi-head attention with the output of step 2 as query and the encoder output (`memory`) as key and value.
4. **Add & Norm:** Adds the output of encoder-decoder attention to the output of step 2 and applies layer normalization.
5. **Feedforward:** Passes the normalized output through a feedforward network (same as in the encoder layer).
6. **Add & Norm:** Adds the output of the feedforward network to the input of the feedforward layer and applies layer normalization.

**Key Concepts:**

* **Multi-head Attention:** Allows the model to attend to different parts of the input sequence simultaneously.
* **Layer Normalization:** Normalizes the activations within each layer, helping with training stability.
* **Feedforward Network:** Provides non-linearity and feature transformation.
* **Masking:** Prevents the decoder from attending to future positions during training (masked self-attention).

In [None]:
# Reference example: using PyTorch's built-in nn.TransformerEncoderLayer and
# nn.TransformerDecoderLayer directly.

# Random tensors, each laid out as (sequence_length, batch_size, embed_dim).
# Only `query` is fed to the encoder below; `key` and `value` are created but
# not used in this cell.
query, key, value = (torch.randn(10, 32, 128) for _ in range(3))

# One encoder layer and one decoder layer with matching sizes.
encoder_layer = nn.TransformerEncoderLayer(d_model=128, nhead=8)
decoder_layer = nn.TransformerDecoderLayer(d_model=128, nhead=8)

# Run the input sequence through the encoder layer.
encoder_output = encoder_layer(query)

# Run a (random) target sequence through the decoder layer, with the encoder
# output passed as the second argument.
target = torch.randn(10, 32, 128)
decoder_output = decoder_layer(target, encoder_output)

# Both outputs are expected to be (sequence_length, batch_size, embed_dim).
print("Encoder Output Shape:", encoder_output.shape)
print("Decoder Output Shape:", decoder_output.shape)

Encoder Output Shape: torch.Size([10, 32, 128])
Decoder Output Shape: torch.Size([10, 32, 128])


In [None]:
from torch.nn.modules.transformer import _get_activation_fn

class TransformerEncoderLayer(nn.Module):
    """
    One encoder block: multi-head self-attention followed by a two-layer
    feedforward network. Each sub-layer's output goes through dropout, is
    added back to its input, and is then layer-normalized.

    Args:
        d_model: embedding size of the layer's input and output.
        nhead: number of attention heads.
        dim_feedforward: hidden width of the feedforward network.
        dropout: dropout probability used in the attention module and in
            every dropout module of this layer.
        activation: name of the feedforward activation, relu or gelu.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu"):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Feedforward sub-layer: linear1 -> activation -> dropout -> linear2
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # One LayerNorm / Dropout pair per residual connection
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)

    def forward(self, src, src_mask=None):
        """
        Run ``src`` through the encoder layer.

        Args:
            src: input sequence of the layer (required).
            src_mask: attention mask for ``src`` (optional).

        Shape:
            - src: :math:`(S, N, E)`.
            - src_mask: :math:`(S, S)`.

            - Output: :math:`(S, N, E)`.
        """
        # Self-attention: src acts as query, key and value. Only the attention
        # output is used; the returned weights are dropped.
        attn_out, _ = self.self_attn(src, src, src, attn_mask=src_mask)
        src = self.norm1(src + self.dropout1(attn_out))

        # Feedforward sub-layer with its own residual connection.
        hidden = self.activation(self.linear1(src))
        ff_out = self.linear2(self.dropout(hidden))
        return self.norm2(src + self.dropout2(ff_out))


class TransformerDecoderLayer(nn.Module):
    """
    One decoder block: self-attention over the target, attention over the
    encoder memory, then a two-layer feedforward network. Each sub-layer's
    output goes through dropout, is added back to its input, and is then
    layer-normalized.

    Args:
        d_model: embedding size of the layer's input and output.
        nhead: number of attention heads.
        dim_feedforward: hidden width of the feedforward network.
        dropout: dropout probability used in both attention modules and in
            every dropout module of this layer.
        activation: name of the feedforward activation, relu or gelu.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu"):
        super().__init__()
        # self_attn attends within tgt; multihead_attn attends from tgt to memory.
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Feedforward sub-layer: linear1 -> activation -> dropout -> linear2
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # One LayerNorm / Dropout pair per residual connection
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """
        Run ``tgt`` (attending to ``memory``) through the decoder layer.

        Args:
            tgt: input sequence of the decoder layer (required).
            memory: sequence from the last layer of the encoder (required).
            tgt_mask: attention mask for the self-attention over ``tgt`` (optional).
            memory_mask: attention mask for the attention over ``memory`` (optional).

        Shape:
            - tgt: :math:`(T, N, E)`.
            - memory: :math:`(S, N, E)`.
            - tgt_mask: :math:`(T, T)`.
            - memory_mask: :math:`(T, S)`.
            - Output: :math:`(T, N, E)`.
        """
        # Self-attention: tgt acts as query, key and value.
        self_attn_out, _ = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask)
        tgt = self.norm1(tgt + self.dropout1(self_attn_out))

        # Attention over the encoder output: tgt is the query, memory supplies
        # both keys and values.
        cross_attn_out, _ = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask)
        tgt = self.norm2(tgt + self.dropout2(cross_attn_out))

        # Feedforward sub-layer with its own residual connection.
        hidden = self.activation(self.linear1(tgt))
        ff_out = self.linear2(self.dropout(hidden))
        return self.norm3(tgt + self.dropout3(ff_out))

# Example usage of the TransformerEncoderLayer / TransformerDecoderLayer
# classes defined in this cell.

# Random source and target sequences, each laid out as
# (sequence_length, batch_size, embed_dim).
src, tgt = torch.randn(10, 32, 128), torch.randn(10, 32, 128)

# One encoder layer and one decoder layer with matching sizes.
encoder_layer = TransformerEncoderLayer(d_model=128, nhead=8)
decoder_layer = TransformerDecoderLayer(d_model=128, nhead=8)

# Encode the source, then decode the target against the encoder output.
encoder_output = encoder_layer(src)
decoder_output = decoder_layer(tgt, encoder_output)

# Both shapes are expected to be (10, 32, 128).
print("Encoder output shape:", encoder_output.shape)
print("Decoder output shape:", decoder_output.shape)

Encoder output shape: torch.Size([10, 32, 128])
Decoder output shape: torch.Size([10, 32, 128])


# Example implementation of nn.TransformerEncoder and nn.TransformerDecoder
**TransformerEncoder:**

1. **Initialization:**
   - Takes an `encoder_layer` (an instance of `TransformerEncoderLayer`) as input, which defines the structure of a single encoder layer.
   - `num_layers` specifies the number of encoder layers to stack.
   - `norm` is an optional layer normalization layer applied to the final output.
2. **Forward Pass:**
   - Iterates through the `num_layers` of `encoder_layer` instances.
   - Passes the output of the previous layer as input to the current layer.
   - Applies the optional `norm` layer to the final output.

**TransformerDecoder:**

1. **Initialization:**
   - Takes a `decoder_layer` (an instance of `TransformerDecoderLayer`) as input, which defines the structure of a single decoder layer.
   - `num_layers` specifies the number of decoder layers to stack.
   - `norm` is an optional layer normalization layer applied to the final output.
2. **Forward Pass:**
   - Iterates through the `num_layers` of `decoder_layer` instances.
   - Passes the output of the previous layer, along with the encoder output (`memory`), as input to the current layer.
   - Applies various masks as needed (e.g., `tgt_mask`, `memory_mask`).
   - Applies the optional `norm` layer to the final output.

**Key Concepts:**

- **Stacking Layers:** Both `TransformerEncoder` and `TransformerDecoder` simply stack multiple instances of their respective layer types to create a deeper model.
- **Layer Normalization:** The optional `norm` layer normalizes the final output of the whole stack, after the last layer has run.
- **Masking:** The decoder utilizes various masks to control the attention mechanism.

In [None]:
from torch.nn.modules.transformer import _get_clones

class TransformerEncoder(nn.Module):
    """
    A stack of N encoder layers applied one after another.

    Args:
        encoder_layer: an instance of the TransformerEncoderLayer() class,
            passed to ``_get_clones`` to build the stack (required).
        num_layers: how many sub-encoder-layers the encoder holds (required).
        norm: layer normalization component applied to the final output (optional).
    """
    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src, mask=None):
        """
        Feed ``src`` through every encoder layer in order.

        Args:
            src: input sequence of the encoder (required).
            mask: mask for the src sequence, handed to each layer as
                ``src_mask`` (optional).

        Shape:
            - src: :math:`(S, N, E)`.
            - mask: :math:`(S, S)`.
            - Output: :math:`(S, N, E)`.
        """
        hidden = src
        for layer in self.layers:
            hidden = layer(hidden, src_mask=mask)

        # The final normalization is skipped when no norm module was given.
        return hidden if self.norm is None else self.norm(hidden)


class TransformerDecoder(nn.Module):
    """
    A stack of N decoder layers applied one after another.

    Args:
        decoder_layer: an instance of the TransformerDecoderLayer() class,
            passed to ``_get_clones`` to build the stack (required).
        num_layers: how many sub-decoder-layers the decoder holds (required).
        norm: layer normalization component applied to the final output (optional).
    """
    def __init__(self, decoder_layer, num_layers, norm=None):
        super().__init__()
        self.layers = _get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """
        Feed ``tgt`` through every decoder layer in order; each layer also
        receives the same ``memory`` and masks.

        Args:
            tgt: input sequence of the decoder (required).
            memory: sequence from the last layer of the encoder (required).
            tgt_mask: mask for the tgt sequence (optional).
            memory_mask: mask for the memory sequence (optional).

        Shape:
            - tgt: :math:`(T, N, E)`.
            - memory: :math:`(S, N, E)`.
            - tgt_mask: :math:`(T, T)`.
            - memory_mask: :math:`(T, S)`.
            - Output: :math:`(T, N, E)`.
        """
        hidden = tgt
        for layer in self.layers:
            hidden = layer(
                hidden,
                memory,
                tgt_mask=tgt_mask,
                memory_mask=memory_mask,
            )

        # The final normalization is skipped when no norm module was given.
        return hidden if self.norm is None else self.norm(hidden)

# Reuses src, tgt, encoder_layer and decoder_layer from the previous example.

# Build a 6-layer encoder and a 6-layer decoder from the single-layer instances.
encoder, decoder = (
    TransformerEncoder(encoder_layer, num_layers=6),
    TransformerDecoder(decoder_layer, num_layers=6),
)

# The encoder output is handed to the decoder as its memory.
encoder_output = encoder(src)
decoder_output = decoder(tgt, encoder_output)

# Both outputs are expected to be (sequence_length, batch_size, embed_dim).
print("Encoder Output Shape (Memory):", encoder_output.shape)
print("Decoder Output Shape:", decoder_output.shape)

Encoder Output Shape (Memory): torch.Size([10, 32, 128])
Decoder Output Shape: torch.Size([10, 32, 128])


# A simplified Transformer model without masking or padding

In [None]:
import math

class Transformer(nn.Module):
    """
    A simplified encoder-decoder Transformer that takes no masks and no
    padding information.

    Args:
        d_model: number of expected features in the encoder/decoder inputs (default=512).
        nhead: number of heads in the multi-head attention modules (default=8).
        num_encoder_layers: number of sub-encoder-layers in the encoder (default=6).
        num_decoder_layers: number of sub-decoder-layers in the decoder (default=6).
        dim_feedforward: dimension of the feedforward network model (default=2048).
        dropout: the dropout value (default=0.1).
        activation: activation function of the encoder/decoder intermediate
            layer, relu or gelu (default=relu).
    """

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu"):
        super().__init__()

        # Encoder stack with a LayerNorm as its final norm.
        self.encoder = TransformerEncoder(
            TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, activation),
            num_encoder_layers,
            nn.LayerNorm(d_model),
        )

        # Decoder stack with a LayerNorm as its final norm.
        self.decoder = TransformerDecoder(
            TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, activation),
            num_decoder_layers,
            nn.LayerNorm(d_model),
        )

        # Re-initialize the weights once every sub-module has been registered.
        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

    def forward(self, src, tgt):
        """
        Encode ``src``, then decode ``tgt`` against the encoder output, with
        no masking or padding.

        Args:
            src: the sequence to the encoder (required).
            tgt: the sequence to the decoder (required).

        Shape:
            - src: :math:`(S, N, E)`.
            - tgt: :math:`(T, N, E)`.
            - output: :math:`(T, N, E)`.

            where S is the source sequence length, T is the target sequence length, N is the
            batch size, E is the feature number

        Raises:
            RuntimeError: if src and tgt differ in size along dim 1, or if
                either one's size along dim 2 is not ``d_model``.
        """
        batch_sizes_match = src.size(1) == tgt.size(1)
        if not batch_sizes_match:
            raise RuntimeError("the batch number of src and tgt must be equal")

        if not (src.size(2) == self.d_model and tgt.size(2) == self.d_model):
            raise RuntimeError("the feature number of src and tgt must be equal to d_model")

        return self.decoder(tgt, self.encoder(src))

    def _reset_parameters(self):
        """
        Apply Xavier-uniform initialization to every parameter that has more
        than one dimension; parameters with one dimension or fewer are left
        untouched.
        """
        for weight in (p for p in self.parameters() if p.dim() > 1):
            nn.init.xavier_uniform_(weight)


# Reuses src and tgt from the previous examples.

# Full model: 6 encoder layers and 6 decoder layers with 128-dim embeddings.
transformer_model = Transformer(
    d_model=128,
    nhead=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
)

# This simplified model takes only src and tgt; no masks are passed.
output = transformer_model(src, tgt)

print("Output shape:", output.shape)

Output shape: torch.Size([10, 32, 128])
