In [1]:
import math
import numpy as np
from IPython.display import Image, display
from matplotlib import pyplot as plt

In [2]:
import torch 
import torch.nn as nn
import torch.nn.functional as F

In [3]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.Linear1 = nn.Linear(d_model, hidden) # 512 x 2048
        self.linear2 = nn.Linear(hidden, d_model) # 512 x 2048
        self.relu = nn.ReLU() 
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x): # 30 x 200 x 512
        x = self.Linear1(x) # 30 x 200 x 2048
        print(f"x after first linear layer: {x.size()}")
        x = self.relu(x) # 30 x 200 x 2048
        print(f"x after activation: {x.size()}")
        x = self.dropout(x) # 30 x 200 x 2048
        print(f"x after dropout: {x.size()}")
        x = self.linear2(x) # 30 x 200 x 512
        print(f"x after 2nd linear layer: {x.size()}")
        return x

In [4]:
class LayerNormalization(nn.Module):
    
    def __init__(self, parameter_shape, eps=1e-5):
        super().__init__()
        self.parameter_shape = parameter_shape # 512
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameter_shape)) # 512
        self.beta = nn.Parameter(torch.zeros(parameter_shape)) # 512

    def forward(self, inputs): # inputs = (30 x 200 x 512)
        dims = [-(i+1) for i in range(len(self.parameter_shape))] # Layer normalization is performed on the last dim: [-1]
        print(f"dims: {dims}")
        mean = inputs.mean(dim=dims, keepdim=True) # 30 x 200 x 1
        print(f"Mean ({mean.size()})")
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True) # 30 x 200 x 1
        std = (var + self.eps).sqrt() # 30 x 200 x 1
        print(f"Standard Deviation  ({std.size()})")
        yPred = (inputs - mean) / std # 300 x 200 x 512
        print(f"yPred: {yPred.size()}")
        out = self.gamma * yPred + self.beta # 300 x 200 x 512
        print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
        print(f"out: {out.size()}")
        return out

In [5]:
def scaled_dot_product(q, k, v, mask=None):
    # q, k, v = each tensor is of shape (30 x 8 x 200 x 64)
    # mask is (200 x 200)

    d_k = q.size()[-1] #64. size of each attention head.
    scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k) # 30 x 8 x 200 x 200
    print(f"scaled.size() : {scaled.size()}")

    if mask is not None:
        print(f"-- ADDING MASK of shape {mask.size()} --") 
        scaled = scaled + mask # Broadcasting to add. So just the last N dimensions need to match. (30 x 8 x 200 x 200)
    
    attention = F.softmax(scaled, dim=-1) # (30 x 8 x 200 x 200)
    values = torch.matmul(attention, v) # (30 x 8 x 200 x 64)
    return values, attention

class MultiHeadCrossAttention(nn.Module):

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model #512
        self.num_heads = num_heads #8
        self.head_dim = d_model // num_heads #64
        self.kv_layer = nn.Linear(d_model, 2 * d_model) # 512 x 1024
        self.q_layer = nn.Linear(d_model, d_model) # 512 x 512
        self.linear_layer = nn.Linear(d_model, d_model) # 512 * 512

    def forward(self, x, y, mask=None):
        batch_size, sequence_length, d_model = x.size() # 30 x 200 x 512
        print(f"x.size(): {x.size()}")
        kv = self.kv_layer(x) # 30 x 200 x 1024
        print(f"kv.size(): {kv.size()}")
        q = self.q_layer(y) # 30 x 200 x 512
        print(f"q.size(): {q.size()}")

        kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2 * self.head_dim) # 30 x 200 x 8 x 128
        q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim) # 30 x 200 x 8 x 64
        kv = kv.permute(0, 2, 1, 3) # 30 x 8 x 200 x 128
        q = q.permute(0, 2, 1, 3) # 30 x 8 x 200 x 64
        k, v = kv.chunk(2, dim=-1) # 30 x 8 x 200 x 64 for both k and v.
        print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")

        values, attention = scaled_dot_product(q, k, v, mask) # Look at scaled_dot_product function for tensor sizes.
        print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")
        values = values.reshape(batch_size, sequence_length, self.num_heads * self.head_dim) # 30 x 200 x 512
        print(f"values.size(): {values.size()}")
        out = self.linear_layer(values) # 30 x 200 x 512
        print(f"out.size(): {out.size()}")
        return out # 30 x 200 x 512


