In [3]:
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


In [None]:
class Embeddings(nn.Module):
    """Learned token-embedding table whose output is scaled by sqrt(d_model).

    Args:
        vocab_size: Number of distinct token ids (rows of the table).
        d_model: Width of each embedding vector.
    """

    def __init__(self, vocab_size, d_model):
        # The call parentheses matter: nn.Module must be initialised before
        # any sub-module can be assigned as an attribute.
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        # Lookup table of shape (vocab_size, d_model), learned during training.
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Embed integer token ids ``x`` and multiply by sqrt(d_model).

        Returns:
            A tensor shaped like ``x`` with a trailing axis of size d_model.
        """
        return self.embedding(x) * math.sqrt(self.d_model)


# Sinusoidal positional encoding, for position `pos` and feature-pair index `i`:
#   feature 2i   -> sin(pos / 1000^(2i/d_model))
#   feature 2i+1 -> cos(pos / 1000^(2i/d_model))
class Positional_Encoding(nn.Module):
    """Adds a fixed sinusoidal position signal to its input, then applies dropout.

    For position ``pos`` and feature-pair index ``i``::

        pe[pos, 2i]   = sin(pos / base ** (2i / d_model))
        pe[pos, 2i+1] = cos(pos / base ** (2i / d_model))

    Args:
        seq_length: Number of positions to precompute (longest usable input).
        d_model: Size of the feature axis.
        Pdrop: Dropout probability applied after the addition.
        base: Base of the geometric wavelength progression. Defaults to 1000,
            the value this notebook used.
            NOTE(review): "Attention Is All You Need" uses 10000 -- pass
            ``base=10000.0`` to match the paper.
    """

    def __init__(self, seq_length, d_model, Pdrop, base=1000.0):
        super(Positional_Encoding, self).__init__()
        self.dropout = nn.Dropout(Pdrop)

        # (seq_length, 1): one row per position, so it broadcasts against div_term.
        pos = torch.arange(0, seq_length, dtype=torch.float).unsqueeze(1)
        # One divisor per sin/cos pair: base ** (2i / d_model), computed via exp/log.
        even_idx = torch.arange(0, d_model, 2, dtype=torch.float)
        div_term = torch.exp(even_idx * math.log(base) / d_model)

        pe = torch.zeros(seq_length, d_model)
        pe[:, 0::2] = torch.sin(pos / div_term)
        # For odd d_model there is one fewer odd column than even column.
        pe[:, 1::2] = torch.cos(pos / div_term[: d_model // 2])
        pe = pe.unsqueeze(0)  # (1, seq_length, d_model) so it broadcasts over the batch

        # A buffer follows the module across devices and into state_dict but is
        # not returned by parameters(), so the optimiser never updates it.
        self.register_buffer('positional_encoder', pe)

    def forward(self, x):
        """Add the encoding for the first ``x.size(1)`` positions, then apply dropout.

        Args:
            x: Tensor of shape (batch, length, d_model) with length <= seq_length.

        Raises:
            ValueError: If ``x`` is longer than the precomputed table.
        """
        length = x.size(1)
        if length > self.positional_encoder.size(1):
            raise ValueError(
                f"sequence length {length} exceeds the precomputed maximum "
                f"{self.positional_encoder.size(1)}"
            )
        x = x + self.positional_encoder[:, :length, :]
        return self.dropout(x)



class LayerNormalization(nn.Module):
    """Normalises each feature vector (last axis) to zero mean and unit std.

    Computes ``a * (x - mean) / (std + epsilon) + bias`` where mean and std are
    taken over the last axis, and ``a`` / ``bias`` are learned scalars.

    Args:
        epsilon: Small constant added to the std so the division is never by
            zero. Defaults to 1e-10 (the original ``1**-10`` evaluated to 1.0).
    """

    def __init__(self, epsilon=1e-10):
        super(LayerNormalization, self).__init__()
        self.epsilon = epsilon
        # Single-element gain and shift, broadcast over every feature.
        self.a = nn.Parameter(torch.ones(1))
        self.bias = nn.Parameter(torch.zeros(1))

    def forward(self, x):
        """Return ``x`` normalised over its last axis; the shape is unchanged."""
        # keepdim=True keeps a size-1 last axis so the statistics broadcast
        # back against x.
        mean = x.mean(dim=-1, keepdim=True)
        # NOTE: torch.std uses the unbiased estimator by default, so a last
        # axis of size 1 yields NaN.
        std = x.std(dim=-1, keepdim=True)
        return self.a * (x - mean) / (std + self.epsilon) + self.bias


# Position-wise feed-forward network: adds non-linearity between the attention sub-layers
class FeedForwardNetwork(nn.Module):
    """Two linear layers with ReLU and dropout in between: d_model -> d_ff -> d_model.

    Args:
        d_model: Size of the input and output feature axis.
        Pdrop: Dropout probability applied after the ReLU.
        d_ff: Hidden width; defaults to ``4 * d_model`` when None.
    """

    def __init__(self, d_model, Pdrop, d_ff=None):
        # Initialise nn.Module first so sub-modules can be registered.
        super(FeedForwardNetwork, self).__init__()
        if d_ff is None:
            d_ff = d_model * 4
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(Pdrop)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply linear1 -> ReLU -> dropout -> linear2 along the last axis of ``x``."""
        x = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(x)


