In [2]:
import numpy as np 

In [2]:
class LinearLayer3d():
    """Per-head linear layer for 4-D inputs, trained with an Adam-style update.

    ``weights`` has shape ``(num_head, d_emd, d_out)`` and is applied with
    ``np.matmul``, so the last axis of the input must be ``d_emd`` and the
    head axis (axis 1) must broadcast against ``num_head``.
    """

    def __init__(self,num_head,d_emd,d_out,biases=False):
        """Create the layer.

        Args:
            num_head: number of independent weight matrices (axis 0 of ``weights``).
            d_emd: input feature size.
            d_out: output feature size.
            biases: when truthy, allocate a zero bias of shape ``(1, num_head, 1, d_out)``.
        """
        # Gaussian init scaled by sqrt(2 / (num_head * d_emd)); original TODO: make it xavier.
        self.weights = np.random.randn(num_head,d_emd,d_out)*np.sqrt(2/(num_head*d_emd))
        # A plain 0 keeps ``forward`` a no-op on the bias term when biases are disabled.
        self.biases = np.zeros((1,num_head,1,d_out)) if biases else 0
        # Adam moment estimates; None means "no gradient seen yet".
        self.v_w = None
        self.s_w = None
        self.v_b = None
        self.s_b = None

    def forward(self,inps):
        """Return ``inps @ weights + biases`` and remember ``inps`` for ``backward``."""
        self.inputs = inps
        return np.matmul(inps,self.weights) + self.biases

    def backward(self,grads,step_size):
        """Back-propagate ``grads`` and update the parameters.

        Args:
            grads: gradient of the loss w.r.t. the output of ``forward``.
            step_size: learning rate passed to the Adam update.

        Returns:
            Gradient of the loss w.r.t. the input of the last ``forward`` call.
        """
        # Weight gradient is accumulated over axis 0 (the leading/batch axis).
        nabla_w = np.matmul(self.inputs.transpose(0,1,-1,-2),grads).sum(axis=0)
        # Computed BEFORE the update so it uses the weights seen in forward.
        nabla_inps = np.matmul(grads,self.weights.transpose(0,-1,1))
        # Parameters are updated exactly once, by Adam (no additional SGD step).
        if isinstance(self.biases, np.ndarray):
            nabla_b = grads.sum(axis=0,keepdims=True).sum(axis=2,keepdims=True)
            self.adam_optim(nabla_w, nabla_b, alpha=step_size)
        else:
            self.adam_optim(nabla_w, alpha=step_size)
        return nabla_inps

    def adam_optim(self,d_w,d_b=None,alpha=0.001,B1=0.9,B2=0.999):
        """Apply one Adam-style step to the weights (and to the biases if ``d_b`` is given).

        The moment estimates are seeded with the first gradient (and its
        square) instead of zeros, so no bias-correction term is applied.

        Args:
            d_w: gradient w.r.t. ``weights``.
            d_b: gradient w.r.t. ``biases``; ``None`` skips the bias update.
            alpha: learning rate.
            B1: decay rate of the first-moment estimate.
            B2: decay rate of the second-moment estimate.
        """
        eps = 1e-8

        if d_b is not None:
            if self.v_b is None:
                self.v_b = d_b
                # Second moment tracks squared gradients, so it must stay non-negative.
                self.s_b = d_b**2
            self.v_b = B1*self.v_b + (1-B1)*d_b
            self.s_b = B2*self.s_b + (1-B2)*(d_b**2)
            self.biases = self.biases - alpha*self.v_b/(np.sqrt(self.s_b)+eps)

        if self.v_w is None:
            self.v_w = d_w
            self.s_w = d_w**2
        self.v_w = B1*self.v_w + (1-B1)*d_w
        self.s_w = B2*self.s_w + (1-B2)*(d_w**2)
        self.weights -= alpha*self.v_w/(np.sqrt(self.s_w)+eps)
        
        

