In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Tuple


In [6]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self, dropout_rate=0.):
        super(ScaledDotProductAttention, self).__init__()
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, Q, K, V):
        d_k = K.size(-1)  # Dimension of the key vectors
        
        # Compute the scaled dot-product attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))
        
        # Apply softmax to get the attention weights
        attention_weights = F.softmax(scores, dim=-1)
        
        # Apply dropout to the attention weights
        attention_weights = self.dropout(attention_weights)
        
        # Compute the final output by multiplying the attention weights with the value matrix
        outputs = torch.matmul(attention_weights, V)
        
        return outputs, attention_weights


In [33]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention module.
    """
    def __init__(self, num_heads, d_model, dropout_rate=0.):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_model // num_heads

        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # separate projections for each head
        self.W_q = nn.ModuleList([nn.Linear(d_model, self.d_k) for _ in range(num_heads)])
        self.W_k = nn.ModuleList([nn.Linear(d_model, self.d_k) for _ in range(num_heads)])
        self.W_v = nn.ModuleList([nn.Linear(d_model, self.d_k) for _ in range(num_heads)])
        
        # output projection
        self.W_o = nn.Linear(d_model, d_model)
        
        self.attention = ScaledDotProductAttention(dropout_rate)


    def forward(self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor) -> Tuple[torch.Tensor, List[torch.Tensor]]:
        batch_size, seq_len, _ = Q.size()
        
        # checkinput dimensions
        assert Q.size() == K.size() == V.size(), f"Dimensions of Q, K, V must be the same. Got Q: {Q.size()}, K: {K.size()}, V: {V.size()}"
        assert Q.size(-1) == self.d_model, f"Input dimension must be {self.d_model}. Got {Q.size(-1)}"

        Q_heads = [self.W_q[i](Q) for i in range(self.num_heads)]
        K_heads = [self.W_k[i](K) for i in range(self.num_heads)]
        V_heads = [self.W_v[i](V) for i in range(self.num_heads)]

        # print out the size of the weights matrices W_q, W_k, W_v
        print("Size of W_q: ", self.W_q[7].weight.size())
        print("Size of W_k: ", self.W_k[0].weight.size())
        print("Size of W_v: ", self.W_v[0].weight.size())
    
        
        
        
        # check projected dimensions
        for i, (Q_h, K_h, V_h) in enumerate(zip(Q_heads, K_heads, V_heads)):
            assert Q_h.size() == K_h.size() == V_h.size() == (batch_size, seq_len, self.d_k), \
                f"Projected dimension for head {i} is incorrect. Expected {(batch_size, seq_len, self.d_k)}, got Q: {Q_h.size()}, K: {K_h.size()}, V: {V_h.size()}"
        
        outputs = []
        attentions = []
        for Q_h, K_h, V_h in zip(Q_heads, K_heads, V_heads):
            output, attention = self.attention(Q_h, K_h, V_h)
            outputs.append(output)
            attentions.append(attention)
        
        # check attention output dimensions
        for i, output in enumerate(outputs):
            print(output.size())
            assert output.size() == (batch_size, seq_len, self.d_k), \
                f"Attention output dimension for head {i} is incorrect. Expected {(batch_size, seq_len, self.d_k)}, got {output.size()}"
        
        output = torch.cat(outputs, dim=-1)

        print(output.size())
        
        # check concatenated output dimension
        assert output.size() == (batch_size, seq_len, self.d_model), \
            f"Concatenated output dimension is incorrect. Expected {(batch_size, seq_len, self.d_model)}, got {output.size()}"
        
        output = self.W_o(output)
        
        # Check final output dimension
        assert output.size() == (batch_size, seq_len, self.d_model), \
            f"Final output dimension is incorrect. Expected {(batch_size, seq_len, self.d_model)}, got {output.size()}"
        
        return output, attentions


In [32]:

# Example usage:
batch_size = 32
seq_len = 29
d_k = 128

query = torch.randn(batch_size, seq_len, d_k)
key = torch.randn(batch_size, seq_len, d_k)
value = torch.randn(batch_size, seq_len, d_k)

attention = ScaledDotProductAttention()
output, weights = attention(query, key, value)

print("Output shape:", output.shape)
print("Attention weights shape:", weights.shape)

MHA = MultiHeadAttention(num_heads=8, d_model=128)
output, weights = MHA(query, key, value)

print("Output shape:", output.shape)
print("Attention weights shape:", weights[0].shape)


Output shape: torch.Size([32, 29, 128])
Attention weights shape: torch.Size([32, 29, 29])
Size of W_q:  torch.Size([16, 128])
Size of W_k:  torch.Size([16, 128])
Size of W_v:  torch.Size([16, 128])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 16])
torch.Size([32, 29, 128])
Output shape: torch.Size([32, 29, 128])
Attention weights shape: torch.Size([32, 29, 29])


In [15]:
import torch
import pytest


@pytest.fixture
def mha_model():
    return MultiHeadAttention(num_heads=8, d_model=128, dropout_rate=0.1)

def test_correct_input(mha_model):
    batch_size, seq_len, d_model = 32, 29, 128
    Q = K = V = torch.randn(batch_size, seq_len, d_model)
    output, attentions = mha_model(Q, K, V)
    
    assert output.shape == (batch_size, seq_len, d_model), f"Expected output shape {(batch_size, seq_len, d_model)}, but got {output.shape}"
    assert len(attentions) == 8, f"Expected 8 attention matrices, but got {len(attentions)}"
    assert attentions[0].shape == (batch_size, seq_len, seq_len), f"Expected attention shape {(batch_size, seq_len, seq_len)}, but got {attentions[0].shape}"

def test_incorrect_input_dimension(mha_model):
    batch_size, seq_len, d_model = 32, 29, 256  # Incorrect d_model
    Q = K = V = torch.randn(batch_size, seq_len, d_model)
    with pytest.raises(AssertionError):
        mha_model(Q, K, V)

def test_mismatched_input_shapes(mha_model):
    batch_size, seq_len, d_model = 32, 29, 128
    Q = torch.randn(batch_size, seq_len, d_model)
    K = torch.randn(batch_size, seq_len + 1, d_model)  # Different sequence length
    V = torch.randn(batch_size, seq_len, d_model)
    with pytest.raises(AssertionError):
        mha_model(Q, K, V)

def test_incorrect_batch_size(mha_model):
    batch_size, seq_len, d_model = 64, 29, 128  # Different batch size
    Q = K = V = torch.randn(batch_size, seq_len, d_model)
    output, attentions = mha_model(Q, K, V)
    
    assert output.shape == (batch_size, seq_len, d_model), f"Expected output shape {(batch_size, seq_len, d_model)}, but got {output.shape}"

def test_different_q_k_dimensions(mha_model):
    batch_size, seq_len, d_model = 32, 29, 128
    Q = torch.randn(batch_size, seq_len, d_model)
    K = V = torch.randn(batch_size, seq_len + 5, d_model)  # Different sequence length for K and V
    with pytest.raises(AssertionError):
        mha_model(Q, K, V)


In [1]:
import torch
import torch.nn as nn

class LayerNorm(nn.Module):
    def __init__(self, features: int, eps: float = 1e-8):
        super(LayerNorm, self).__init__()
        self.gamma: nn.Parameter = nn.Parameter(torch.ones(features))
        self.beta: nn.Parameter = nn.Parameter(torch.zeros(features))
        self.eps: float = eps

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        mean: torch.Tensor = x.mean(-1, keepdim=True)
        var: torch.Tensor = x.var(-1, keepdim=True)
        return self.gamma * (x - mean) / torch.sqrt(var + self.eps) + self.beta


In [3]:

layer_norm = LayerNorm(features=128)


x = torch.randn(32, 50, 128)


normalized_x = layer_norm(x)


assert normalized_x.shape == x.shape

# Create a LayerNorm instance
ln = LayerNorm(features=128)

# Create a sample input
x = torch.randn(32, 50, 128)

# Apply LayerNorm
out = ln(x)

# Check output shape
assert out.shape == x.shape

# Check that gamma and beta have the correct shape
assert layer_norm.gamma.shape == (128,)
assert layer_norm.beta.shape == (128,)

print("LayerNorm test passed!")


LayerNorm test passed!


In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000):
        super(PositionalEncoding, self).__init__()
        
        # Create a long enough P matrix
        pe: torch.Tensor = torch.zeros(max_len, d_model)
        position: torch.Tensor = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term: torch.Tensor = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.pe[:x.size(0), :]
        return x


In [7]:
import math

# Initialize the PositionalEncoding module
d_model: int = 128
pe_module: PositionalEncoding = PositionalEncoding(d_model)

# Create a sample input tensor
seq_len: int = 129
batch_size: int = 32
x: torch.Tensor = torch.randn(seq_len, batch_size, d_model)

# Apply positional encoding
output: torch.Tensor = pe_module(x)

# Check that the output shape matches the input shape
assert output.shape == x.shape, f"Expected shape {x.shape}, but got {output.shape}"

print("Positional encoding applied successfully!")


Positional encoding applied successfully!


In [9]:
class PositionWiseFeedforward(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout_rate: float = 0.1):
        super(PositionWiseFeedforward, self).__init__()
        
        self.feedforward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout_rate) # is dropout needed here?
        )
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # feedforward network and  residual connection
        out = self.feedforward(x) + x
        
        #layer normalization
        out = self.layer_norm(out)
        
        return out


In [10]:
import torch

# Initialize the PositionWiseFeedforward module
d_model: int = 128
d_ff: int = 1204
ffn: PositionWiseFeedforward = PositionWiseFeedforward(d_model, d_ff)

# Create a sample input tensor
batch_size: int = 32
seq_len: int = 29
x: torch.Tensor = torch.randn(batch_size, seq_len, d_model)

# Apply position-wise feedforward network
output: torch.Tensor = ffn(x)

# Check that the output shape matches the input shape
assert output.shape == x.shape, f"Expected shape {x.shape}, but got {output.shape}"

print("Position-wise Feed-Forward Network applied successfully!")


Position-wise Feed-Forward Network applied successfully!