class AddAndNorm(nn.Module):
    """Residual add followed by normalisation: ``norm(x + dropout(sublayer))``.

    Args:
        LayerNorm: Zero-argument callable (typically a normalisation class);
            it is called once here to build the layer used in ``forward``.
        Pdrop: Dropout probability applied to the sub-layer output.
    """

    def __init__(self, LayerNorm, Pdrop):
        super().__init__()
        self.dropout = nn.Dropout(Pdrop)
        # Instantiate the normalisation layer from the factory that was passed in.
        self.LayerNorm = LayerNorm()

    def forward(self, x, sublayer):
        """Combine the block input ``x`` with the already-computed ``sublayer`` output."""
        residual_sum = x + self.dropout(sublayer)
        normalised = self.LayerNorm(residual_sum)
        return normalised
        

In [None]:
# Multi-head attention: total cost is similar to single-head attention over the full d_model,
# but each head can learn a different representation subspace.

class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over ``h`` parallel heads.

    Args:
        d_model: Size of the feature axis of the inputs and the output.
        h: Number of heads; must divide ``d_model``.
        Pdrop: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``h``.
    """

    def __init__(self, d_model, h, Pdrop):
        super(MultiHeadAttention, self).__init__()
        if d_model % h != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by h ({h}) "
                "so that d_k = d_v = d_model / h"
            )

        self.h = h  # number of heads
        self.d_model = d_model  # width of the embeddings
        self.d_k = self.d_v = d_model // h  # per-head width

        self.dropout = nn.Dropout(Pdrop)

        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        self.W_O = nn.Linear(d_model, d_model)

    def _split_heads(self, x, d_head):
        """(batch, length, d_model) -> (batch, h, length, d_head)."""
        batch_size, length, _ = x.size()
        # Split the feature axis first, THEN move the head axis forward;
        # reshaping straight to (batch, h, length, d_head) would mix tokens
        # and heads together.
        return x.reshape(batch_size, length, self.h, d_head).transpose(1, 2)

    def forward(self, Q, K, V, mask=None):
        """Attend from ``Q`` over ``K`` / ``V``.

        Args:
            Q: (batch, q_len, d_model) query input.
            K: (batch, k_len, d_model) key input; k_len may differ from q_len.
            V: (batch, k_len, d_model) value input.
            mask: Optional tensor broadcastable to (batch, h, q_len, k_len).
                Positions where ``mask == 0`` receive a very large negative
                score. ``None`` disables masking.

        Returns:
            Tensor of shape (batch, q_len, d_model).
        """
        batch_size, q_len, _ = Q.size()

        queries = self._split_heads(self.W_Q(Q), self.d_k)
        keys = self._split_heads(self.W_K(K), self.d_k)
        values = self._split_heads(self.W_V(V), self.d_v)

        # (batch, h, q_len, k_len)
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # A large negative score becomes ~0 after softmax. A finite value
            # (rather than -inf) keeps fully masked rows from producing NaN.
            scores = scores.masked_fill(mask == 0, -1e10)
        attention = F.softmax(scores, dim=-1)
        attention = self.dropout(attention)

        weighted = torch.matmul(attention, values)  # (batch, h, q_len, d_v)
        # Put the head axis back next to the feature axis before merging heads.
        concat = weighted.transpose(1, 2).contiguous().view(batch_size, q_len, self.d_model)
        return self.W_O(concat)
        



In [8]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each wrapped in AddAndNorm.

    Args:
        d_model: Size of the feature axis.
        h: Number of attention heads.
        Pdrop: Dropout probability passed to every sub-module.
    """

    def __init__(self, d_model, h, Pdrop):
        super(EncoderBlock, self).__init__()
        self.MHA1 = MultiHeadAttention(d_model, h, Pdrop)
        self.FFN = FeedForwardNetwork(d_model, Pdrop, d_ff=d_model * 4)
        # AddAndNorm instantiates the class it is given, so pass the class
        # itself. One wrapper per sub-layer.
        self.residual = nn.ModuleList(
            [AddAndNorm(LayerNormalization, Pdrop) for _ in range(2)]
        )

    def forward(self, x, mask):
        """Run ``x`` through both sub-layers; ``mask`` is passed to the attention."""
        # AddAndNorm.forward takes (block input, sub-layer output).
        x = self.residual[0](x, self.MHA1(x, x, x, mask))
        x = self.residual[1](x, self.FFN(x))
        return x