In [3]:
class Softmax():
    """Softmax along a configurable axis, with the matching backward pass."""

    def forward(self,z,axis=-1):
        """Return the softmax of ``z`` along ``axis`` and cache it for ``backward``."""
        self.axis = axis
        # Subtract the per-slice maximum before exponentiating.
        shifted = z - z.max(axis=axis,keepdims=True)
        exps = np.exp(shifted)
        # nansum: NaN entries are treated as zero in the normaliser.
        normaliser = np.nansum(exps,axis=axis,keepdims=True)
        self.output = exps/normaliser
        return self.output

    def backward(self,grads):
        """Return the gradient w.r.t. ``z`` given ``grads`` w.r.t. the softmax output."""
        probs = self.output
        weighted_total = np.sum(probs*grads,axis=self.axis,keepdims=True)
        return probs*(grads-weighted_total)
          
    
class CrossEntropyLoss():
    """Summed cross-entropy between predicted probabilities and target labels."""

    def __init__(self,eps=1e-12):
        """Store ``eps``, the lower bound applied to probabilities before log/division."""
        self.eps = eps

    def forward(self,out_probs,labels):
        """Return ``sum(-labels * log(out_probs))``.

        Probabilities are floored at ``eps`` so an exact zero yields a finite
        loss instead of ``-inf``/NaN (``0 * log(0)`` is NaN in NumPy).
        """
        self.in_probs = np.maximum(out_probs,self.eps)
        self.labels = labels
        return np.sum(-labels*np.log(self.in_probs))

    def backward(self):
        """Return the gradient of the loss w.r.t. the probabilities from ``forward``."""
        # Uses the floored probabilities, so 0/0 cannot occur.
        return -self.labels/self.in_probs
        

In [4]:
class LayerNorm():
    """Normalise the last axis to zero mean and unit variance (no learned scale/shift)."""

    def __init__(self,eps=1e-5):
        """Store ``eps``, added to the variance so constant rows do not divide by zero."""
        self.eps = eps

    def forward(self,inps):
        """Return ``(inps - mean) / sqrt(var + eps)`` computed over the last axis."""
        self.inps = inps
        mean = np.mean(inps,axis=-1,keepdims=True)
        self.var = np.var(inps,axis=-1,keepdims=True)
        self.std = np.sqrt(self.var + self.eps)
        self.norm = (inps-mean)/self.std
        return self.norm

    def backward(self,grads):
        """Return the gradient w.r.t. ``inps`` given ``grads`` w.r.t. the normalised output."""
        n = self.norm.shape[-1]
        grad_sum = grads.sum(axis=-1,keepdims=True)
        grad_dot_norm = (grads*self.norm).sum(axis=-1,keepdims=True)
        # d/dx of (x - mean) / std with std = sqrt(var + eps), written per row.
        return (n*grads - grad_sum - self.norm*grad_dot_norm)/(n*self.std)


In [5]:
class Dropout():
    """Inverted dropout: zero entries with probability ``drop_prob`` and rescale the rest."""

    def __init__(self,drop_prob):
        """Store ``drop_prob``, the probability that an element is zeroed."""
        self.drop_prob = drop_prob

    def forward(self,inps,training=True):
        """Apply dropout to ``inps``.

        Args:
            inps: input array.
            training: when False the input is returned unchanged and
                ``backward`` passes gradients straight through.

        Returns:
            Array of the same shape; kept entries are scaled by
            ``1 / (1 - drop_prob)`` so the expected value matches the input.
        """
        if not training:
            self.dropout = np.ones(inps.shape,dtype=bool)
            self.scale = 1.0
            return inps
        keep_prob = 1.0 - self.drop_prob
        # drop_prob == 1 drops everything; avoid dividing by zero for the unused scale.
        self.scale = 1.0/keep_prob if keep_prob > 0 else 0.0
        # ``>=`` so that drop_prob == 0 can never drop an element.
        self.dropout = (np.random.rand(*inps.shape) >= self.drop_prob)
        return np.where(self.dropout,inps*self.scale,0)

    def backward(self,grads):
        """Route ``grads`` through the mask and scale used by the last ``forward``."""
        return np.where(self.dropout,grads*self.scale,0)


