In [94]:
import torch.nn as nn
import torch
print(torch.__version__)

2.0.0+cpu


In [95]:
class Embedding(nn.Module):
    def __init__(self, vocab_size, embed_dim): #vocab_size: size of vocabulary, embed_dim: dimension of embeddings
        super(Embedding, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
    def forward(self, x):
        """
        Args:
            x: input vector
        Returns:
            out: embedding vector
        """
        out = self.embed(x)
        return out

In [96]:
class PositionalEmbedding(nn.Module):
    def __init__(self,max_seq_len,embed_model_dim):#seq_len: length of input sequence,embed_model_dim: demension of embedding
        super(PositionalEmbedding, self).__init__()
        self.embed_dim = embed_model_dim

        pe = torch.zeros(max_seq_len,self.embed_dim)
        for pos in range(max_seq_len):
            for i in range(0,self.embed_dim,2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i)/self.embed_dim)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1))/self.embed_dim)))
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)


    def forward(self, x):
        """
        Args:
            x: input vector
        Returns:
            x: output
        """

        # make embeddings relatively larger
        x = x * math.sqrt(self.embed_dim)
        #add constant to embedding
        seq_len = x.size(1)
        x = x + torch.autograd.Variable(self.pe[:,:seq_len], requires_grad=False)
        return x



In [97]:
import torch
import torch.nn as nn
import torch.nn.functional as func
import math

def scaled_dot_product(Q,K,V):
    d_k=Q.size()[-1]
    scaled=torch.matmul(Q,K.transpose(-1,-2))/math.sqrt(d_k)
    
    mask=torch.full(scaled.size(),float('-inf'))
    mask=torch.triu(mask,diagonal=1)
    attention = func.softmax(scaled+mask,dim=-1)
#     print(attention.size())
#     print(V.size())
    output=torch.matmul(attention,V)
    print(output.size())
    return output,attention

class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,num_heads):
        super().__init__()
        
        self.d_model=d_model
        self.num_heads=num_heads
        self.head_dim=d_model//num_heads
        self.qkv_layer=nn.Linear(d_model,3*d_model)
        self.linear_layer=nn.Linear(d_model,d_model)  # (num_heads*d_v x d_model)
        
    def forward(self,x):
        batch_size,n,d_model=x.size()
        print(f"x.size(): {x.size()}")  
        qkv=self.qkv_layer(x)  # (n x 3*d_model)
        print(f"qkv.size(): {qkv.size()}")
        qkv=self.qkv_layer(x)  # (n x 3*d_model)
        print(f"qkv.size(): {qkv.size()}")
        qkv=qkv.reshape(batch_size,self.num_heads,n,3*self.head_dim) # (n x d_model/num_heads) i.e per head
        print(f"qkv.size(): {qkv.size()} per head")
        
        
        q,k,v=qkv.chunk(3,dim=-1)
        print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}")  # (n x d_model/(3*num_heads))
              
        output,attn=scaled_dot_product(q,k,v)
        print(f"output.size(): {output.size()}, attention.size:{ attn.size()}")
#         attn : (n x n), output : (n x d_v)
        
        output = output.reshape(batch_size, n, self.num_heads * self.head_dim)
        print(f"output.size(): {output.size()}") # output : (n x num_heads*d_v) ..concatenating all heads
        
        out = self.linear_layer(output)
        print(f"out.size(): {out.size()}")  # (n x d_model)
              
        return out

In [98]:
# d_model = 512
# num_heads = 8

# batch_size = 30
# sequence_length = 5
# x = torch.randn((batch_size, sequence_length, d_model_dim))

# model = MultiHeadAttention(d_model, num_heads)
# out = model.forward(x)

In [99]:
class LayerNorm1d: 
  
  def __init__(self, dim, eps=1e-5, momentum=0.1):
    self.eps = eps
    self.gamma = torch.ones(dim)
    self.beta = torch.zeros(dim)
  
  def __call__(self, x):
    # calculate the forward pass
    xmean = x.mean(1, keepdim=True) # batch mean
    xvar = x.var(1, keepdim=True) # batch variance
    xhat = (x - xmean) / torch.sqrt(xvar + self.eps) # normalize to unit variance
    self.out = self.gamma * xhat + self.beta
    return self.out
def parameters(self):
    return [self.gamma, self.beta]


In [100]:
class FeedForward(nn.Module):
    # a simple linear layer followed by a non-linearity

    def __init__(self, n_embd, hidden):
        super(FeedForward, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * hidden),  # Updated to use 'hidden'
            nn.ReLU(),
            nn.Linear(4 * hidden, n_embd),  # Updated to use 'hidden'
            nn.Dropout(p=0.2),
        )

    def forward(self, x):
        return self.net(x)


In [101]:
class DecoderLayer(nn.Module):

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(DecoderLayer, self).__init__()
        
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNorm1d(dim=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = FeedForward(n_embd = d_model,hidden=ffn_hidden)
        self.norm2 = LayerNorm1d(dim=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, y):
        _y = y 
        print("MASKED MULTIHEAD ATTENTION")
        y = self.self_attention(y) 
        print("DROP OUT")
        y = self.dropout1(y) 
        print("ADD + NORM")
        y =  self.norm1(y + _y) 

        
        _y = y  
        print("FEED FORWARD")
        y = self.ffn(y) 
        print("DROP OUT")
        y = self.dropout2(y) 
        print("ADD + NORM")
        y = self.norm2(y + _y) 
        return y 

class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y = inputs
        for module in self._modules.values():
            y = module(x, y)
        return y
class Decoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers=1):
        super().__init__()
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) 
                                          for _ in range(num_layers)])

    def forward(self, x, y):
        y = self.layers(x, y)
        return y 


In [102]:
d_model = 512
num_heads = 8
drop_prob = 0.1
batch_size = 30
max_sequence_length = 200
ffn_hidden = 2048
num_layers = 5

x = torch.randn( (batch_size, max_sequence_length, d_model) )  
y = torch.randn( (batch_size, max_sequence_length, d_model) ) 

decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
out = decoder(x, y)

out.shape

MASKED MULTIHEAD ATTENTION
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 8, 200, 192]) per head
q size: torch.Size([30, 8, 200, 64]), k size: torch.Size([30, 8, 200, 64]), v size: torch.Size([30, 8, 200, 64])
torch.Size([30, 8, 200, 64])
output.size(): torch.Size([30, 8, 200, 64]), attention.size:torch.Size([30, 8, 200, 200])
output.size(): torch.Size([30, 200, 512])
out.size(): torch.Size([30, 200, 512])
DROP OUT
ADD + NORM
FEED FORWARD
DROP OUT
ADD + NORM
MASKED MULTIHEAD ATTENTION
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 8, 200, 192]) per head
q size: torch.Size([30, 8, 200, 64]), k size: torch.Size([30, 8, 200, 64]), v size: torch.Size([30, 8, 200, 64])
torch.Size([30, 8, 200, 64])
output.size(): torch.Size([30, 8, 200, 64]), attention.size:torch.Size([30, 8, 200, 200])
output.size(

torch.Size([30, 200, 512])