In [25]:
import torch

In [26]:
class PositionalEncoder(torch.nn.Module):
    def __init__(self, max_seq_len, d_model):
        super().__init__()
        self.max_seq_len = max_seq_len
        self.d_model = d_model

        self.pe = torch.zeros(self.max_seq_len, self.d_model)

        # positional encoder 
        for j in range(max_seq_len):
            for i in range(0, d_model):
                if i%2 == 0:
                    k = i//2
                    wk = 1 / (10000**((2*k)/d_model))
                    self.pe[j][i] = torch.sin(torch.tensor(wk*j))
                else:
                    self.pe[j][i] = torch.cos(torch.tensor(wk*j))

        self.pe = self.pe.detach()

    def forward(self, x):
        if len(x.shape) == 2:
            x = x.unsqueeze(0)

        if x.shape[1]>self.max_seq_len:
            raise Exception("Number of tokens exceeds max_seq_len")
        
        if x.shape[2] != self.d_model:
            raise Exception("Token dimension do not match model dimension")

        x = x + self.pe
        return x

In [27]:
class FeedForwardNN(torch.nn.Module):
    def __init__(self, d_model, d_ff):
        self.d_model = d_model
        self.d_ff = d_ff

        self.ff1 = torch.nn.Linear(self.d_model, self.d_ff)
        self.ff2 = torch.nn.Linear(self.d_ff, self.d_model)

    def forward(self, x):
        if len(x.shape) == 2:
            x = x.unsqueeze(0)
            print("Warning: batch size not present")

        if x.shape[2]!=self.d_model:
            raise Exception("Token dimension do not match model dimension")
        
        x = self.ff1(x)
        x = self.ff2(x)

        return x

In [28]:
class LayerNorm(torch.nn.Module):
    def __init__(self, d_model):
        self.d_model = d_model
        self.layerNorm = torch.nn.LayerNorm(d_model)

    def forward(self, x):
        x = self.layerNorm(x)
        return x

In [34]:
class SelfAttentionHead(torch.nn.Module):
    def __init__(self, d_model, d_k, max_seq_len):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_k
        self.max_seq_len = max_seq_len

        self.W_q = torch.nn.Linear(in_features=d_model, out_features=d_k)
        self.W_k = torch.nn.Linear(in_features=d_model, out_features=d_k)
        self.W_v = torch.nn.Linear(in_features=d_model, out_features=d_k)

        self.W_O = torch.nn.Linear(in_features = d_k, out_features=d_model)
        
    def forward(self, Q, K, V, mask = False):
        if len(Q.shape)==2:
            Q.unsqueeze(0)
            print("Warning: batch size not present")

        if len(K.shape)==2:
            K.unsqueeze(0)
            print("Warning: batch size not present")

        if len(V.shape)==2:
            V.unsqueeze(0)
            print("Warning: batch size not present")

        if Q.shape[1] > self.max_seq_len or V.shape[1]> self.max_seq_len or K.shape[1]>self.max_seq_len:
            raise Exception("Number of tokens exceed max sequence length")

        if Q.shape[2] != self.d_model or K.shape[2] != self.d_model or V.shape[2]!=self.d_model:
            raise Exception("Tokens dimension do not match model dimension")

        Q = self.W_q(Q)
        K = self.W_k(K)
        V = self.W_v(V)

        logits = (torch.matmul(Q, torch.transpose(K, 1, 2)) )/ (self.d_k ** 0.5)
        if mask is True:
            ones = torch.ones(Q.shape[0], Q.shape[1], Q.shape[1])
            mask = torch.tril(ones)
            logits = logits.masked_fill(mask == 0, -float('1e9'))

        scores = torch.nn.functional.softmax(logits, dim = 1)
        attention = torch.matmul(scores, V)

        output = self.W_O(attention)
        
        return output

class MultiHeadAttention(torch.nn.Module):
    def __init__(self, num_heads, d_model, d_k, max_seq_len):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k=d_k
        self.max_seq_len = max_seq_len

        self.heads = torch.nn.ModuleList([SelfAttentionHead(d_model=self.d_model, d_k=self.d_k, max_seq_len=self.max_seq_len) for _ in range(num_heads)])

        self.W_O = torch.nn.Linear(num_heads*self.d_model, self.d_model)

    def forward(self, Q, K, V, mask = False):
        if len(Q.shape)==2:
            Q.unsqueeze(0)
            print("Warning: batch size not present")

        if len(K.shape)==2:
            K.unsqueeze(0)
            print("Warning: batch size not present")

        if len(V.shape)==2:
            V.unsqueeze(0)
            print("Warning: batch size not present")

        if Q.shape[1] > self.max_seq_len or V.shape[1]> self.max_seq_len or K.shape[1]>self.max_seq_len:
            raise Exception("Number of tokens exceed max sequence length")

        if Q.shape[2] != self.d_model or K.shape[2] != self.d_model or V.shape[2]!=self.d_model:
            raise Exception("Tokens dimension do not match model dimension")

        head_outputs = [head(Q, K, V, mask) for head in self.heads]
        concatenated = torch.cat(head_outputs, dim=-1)
        return self.W_O(concatenated)

