In [30]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [31]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionWiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden) #512 x 2048
        self.linear2 = nn.Linear(hidden, d_model) #2048 x 512
        self.relu = nn.ReLU()       
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):   # 30 x 200 x 512
        x = self.linear1(x) # 30 x 200 x 2048
        x = self.relu(x)    # 30 x 200 x 2048
        x = self.dropout(x) # 30 x 200 x 2048
        x = self.linear2(x) # 30 x 200 x 512
        return x

In [39]:
class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super(LayerNormalization, self).__init__()
        self.parameters_shape=parameters_shape # [512]
        self.eps=eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape)) #learnable parameters [512] sd
        self.beta =  nn.Parameter(torch.zeros(parameters_shape)) #learnable parameters [512] mean

    def forward(self, inputs): # 30 x 200 x 512
        dims = [-(i + 1) for i in range(len(self.parameters_shape))] # [-1] last dimension
        mean = inputs.mean(dim=dims, keepdim=True) # mean of all 512 values per word 30 x 200 x 1
        print(f"Mean \n ({mean.size()})") 
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True)# variance of all 512 values per word 30 x 200 x 1
        std = (var + self.eps).sqrt() #sd
        print(f"Standard Deviation \n ({std.size()})")
        y = (inputs - mean) / std # 30 x 200 x 512
        print(f"y \n ({y.size()})")
        out = self.gamma * y  + self.beta
        print(f"out \n ({out.size()})")
        return out

In [33]:
def scaled_dot_product(q, k, v, mask = None):
    d_k = q.size()[-1]
    scaled = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        scaled += mask
    attention = F.softmax(scaled, dim = -1)
    values = torch.matmul(attention, v)

    return values, attention

class MultiHeadAttention(nn.Module):
    def __init__(self,d_model, num_heads):
        super().__init__()
        self.d_model = d_model  ## attention head embedding size
        assert d_model % num_heads == 0, "d_model should be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_linear = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)
    
    def forward(self, x, mask = None):
        batch_size, sequence_length, d_model = x.size() # 30 x 200 x 512
        print(f"x.size(),{x.size()}")
        qkv = self.qkv_linear(x) # (batch_size, seq_len, 3 * d_model) 30 x 200 x1536
        print(f"qkv.size(),{qkv.size()}")
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3*self.head_dim) # 30 x 200 x 8 x 192
        print(f"qkv.size(), {qkv.size()}")
        qkv = qkv.permute(0, 2, 1, 3) # (batch_size, num_heads, seq_len, head_dim) # 30 x 8 x 200 x 192
        print(f"qkv.size(), {qkv.size()}")
        q, k, v = qkv.chunk(3, dim=-1) # 30 x 8 x 200 x 64 each
        print(f"q.size(), {q.size()}", f"k.size(), {k.size()}", f"v.size(), {v.size()}")
        values, attention_weights = scaled_dot_product(q, k, v, mask) # values:  30 x 8 x 200 x 192, mask: 30 x 8 x 200 x 200
        print(f"values.size(), {values.size()}", f"attention_weights.size(), {attention_weights.size()}") 
        values = values.reshape(batch_size, sequence_length, self.d_model) # 30 x 200 x 512
        print(f"values.size(), {values.size()}")
        out = self.linear_layer(values) # 30 x 200 x 512
        print(f"out.size(), {out.size()}")
        return out

In [34]:
class MultiHeadCrossAttention(nn.Module):
    def __init__(self,d_model, num_heads):
        super().__init__()
        self.d_model = d_model  ## attention head embedding size
        assert d_model % num_heads == 0, "d_model should be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_linear = nn.Linear(d_model, 2 * d_model)
        self.q_linear = nn.Linear(d_model, d_model)
        self.linear_layer = nn.Linear(d_model, d_model)
    
    def forward(self, x, y, mask = None):
        batch_size, sequence_length, d_model = x.size() # 30 x 200 x 512
        print(f"x.size(),{x.size()}")
        
        kv = self.kv_linear(x) # (batch_size, seq_len, 2 * d_model) 30 x 200 x 1024
        print(f"qkv.size(),{kv.size()}")
        kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2*self.head_dim) # 30 x 200 x 8 x 128
        print(f"qkv.size(), {kv.size()}")
        kv = kv.permute(0, 2, 1, 3) # (batch_size, num_heads, seq_len, head_dim) # 30 x 8 x 200 x 128
        print(f"qkv.size(), {kv.size()}")

        q = self.q_linear(y) # (batch_size, seq_len, d_model) 30 x 200 x 512
        print(f"qkv.size(),{q.size()}")
        q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim) # 30 x 200 x 8 x 64
        print(f"qkv.size(), {kv.size()}")
        q = q.permute(0, 2, 1, 3) # (batch_size, num_heads, seq_len, head_dim) # 30 x 8 x 200 x 64
        print(f"qkv.size(), {q.size()}")

        k, v = kv.chunk(2, dim=-1) # 30 x 8 x 200 x 64 each
        print(f"q.size(), {q.size()}", f"k.size(), {k.size()}", f"v.size(), {v.size()}")

        values, attention_weights = scaled_dot_product(q, k, v, mask) # values:  30 x 8 x 200 x 192, mask: 30 x 8 x 200 x 200
        print(f"values.size(), {values.size()}", f"attention_weights.size(), {attention_weights.size()}") 
        values = values.reshape(batch_size, sequence_length, self.d_model) # 30 x 200 x 512
        print(f"values.size(), {values.size()}")
        out = self.linear_layer(values) # 30 x 200 x 512
        print(f"out.size(), {out.size()}")
        return out

