In [10]:
import torch
import numpy
import torch.nn as nn
import torch.nn.functional as F
import math

In [11]:
def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    The scores are ``q @ k^T / sqrt(d_k)``, where ``d_k`` is the size of the
    last dimension of ``q``. An optional additive ``mask`` is applied to the
    scores before a softmax over the last (key) axis.

    Args:
        q: Query tensor of shape ``(..., L_q, d_k)``.
        k: Key tensor of shape ``(..., L_k, d_k)``.
        v: Value tensor of shape ``(..., L_k, d_v)``.
        mask: Optional additive mask that broadcasts against the
            ``(..., L_q, L_k)`` score tensor. Entries set to ``-inf`` receive
            zero attention weight after the softmax.

    Returns:
        A ``(values, attention)`` tuple: ``attention`` holds the softmax
        weights and ``values`` is ``attention @ v``.
    """
    d_k = q.shape[-1]
    scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)
    # Identity check: `!=` on a tensor goes through rich comparison and only
    # happens to yield a plain bool when compared against None.
    if mask is not None:
        # Out-of-place add so a mask with extra leading (batch) dimensions can
        # broadcast; an in-place `+=` cannot grow the shape of `scaled`.
        # Casting back keeps the score dtype, as the in-place form did.
        scaled = (scaled + mask).to(scaled.dtype)
    attention = F.softmax(scaled, dim=-1)
    values = torch.matmul(attention, v)
    return values, attention

In [12]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention with a fused query/key/value projection.

    Args:
        d_model: Size of the last (feature) dimension of the input.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        # A single linear layer produces q, k and v for every head at once.
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch_size, sequence_length, d_model)``.
            mask: Optional additive mask forwarded to ``scaled_dot_product``.

        Returns:
            Tensor of shape ``(batch_size, sequence_length, d_model)``.
        """
        batch_size, sequence_length, d_model = x.shape
        print(f"x.shape: {x.shape}")
        qkv = self.qkv_layer(x)
        print(f"qkv.shape: {qkv.shape}")
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
        print(f"qkv.shape: {qkv.shape}")
        # Move the head axis ahead of the sequence axis: (batch, heads, seq, 3 * head_dim).
        qkv = qkv.permute(0, 2, 1, 3)
        print(f"qkv.shape: {qkv.shape}")
        q, k, v = qkv.chunk(3, dim=-1)
        print(f"q.shape: {q.shape} k.shape: {k.shape}, v.shape: {v.shape}")
        values, attention = scaled_dot_product(q, k, v, mask)
        print(f"values.shape: {values.shape}, attention.shape: {attention.shape}")
        # Bring the sequence axis back in front of the head axis before merging
        # heads. Reshaping (batch, heads, seq, head_dim) directly would mix
        # different positions and heads into one output row.
        values = values.permute(0, 2, 1, 3).reshape(
            batch_size, sequence_length, self.num_heads * self.head_dim
        )
        print(f"values.shape: {values.shape}")
        out = self.linear_layer(values)
        print(f"out.shape: {out.shape}")
        return out