In [6]:
class MultiHeadAttention():
    """Multi-head scaled dot-product attention with a hand-written backward pass.

    Inputs to ``forward`` are 4-D with a singleton head axis,
    ``(batch, 1, seq, d_model)``; the per-head projections broadcast over it.
    """

    def __init__(self,num_head,d_model):
        """Build the query/key/value/output projections.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``num_head`` (the
                concatenated heads could not be reshaped back to ``d_model``).
        """
        if d_model % num_head != 0:
            raise ValueError("d_model must be divisible by num_head")

        self.num_head = num_head
        self.d_model = d_model
        self.d_head = d_model // num_head  # feature size of each head (d_k)

        self.q_w1 = LinearLayer3d(num_head,d_model,self.d_head)
        self.k_w1 = LinearLayer3d(num_head,d_model,self.d_head)
        self.v_w1 = LinearLayer3d(num_head,d_model,self.d_head)
        self.o_w1 = LinearLayer3d(1,d_model,d_model)

        self.sm = Softmax()

    def forward(self,query,key,value,valid_lens=None):
        """Compute attention output.

        Args:
            query, key, value: arrays fed to the per-head projections.
            valid_lens: optional per-batch number of valid key positions;
                keys at or beyond that index receive zero attention weight.

        Returns:
            Output of the final linear projection, ``(batch, 1, n_query, d_model)``.
        """
        self.mh_q = self.q_w1.forward(query) # multi-headed queries
        self.mh_k = self.k_w1.forward(key)   # multi-headed keys
        self.mh_v = self.v_w1.forward(value) # multi-headed values

        score = self.mh_q@self.mh_k.transpose(0,1,-1,-2) # (batch, head, n_query, n_key)
        score = score/np.sqrt(self.d_head) # scale by sqrt(d_k)
        if valid_lens is not None:
            mask = MultiHeadAttention.get_mask(valid_lens,score.shape)
            # Replace (not multiply) masked scores: multiplying by -inf flips sign / gives NaN.
            # A large finite value keeps softmax finite even if a row is fully masked.
            score = np.where(mask == 1,score,-1e9)
        self.sm_score = self.sm.forward(score)
        a_o = (self.sm_score@self.mh_v) # (batch, head, n_query, d_head)

        # Concatenate heads along the feature axis -> (batch, 1, n_query, d_model).
        ccat_o = a_o.transpose(0,2,1,3).reshape(-1,1,a_o.shape[2],self.d_model)
        return self.o_w1.forward(ccat_o)

    def backward(self,grads,step_size):
        """Back-propagate through the attention block and update its projections.

        Args:
            grads: gradient w.r.t. the output of ``forward``.
            step_size: learning rate forwarded to every linear layer.

        Returns:
            ``(d_q, d_k, d_v)``: gradients w.r.t. ``query``, ``key`` and
            ``value``, each summed over the head axis (kept as size 1).
        """
        d_ccat_o = self.o_w1.backward(grads,step_size)
        # Undo the head concatenation done in forward.
        d_mh_o = d_ccat_o.reshape(-1,d_ccat_o.shape[2],self.num_head,self.d_head).transpose(0,2,1,3)

        d_sm_score = d_mh_o@self.mh_v.transpose(0,1,3,2)
        d_mh_v = self.sm_score.transpose(0,1,3,2)@d_mh_o
        d_score = self.sm.backward(d_sm_score)
        # Forward divided by sqrt(d_head), so the gradient is divided as well.
        d_score = d_score/np.sqrt(self.d_head)

        d_mh_q = d_score@self.mh_k
        d_mh_k = d_score.transpose(0,1,-1,-2)@self.mh_q

        d_q = self.q_w1.backward(d_mh_q,step_size).sum(axis=1,keepdims=True)
        d_v = self.v_w1.backward(d_mh_v,step_size).sum(axis=1,keepdims=True)

        d_k = self.k_w1.backward(d_mh_k,step_size).sum(axis=1,keepdims=True)

        return d_q,d_k,d_v  # gradients of query, key, value

    @staticmethod
    def get_mask(valid_lens,shape):
        """Build a key-padding mask for attention scores.

        Args:
            valid_lens: one valid-key count per batch element (any shape that
                flattens to length ``batch`` or 1).
            shape: score shape ``(batch, head, n_query, n_key)``.

        Returns:
            Array of shape ``(batch, 1, n_query, n_key)`` holding 1 where the
            key position is valid and ``-inf`` where it is padding.
        """
        batch, n_query, n_key = shape[0], shape[2], shape[3]
        # reshape(-1) rather than squeeze(): squeeze turns a batch of one into a 0-d array.
        valid_lens = np.asarray(valid_lens).reshape(-1)
        key_pos = np.arange(1,1+n_key)
        padded = key_pos[np.newaxis,:] > valid_lens[:,np.newaxis]  # (batch, n_key)
        return np.where(padded[:,np.newaxis,np.newaxis,:],-np.inf,np.ones((batch,1,n_query,n_key)))
        