class MultiHeadAttention(nn.Module):

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model #512
        self.num_heads = num_heads #8
        self.head_dim = d_model // num_heads #64
        self.qkv_layer = nn.Linear(d_model, 3 * d_model) # 512 x 1536
        self.linear_layer = nn.Linear(d_model, d_model) # 512 * 512

    def forward(self, x, mask=None):
        batch_size, sequence_length, d_model = x.size() # 30 x 200 x 512
        print(f"x.size(): {x.size()}")
        qkv = self.qkv_layer(x) # 30 x 200 x 1536
        print(f"qkv.size(): {qkv.size()}")
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim) # 30 * 200 * 8 * 192 (breaking it into 8 heads)
        print(f"qkv.size(): {qkv.size()}")
        qkv = qkv.permute(0, 2, 1, 3) # 30 x 8 x 200 x 192
        print(f"qkv.size(): {qkv.size()}")
        q, k, v = qkv.chunk(3, dim=-1) # each chunk is broken based on the last dim creating 3 tensors of (30 x 8 x 200 x 64)
        print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")
        values, attention = scaled_dot_product(q, k, v, mask) # Look at scaled_dot_product function for tensor sizes.
        print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")
        values = values.reshape(batch_size, sequence_length, self.num_heads * self.head_dim) # 30 x 200 x 512
        print(f"values.size(): {values.size()}")
        out = self.linear_layer(values) # 30 x 200 x 512
        print(f"out.size(): {out.size()}")
        return out # 30 x 200 x 512

In [6]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameter_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = LayerNormalization(parameter_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNormalization(parameter_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, decoder_mask):
        _y = y.clone()
        print("------- MASKED SELF ATTENTION ------")
        y = self.self_attention(y, mask=decoder_mask) # 30 x 200 x 512
        print("------- DROPOUT 1 ------")
        y = self.dropout1(y) # 30 x 200 x 512
        print("------- ADD AND LAYER NORMALIZATION 1 ------")
        y = self.norm1(y + _y) # 30 x 200 x 512

        _y = y.clone() # 30 x 200 x 512
        print("------- CROSS ATTENTION ------")
        y = self.encoder_decoder_attention(x, y, mask=None) # 30 x 200 x 512
        print("------- DROPOUT 2 ------")
        y = self.dropout2(y) # 30 x 200 x 512
        print("------- ADD AND LAYER NORMALIZATION 2 ------")
        y = self.norm2(y + _y) # 30 x 200 x 512

        _y = y.clone() # 30 x 200 x 512
        print("------- FEED FORWARD 1 ------")
        y = self.ffn(y)
        print("------- DROPOUT 3 ------")
        y = self.dropout3(y) # 30 x 200 x 512
        print("------- ADD AND LAYER NORMALIZATION 3 ------")
        y = self.norm3(y + _y) # 30 x 200 x 512

        return y

In [7]:
# Defining custom sequentialDecoder class as the in-built nn.Sequential class
# only accepts one input. In our case, we wanted to pass 3 inputs (x, y, and mask).
class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y, mask = inputs
        for module in self._modules.values():
            y = module(x, y, mask) # 30 x 200 x 512
        return y

class Decoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers=1):
        super().__init__()
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                      for _ in range(num_layers)])
        
    def forward(self, x, y, mask):
        #x: 30 x 200 x 512
        #y: 30 x 200 x 512
        #mask: 200 x 200 (max-sequence-length x max-sequence-length)
        y = self.layers(x, y, mask)
        return y # 30 x 200 x 512

In [8]:
d_model = 512 # Based on paper
num_heads = 8 # Based on paper

drop_prob = 0.1 # Helps with regularizatiion and model performance.
batch_size = 30 # Heps with gradient descent. Mini-batch gradient descent.

max_sequence_length = 200 # Largest number of words/tokens that will be passed in/out.
ffn_hidden = 2048 # Hyper-parameter mentioned in the paper.
num_layers = 5 # Number of transformer layers. For example, 5 encoder & decoder layers.

In [9]:
x = torch.randn( (batch_size, max_sequence_length, d_model) ) # English sentence positional encoded.
y = torch.randn( (batch_size, max_sequence_length, d_model) ) # Italian sentence positional encoded.

mask = torch.full([max_sequence_length, max_sequence_length], float('-inf'))
mask = torch.triu(mask, diagonal=1)

In [10]:
decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
out = decoder(x, y, mask)

------- MASKED SELF ATTENTION ------
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
qkv.size(): torch.Size([30, 8, 200, 192])
q size: torch.Size([30, 8, 200, 64]), k size: torch.Size([30, 8, 200, 64]), v size: torch.Size([30, 8, 200, 64]), 
scaled.size() : torch.Size([30, 8, 200, 200])
-- ADDING MASK of shape torch.Size([200, 200]) --
values.size(): torch.Size([30, 8, 200, 64]), attention.size:torch.Size([30, 8, 200, 200]) 
values.size(): torch.Size([30, 200, 512])
out.size(): torch.Size([30, 200, 512])
------- DROPOUT 1 ------
------- ADD AND LAYER NORMALIZATION 1 ------
dims: [-1]
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
yPred: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 200, 512])
------- CROSS ATTENTION ------
x.size(): torch.Size([30, 200, 512])
kv.size(): torch.Size([30, 200, 1024])
q.size(): torch.Size([30, 