In [13]:
class LayerNormalization(nn.Module):
    """Layer normalization over the trailing ``len(parameters_shape)`` dimensions.

    The input is standardized with the mean and (biased) variance taken over
    those trailing dimensions, then scaled by the learnable ``gamma`` and
    shifted by the learnable ``beta``.

    Args:
        parameters_shape: Shape of ``gamma`` and ``beta``; its length selects
            how many trailing dimensions are normalized.
        eps: Constant added to the variance before the square root.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        # gamma starts at one and beta at zero, so the layer initially returns
        # the plain standardized input.
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, input):
        """Normalize ``input`` and apply the learned scale and shift."""
        num_norm_dims = len(self.parameters_shape)
        # Negative indices of the trailing dimensions: [-1, -2, ...].
        dims = list(range(-1, -num_norm_dims - 1, -1))
        mean = input.mean(dim=dims, keepdim=True)
        print(f"Mean shape: ({mean.shape})")
        centered = input - mean
        var = centered.pow(2).mean(dim=dims, keepdim=True)
        std = torch.sqrt(var + self.eps)
        print(f"Standard deviation shape ({std.shape})")
        y = centered / std
        out = self.gamma * y + self.beta
        print(f"out shape ({out.shape})")
        return out

In [14]:
class MultiHeadCrossAttention(nn.Module):
    """Multi-head attention whose queries and keys/values come from different inputs.

    Queries are projected from ``y``; keys and values are projected together
    from ``x``.

    Args:
        d_model: Size of the last (feature) dimension of both inputs.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.q_layer = nn.Linear(d_model, d_model)
        self.kv_layer = nn.Linear(d_model, 2 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask=None):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: Tensor of shape ``(batch_size, kv_length, d_model)``.
            y: Tensor of shape ``(batch_size, query_length, d_model)``.
            mask: Optional additive mask forwarded to ``scaled_dot_product``;
                it must broadcast against ``(..., query_length, kv_length)``.

        Returns:
            Tensor of shape ``(batch_size, query_length, d_model)``.
        """
        batch_size, kv_length, d_model = x.shape
        # The query length comes from `y`; reusing x's length would break the
        # reshapes below whenever the two sequences differ in length.
        query_length = y.shape[1]
        print(f"x.shape: {x.shape}")
        q = self.q_layer(y)
        kv = self.kv_layer(x)
        q = q.reshape(batch_size, query_length, self.num_heads, self.head_dim)
        kv = kv.reshape(batch_size, kv_length, self.num_heads, 2 * self.head_dim)
        print(f"q.shape: {q.shape}")
        print(f"kv.shape: {kv.shape}")
        # Move the head axis ahead of the sequence axis.
        q = q.permute(0, 2, 1, 3)
        kv = kv.permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)
        print(f"q.shape: {q.shape} k.shape: {k.shape}, v.shape: {v.shape}")
        values, attention = scaled_dot_product(q, k, v, mask)
        print(f"values.shape: {values.shape}, attention.shape: {attention.shape}")
        # Restore (batch, seq, heads, head_dim) before merging the heads.
        values = values.permute(0, 2, 1, 3).reshape(
            batch_size, query_length, self.num_heads * self.head_dim
        )
        print(f"values.shape: {values.shape}")
        out = self.linear_layer(values)
        print(f"out.shape: {out.shape}")
        return out

In [15]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network: linear, ReLU, dropout, linear.

    Args:
        d_model: Input and output feature size.
        hidden: Feature size between the two linear layers.
        drop_prob: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Project to ``hidden`` features, activate, drop out, project back."""
        activated = self.relu(self.linear1(x))
        projected = self.linear2(self.dropout(activated))
        print(f"FF output shape: {projected.shape}")
        return projected

In [16]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention and a feed-forward net.

    Each sub-layer is followed by dropout, a residual addition and layer
    normalization, in that order.

    Args:
        d_model: Feature size of the inputs and outputs.
        ffn_hidden: Hidden size of the feed-forward sub-layer.
        num_heads: Number of heads in both attention sub-layers.
        drop_prob: Dropout probability used throughout the block.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.cross_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, decoder_mask):
        """Run the block on ``y``, attending over ``x`` in the cross-attention step.

        Args:
            x: Tensor that supplies the keys and values of the cross-attention
                sub-layer.
            y: Tensor that is transformed by the block and supplies the
                cross-attention queries.
            decoder_mask: Additive attention mask passed to both attention
                sub-layers.

        Returns:
            The transformed ``y``.
        """
        residual_y = y
        print('--- Self Attention ---')
        # Use the mask handed in by the caller. Reading a notebook-level global
        # named `mask` here made the layer depend on hidden kernel state.
        y = self.self_attention(y, decoder_mask)
        print('--- Dropout 1 ---')
        y = self.dropout1(y)
        print('--- Add & Norm 1 ---')
        y = self.norm1(y + residual_y)

        residual_y = y
        print('--- Cross Attention ---')
        # NOTE(review): the same mask is applied to cross-attention. If it is a
        # causal (look-ahead) mask it also hides later positions of `x` from
        # each query - confirm this is intended.
        y = self.cross_attention(x, y, decoder_mask)
        print('--- Dropout 2 ---')
        y = self.dropout2(y)
        print('--- Add & Norm 2 ---')
        y = self.norm2(y + residual_y)

        residual_y = y
        print('--- Feed Forward ---')
        y = self.ffn(y)
        print('--- Dropout 3 ---')
        y = self.dropout3(y)
        print('--- Add & Norm 3 ---')
        y = self.norm3(y + residual_y)
        return y