In [7]:
class EncoderBlock():
    """Transformer encoder block: self-attention and a feed-forward net, each
    followed by dropout, a residual connection and layer normalisation."""

    def __init__(self,num_heads, d_model, drop_prob):
        """Create the sub-layers.

        Args:
            num_heads: number of attention heads.
            d_model: model feature size; the feed-forward hidden size is ``4 * d_model``.
            drop_prob: drop probability used by both dropout layers.
        """
        self.Ln1 = LayerNorm()
        self.Ln2 = LayerNorm()
        self.ff1 = LinearLayer3d(1,d_model,d_model*4,biases=True)
        self.ff2 = LinearLayer3d(1,d_model*4,d_model,biases=True)
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dp1 = Dropout(drop_prob)
        self.dp2 = Dropout(drop_prob)

    def forward(self,inp):
        """Run the block on ``inp`` and return the output of the second layer norm."""
        # Self-attention sub-layer with residual connection to the block input.
        attn_o = self.mha.forward(inp,inp,inp)
        dp1_o = self.dp1.forward(attn_o)
        norm1 = self.Ln1.forward(dp1_o+inp)

        # Position-wise feed-forward: linear -> ReLU -> linear.
        ff1_out = self.ff1.forward(norm1)
        self.reLu = ff1_out<0  # remembered so backward can zero the same entries
        ff1_out[self.reLu] = 0
        ff2_out = self.ff2.forward(ff1_out)

        # Second residual connection is taken from the feed-forward input (norm1).
        dp2_o = self.dp2.forward(ff2_out)
        norm2 = self.Ln2.forward(dp2_o+norm1)

        return norm2

    def backward(self,grads,step_size):
        """Back-propagate ``grads`` through the block, updating all sub-layers.

        Args:
            grads: gradient w.r.t. the output of ``forward``.
            step_size: learning rate forwarded to the trainable sub-layers.

        Returns:
            Gradient w.r.t. ``inp``.
        """
        d_add2 = self.Ln2.backward(grads)
        d_ff2_out = self.dp2.backward(d_add2)
        d_ff1_out = self.ff2.backward(d_ff2_out,step_size)
        d_ff1_out[self.reLu] = 0
        # norm1 feeds both the feed-forward path and the second residual.
        d_norm1 = self.ff1.backward(d_ff1_out,step_size) + d_add2
        d_add1 = self.Ln1.backward(d_norm1)

        d_attn_o = self.dp1.backward(d_add1)
        d_q, d_k, d_v = self.mha.backward(d_attn_o, step_size)

        # inp is used as query, key, value and as the first residual.
        return d_add1 + d_q + d_k + d_v