class Encoder(nn.Module):
    """Stack of ``N`` EncoderBlock layers applied in sequence.

    Args:
        N: Number of layers.
        d_model: Size of the feature axis.
        h: Number of attention heads per layer.
        Pdrop: Dropout probability passed to every layer.
    """

    def __init__(self, N, d_model, h, Pdrop):
        super(Encoder, self).__init__()
        # nn.ModuleList (not a plain list) registers the layers, so their
        # parameters show up in parameters() / state_dict() and follow .to(device).
        self.layers = nn.ModuleList(
            [EncoderBlock(d_model, h, Pdrop) for _ in range(N)]
        )

    def forward(self, x, mask=None):
        """Feed ``x`` through every layer in order.

        ``mask`` is handed unchanged to each layer as its second argument.
        """
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [15]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, then feed-forward.

    Each of the three sub-layers is wrapped in its own AddAndNorm.

    Args:
        d_model: Size of the feature axis.
        h: Number of attention heads.
        Pdrop: Dropout probability passed to every sub-module.
    """

    def __init__(self, d_model, h, Pdrop):
        super(DecoderBlock, self).__init__()
        self.MHA2 = MultiHeadAttention(d_model, h, Pdrop)  # self-attention
        self.MHA3 = MultiHeadAttention(d_model, h, Pdrop)  # cross-attention
        self.FFN = FeedForwardNetwork(d_model, Pdrop, d_ff=d_model * 4)
        # AddAndNorm instantiates the class it is given, so pass the class
        # itself. One wrapper per sub-layer.
        self.residual = nn.ModuleList(
            [AddAndNorm(LayerNormalization, Pdrop) for _ in range(3)]
        )

    def forward(self, x, encoder_output, tgt_mask, src_mask):
        """Run ``x`` through the three sub-layers.

        Args:
            x: Decoder-side input.
            encoder_output: Used as keys and values of the cross-attention.
            tgt_mask: Mask passed to the self-attention.
            src_mask: Mask passed to the cross-attention.
        """
        # AddAndNorm.forward takes (block input, sub-layer output).
        x = self.residual[0](x, self.MHA2(x, x, x, tgt_mask))
        x = self.residual[1](x, self.MHA3(x, encoder_output, encoder_output, src_mask))
        # The feed-forward sub-layer gets its own wrapper (index 2, not 1).
        x = self.residual[2](x, self.FFN(x))
        return x


class Decoder(nn.Module):
    """Stack of ``N`` DecoderBlock layers applied in sequence.

    Args:
        N: Number of layers.
        d_model: Size of the feature axis.
        h: Number of attention heads per layer.
        Pdrop: Dropout probability passed to every layer.
    """

    def __init__(self, N, d_model, h, Pdrop):
        super(Decoder, self).__init__()
        # nn.ModuleList (not a plain list) registers the layers, so their
        # parameters show up in parameters() / state_dict() and follow .to(device).
        self.layers = nn.ModuleList(
            [DecoderBlock(d_model, h, Pdrop) for _ in range(N)]
        )

    def forward(self, x, encoder_output, tgt_mask=None, src_mask=None):
        """Feed ``x`` through every layer in order.

        ``encoder_output``, ``tgt_mask`` and ``src_mask`` are handed unchanged
        to each layer, in the order DecoderBlock.forward declares them.
        """
        for layer in self.layers:
            x = layer(x, encoder_output, tgt_mask, src_mask)
        return x
        
        
    

In [16]:
class ProjectionLayer(nn.Module):
    """Linear map from d_model to vocab_size followed by log-softmax.

    Args:
        vocab_size: Size of the output (last) axis.
        d_model: Size of the input (last) axis.
    """

    def __init__(self, vocab_size, d_model):
        super(ProjectionLayer, self).__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return log-probabilities over the last axis.

        The last axis of ``x`` (size d_model) is replaced by one of size
        vocab_size; all leading axes are kept.
        """
        # Log-softmax over the last axis, i.e. the vocab_size outputs of self.proj.
        return torch.log_softmax(self.proj(x), dim=-1)
        