In [17]:
class SequentialDecoder(nn.Sequential):
    """``nn.Sequential`` variant whose layers take ``(x, y, mask)``.

    Only ``y`` is threaded from one layer to the next; ``x`` and ``mask`` are
    passed unchanged to every layer.
    """

    def forward(self, *inputs):
        """Feed ``y`` through every contained module and return the final ``y``."""
        memory, state, attn_mask = inputs
        for layer in self:
            state = layer(memory, state, attn_mask)
        return state

class Decoder(nn.Module):
    """Stack of ``num_layers`` identically configured ``DecoderLayer`` blocks.

    Args:
        d_model: Feature size handed to every layer.
        ffn_hidden: Feed-forward hidden size handed to every layer.
        num_heads: Number of attention heads handed to every layer.
        drop_prob: Dropout probability handed to every layer.
        num_layers: How many layers to stack.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob))
        self.layers = SequentialDecoder(*blocks)

    def forward(self, x, y, mask):
        """Pass ``y`` through every layer, giving each the same ``x`` and ``mask``."""
        return self.layers(x, y, mask)

### Test with random data

In [18]:
# Hyperparameters for the smoke test.
d_model, num_heads, ffn_hidden, num_layers = 512, 8, 2048, 5
drop_prob = 0.1
batch_size, max_sequence_length = 30, 200

# Random tensors stand in for the two inputs of the decoder.
encoder_input = torch.randn(batch_size, max_sequence_length, d_model)
decoder_input = torch.randn(batch_size, max_sequence_length, d_model)

# Additive look-ahead mask: -inf strictly above the diagonal, 0 elsewhere.
mask = torch.triu(
    torch.full((max_sequence_length, max_sequence_length), float('-inf')),
    diagonal=1,
)

decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
out = decoder(encoder_input, decoder_input, mask)

--- Self Attention ---
x.shape: torch.Size([30, 200, 512])
qkv.shape: torch.Size([30, 200, 1536])
qkv.shape: torch.Size([30, 200, 8, 192])
qkv.shape: torch.Size([30, 8, 200, 192])
q.shape: torch.Size([30, 8, 200, 64]) k.shape: torch.Size([30, 8, 200, 64]), v.shape: torch.Size([30, 8, 200, 64])
values.shape: torch.Size([30, 8, 200, 64]), attention.shape: torch.Size([30, 8, 200, 200])
values.shape: torch.Size([30, 200, 512])
out.shape: torch.Size([30, 200, 512])
--- Dropout 1 ---
--- Add & Norm 1 ---
Mean shape: (torch.Size([30, 200, 1]))
Standard deviation shape (torch.Size([30, 200, 1]))
out shape (torch.Size([30, 200, 512]))
--- Cross Attention ---
x.shape: torch.Size([30, 200, 512])
q.shape: torch.Size([30, 200, 8, 64])
kv.shape: torch.Size([30, 200, 8, 128])
q.shape: torch.Size([30, 8, 200, 64]) k.shape: torch.Size([30, 8, 200, 64]), v.shape: torch.Size([30, 8, 200, 64])
values.shape: torch.Size([30, 8, 200, 64]), attention.shape: torch.Size([30, 8, 200, 200])
values.shape: torch.Si