In [41]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = LayerNormalization(parameters_shape = [d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.enc_dec_attention = MultiHeadCrossAttention(d_model, num_heads)
        self.norm2 = LayerNormalization(parameters_shape = [d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.ffn = PositionWiseFeedForward(d_model, ffn_hidden, drop_prob)
        self.norm3 = LayerNormalization(parameters_shape = [d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)
    
    def forward(self, x, y, decoder_mask):
        _y = y
        print("MASKED SELF ATTENTION")
        y = self.self_attn(y, mask=decoder_mask) # 30 x 200 x 512\n",
        print("DROP OUT 1")
        y = self.dropout1(y) # 30 x 200 x 512\n",
        print("ADD + LAYER NORMALIZATION 1")
        y = self.norm1(y + _y) # 30 x 200 x 512\n",

        _y = y
        print("CROSS ATTENTION")
        y = self.enc_dec_attention(y, x, mask=None) # 30 x 200 x 512\n",
        print("DROP OUT 2")
        y = self.dropout2(y) # 30 x 200 x 512\n",
        print("ADD + LAYER NORMALIZATION 2")
        y = self.norm2(y + _y) # 30 x 200 x 512\n",

        _y = y       # 30 x 200 x 512\n",
        print("FFN")
        y = self.ffn(y) # 30 x 200 x 512\n",
        print("DROP OUT 3")
        y = self.dropout3(y) # 30 x 200 x 512\n",

        return y
        

In [42]:
class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y, mask = inputs
        for module in self._modules.values():
            y = module(x, y, mask)  # 30 x 200 x 512\n",
            print(f"y = {y.size()}")
        return y
    
class Decoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers = 1):
        super().__init__()
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) 
                                      for _ in range(num_layers)]) # num_layers개의 EncoderLayer를 ���음

    def forward(self, x, y, mask):
        #x 30 x 200 x 512
        #y 30 x 200 x 512
        #mask 200x 200
        return self.layers(x, y, mask) # 30 x 200 x 512\n",

In [43]:
d_model = 512
num_heads = 8
drop_prob = 0.1
batch_size = 30
max_sequence_length = 200
ffn_hidden_dim = 2048
num_layers = 5

x = torch.randn((batch_size, max_sequence_length, d_model)) # English sentence positional encoded
y = torch.randn((batch_size, max_sequence_length, d_model)) # Kannada sentence positional encoded
mask = torch.full([max_sequence_length, max_sequence_length], float('-inf'))
mask = torch.triu(mask, diagonal=1) # Lower triangular matrix
decoder = Decoder(d_model, ffn_hidden_dim, num_heads, drop_prob, num_layers)
out = decoder(x, y, mask)

MASKED SELF ATTENTION
x.size(),torch.Size([30, 200, 512])
qkv.size(),torch.Size([30, 200, 1536])
qkv.size(), torch.Size([30, 200, 8, 192])
qkv.size(), torch.Size([30, 8, 200, 192])
q.size(), torch.Size([30, 8, 200, 64]) k.size(), torch.Size([30, 8, 200, 64]) v.size(), torch.Size([30, 8, 200, 64])
values.size(), torch.Size([30, 8, 200, 64]) attention_weights.size(), torch.Size([30, 8, 200, 200])
values.size(), torch.Size([30, 200, 512])
out.size(), torch.Size([30, 200, 512])
DROP OUT 1
ADD + LAYER NORMALIZATION 1
Mean 
 (torch.Size([30, 200, 1]))
Standard Deviation 
 (torch.Size([30, 200, 1]))
y 
 (torch.Size([30, 200, 512]))
out 
 (torch.Size([30, 200, 512]))
CROSS ATTENTION
x.size(),torch.Size([30, 200, 512])
qkv.size(),torch.Size([30, 200, 1024])
qkv.size(), torch.Size([30, 200, 8, 128])
qkv.size(), torch.Size([30, 8, 200, 128])
qkv.size(),torch.Size([30, 200, 512])
qkv.size(), torch.Size([30, 8, 200, 128])
qkv.size(), torch.Size([30, 8, 200, 64])
q.size(), torch.Size([30, 8, 200, 64