In [30]:
class DecoderBlock(torch.nn.Module):
    def __init__(self, num_heads, d_model, d_k, d_ff, max_seq_len):
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_k
        self.d_ff = d_ff
        self.max_seq_len = max_seq_len

        self.multiHeadAttention = MultiHeadAttention(num_heads=self.num_heads, d_model=self.d_model, d_k=self.d_k, max_seq_len=self.max_seq_len)
        self.layerNorm = LayerNorm(d_model=self.d_model)
        self.ffnn = FeedForwardNN(d_model=self.d_model, d_ff=self.d_ff)

    def forward(self, x):
        if len(x.shape) == 2:
            x = x.unsqueeze(0)
            print("Warning: batch size not present")

        if x.shape[2]!=self.d_model:
            raise Exception("Token dimension do not match model dimension")
        
        y = self.layerNorm(x)
        y = self.multiHeadAttention(y)

        x = x+y
        y = x

        y = self.layerNorm(y)
        y = self.ffnn(y)

        x = x+y
        return x

In [37]:
# shape of input tensor: [batch_size, seq_len, d_model]
# seq_len is the number of tokens in each sequence
# d_model is the dimensionality of the embedding
x = torch.randn(2, 3, 4)
heads = MultiHeadAttention(num_heads=1, d_model=4, d_k=5, max_seq_len=10)
head = SelfAttentionHead(d_model=4, d_k=5, max_seq_len=10)
print(head(x, x, x, True))
print("\n")
print(heads(x, x, x, True))

tensor([[[ 0.3365, -0.1356,  0.1461, -0.4321],
         [ 0.1768,  0.0380,  0.2529, -0.6829],
         [ 0.0842,  0.5899,  1.2571, -2.5007]],

        [[ 0.3618, -0.0701,  0.2028, -0.5407],
         [ 0.3962,  0.0548,  0.4298, -0.8842],
         [-0.0091,  0.8302,  1.1952, -2.2897]]], grad_fn=<ViewBackward0>)


tensor([[[-0.5321,  0.2413, -0.3688,  0.0663],
         [-0.3270,  0.1900, -0.4453,  0.1423],
         [-0.1919,  0.1343, -0.3536,  0.0599]],

        [[-0.5773,  0.2365, -0.3365,  0.0230],
         [-0.7091,  0.2350, -0.2842, -0.0582],
         [-0.7269,  0.1400, -0.2344, -0.1868]]], grad_fn=<ViewBackward0>)


In [None]:
import torch


max_seq_length = 3
d_model = 6

my_tensor = torch.ones(4, 3, 6)

pe = torch.zeros(max_seq_length, d_model)

for j in range(max_seq_length):
    for i in range(0, d_model):
        if i%2 == 0:
            k = i//2
            wk = 1 / (10000**((2*k)/d_model))
            pe[j][i] = torch.sin(torch.tensor(wk*j))
        else:
            pe[j][i] = torch.cos(torch.tensor(wk*j))
pe


tensor([[[1.0000, 2.0000, 1.0000, 2.0000, 1.0000, 2.0000],
         [1.8415, 1.5403, 1.0464, 1.9989, 1.0022, 2.0000],
         [1.9093, 0.5839, 1.0927, 1.9957, 1.0043, 2.0000]],

        [[1.0000, 2.0000, 1.0000, 2.0000, 1.0000, 2.0000],
         [1.8415, 1.5403, 1.0464, 1.9989, 1.0022, 2.0000],
         [1.9093, 0.5839, 1.0927, 1.9957, 1.0043, 2.0000]],

        [[1.0000, 2.0000, 1.0000, 2.0000, 1.0000, 2.0000],
         [1.8415, 1.5403, 1.0464, 1.9989, 1.0022, 2.0000],
         [1.9093, 0.5839, 1.0927, 1.9957, 1.0043, 2.0000]],

        [[1.0000, 2.0000, 1.0000, 2.0000, 1.0000, 2.0000],
         [1.8415, 1.5403, 1.0464, 1.9989, 1.0022, 2.0000],
         [1.9093, 0.5839, 1.0927, 1.9957, 1.0043, 2.0000]]])

In [None]:
# NLP Example
batch, sentence_length, embedding_dim = 20, 5, 10
embedding = torch.randn(batch, sentence_length, embedding_dim)
layer_norm = torch.nn.LayerNorm(embedding_dim)
# Activate module
layer_norm(embedding)


In [None]:
x = torch.rand(2,2,3)

ones = torch.ones(x.shape[0], x.shape[1], x.shape[1])
mask = torch.tril(ones)
print(mask)
scores = torch.nn.functional.softmax(mask, dim = 1)
scores

tensor([[[1., 0.],
         [1., 1.]],

        [[1., 0.],
         [1., 1.]]])


tensor([[[0.5000, 0.2689],
         [0.5000, 0.7311]],

        [[0.5000, 0.2689],
         [0.5000, 0.7311]]])