In [8]:
class DecoderBlock():
    """Transformer decoder block: self-attention, attention over the encoder
    output, and a feed-forward net, each followed by dropout, a residual
    connection and layer normalisation."""

    def __init__(self,num_heads,d_model,drop_prob):
        """Create the sub-layers; the feed-forward hidden size is ``4 * d_model``."""
        self.Ln1, self.Ln2, self.Ln3 = LayerNorm(), LayerNorm(), LayerNorm()

        self.dp1 = Dropout(drop_prob)
        self.dp2 = Dropout(drop_prob)
        self.dp3 = Dropout(drop_prob)

        hidden = d_model*4
        self.ff1 = LinearLayer3d(1,d_model,hidden)
        self.ff2 = LinearLayer3d(1,hidden,d_model)

        self.masked_mh_attn = MultiHeadAttention(num_heads,d_model)
        self.mh_attn = MultiHeadAttention(num_heads,d_model)

    def forward(self,inp,enc_o):
        """Run the block on ``inp`` with encoder output ``enc_o``; return the third layer-norm output."""
        # Stage 1: self-attention over the decoder input.
        # NOTE(review): no mask argument is passed to masked_mh_attn here — confirm whether
        # causal masking is expected at this point.
        self_attn = self.masked_mh_attn.forward(inp,inp,inp)
        norm1_out = self.Ln1.forward(self.dp1.forward(self_attn)+inp)

        # Stage 2: queries from the decoder, keys/values from the encoder output.
        cross_attn = self.mh_attn.forward(norm1_out,enc_o,enc_o)
        norm2_out = self.Ln2.forward(self.dp2.forward(cross_attn)+norm1_out)

        # Stage 3: linear -> ReLU -> linear.
        hidden = self.ff1.forward(norm2_out)
        self.reLu = hidden < 0  # kept for the backward pass
        hidden[self.reLu] = 0
        ffn_out = self.ff2.forward(hidden)

        return self.Ln3.forward(self.dp3.forward(ffn_out)+norm2_out)

    def backward(self,grads,step_size):
        """Back-propagate ``grads`` and update every trainable sub-layer.

        Returns:
            ``(d_inp, d_enc_o)``: gradients w.r.t. ``inp`` and ``enc_o``.
        """
        # Stage 3 (feed-forward) in reverse.
        g_sum3 = self.Ln3.backward(grads)
        g_ffn_out = self.dp3.backward(g_sum3)
        g_hidden = self.ff2.backward(g_ffn_out,step_size)
        g_hidden[self.reLu] = 0
        g_norm2 = self.ff1.backward(g_hidden,step_size) + g_sum3

        # Stage 2 (encoder-decoder attention) in reverse.
        g_sum2 = self.Ln2.backward(g_norm2)
        g_cross = self.dp2.backward(g_sum2)
        g_norm1, g_enc_key, g_enc_val = self.mh_attn.backward(g_cross,step_size)
        g_norm1 += g_sum2  # residual path into norm1_out

        # Stage 1 (self-attention) in reverse.
        g_sum1 = self.Ln1.backward(g_norm1)
        g_self = self.dp1.backward(g_sum1)
        g_query, g_key, g_val = self.masked_mh_attn.backward(g_self,step_size)

        # inp served as query, key, value and residual; enc_o as key and value.
        d_inp = g_query + g_key + g_val + g_sum1
        d_enc_o = g_enc_key + g_enc_val
        return d_inp, d_enc_o
        

In [73]:
def get_pos_embedding(n_pos,d_model):
    """Return sinusoidal positional encodings of shape ``(n_pos, d_model)``.

    Even columns hold ``sin(pos / 10000**(2*j/d_model))`` and odd columns the
    matching ``cos``, where ``j`` is the index of the sin/cos pair. An odd
    ``d_model`` is supported: the last column is a sine with no cosine partner.
    """
    n_sin = (d_model + 1)//2  # number of even-indexed columns
    n_cos = d_model//2        # number of odd-indexed columns
    positions = np.arange(n_pos,dtype=float)[:,np.newaxis]
    pair_idx = np.arange(n_sin,dtype=float)[np.newaxis,:]
    angles = positions/10000**(2*pair_idx/d_model)
    pos_enc = np.ones((n_pos,d_model))
    pos_enc[:,::2] = np.sin(angles)
    pos_enc[:,1::2] = np.cos(angles[:,:n_cos])
    return pos_enc