In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model assembled from the components defined in this notebook.

    Args:
        N: Number of layers in both the encoder and the decoder.
        src_vocab_length: Source vocabulary size.
        tgt_vocab_length: Target vocabulary size (also the projection output size).
        src_seq_length: Sequence length passed to the source positional encoding.
        tgt_seq_length: Sequence length passed to the target positional encoding.
        d_model: Size of the feature axis.
        h: Number of attention heads.
        Pdrop: Dropout probability passed to every component.
    """

    def __init__(self, N, src_vocab_length, tgt_vocab_length, src_seq_length, tgt_seq_length, d_model, h, Pdrop):
        # nn.Module must be initialised before sub-modules are assigned.
        super(Transformer, self).__init__()
        self.encoder = Encoder(N, d_model, h, Pdrop)
        self.decoder = Decoder(N, d_model, h, Pdrop)
        self.src_embeddings = Embeddings(src_vocab_length, d_model)
        self.tgt_embeddings = Embeddings(tgt_vocab_length, d_model)
        self.src_positional_encoding = Positional_Encoding(src_seq_length, d_model, Pdrop)
        self.tgt_positional_encoding = Positional_Encoding(tgt_seq_length, d_model, Pdrop)
        self.projection_layer = ProjectionLayer(tgt_vocab_length, d_model)

    def encode(self, tokens, src_mask=None):
        """Embed source ``tokens``, add positions, and run the encoder.

        ``src_mask`` is passed to the encoder as its second argument.
        """
        x = self.src_embeddings(tokens)
        x = self.src_positional_encoding(x)
        return self.encoder(x, src_mask)

    def decode(self, tokens, encoder_output, tgt_mask=None, src_mask=None):
        """Embed target ``tokens``, add positions, and run the decoder.

        The decoder is called as ``decoder(x, encoder_output, tgt_mask, src_mask)``.

        Args:
            tokens: Target-side token ids.
            encoder_output: Result of :meth:`encode` for the matching source batch.
            tgt_mask: Mask forwarded to the decoder for the target side.
            src_mask: Mask forwarded to the decoder for the source side.
        """
        x = self.tgt_embeddings(tokens)
        x = self.tgt_positional_encoding(x)
        return self.decoder(x, encoder_output, tgt_mask, src_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        return self.projection_layer(x)
    

In [None]:
def build_transformer():
    """Placeholder for a factory that assembles a Transformer.

    The original cell had no body, which is a SyntaxError. Until the
    hyper-parameters are decided, the stub fails loudly instead.

    Raises:
        NotImplementedError: Always.
    """
    # TODO: decide vocabulary sizes, sequence lengths, N, d_model, h and Pdrop,
    # then construct and return the model.
    raise NotImplementedError("build_transformer is not implemented yet")

In [5]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')