In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Embedding,Dense
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.sparse import SparseTensor
from tensorflow.keras.activations import softmax

In [None]:
# Load the parallel corpus. Later cells read its 'fr_clean' column (encoder
# side) and its 'eng_clean' column (decoder side).
# NOTE(review): absolute Kaggle input path -- presumably only resolves inside
# that environment; confirm before running elsewhere.
data = pd.read_csv('/kaggle/input/fren-to-eng/data.csv')

In [None]:
# --- Model size ---
n_embd = 384                       # C: embedding dimension
num_heads = 6                      # N: attention heads per multi-head layer
head_size = n_embd // num_heads    # K: width of a single head
n_layer = 6                        # number of encoder blocks and of decoder blocks
dropout = 0.2                      # rate used by every Dropout layer
block_size = 202                   # NOTE(review): not referenced by the visible cells

# --- Batching ---
batch_size = 2                     # B: sentence pairs drawn per batch

# --- Token ids ---
# encode() emits character code points, and the three special ids sit
# directly above the 0..255 range.
sos = 256                          # start-of-sequence marker
eos = 257                          # end-of-sequence marker
pad = 258                          # padding filler
vocab_size = 259                   # 256 code points + sos + eos + pad

In [None]:
# Concatenate every French sentence, collect the distinct characters in
# sorted order, then append the characters whose code points equal the
# sos and eos ids.
text = ''.join(data['fr_clean'])
chars = sorted(set(text))
chars = chars + [chr(sos), chr(eos)]

In [None]:
# Lookup tables between a character and its position in `chars`.
int_ch = dict(enumerate(chars))              # position -> character
ch_int = dict(zip(chars, range(len(chars)))) # character -> position
int_ch[pad] = ''                             # padding renders as nothing

In [None]:
def encode(text:str)->list:
    """Turn a string into a list of token ids wrapped in sos ... eos.

    Each character is mapped to its Unicode code point via ``ord``.
    NOTE(review): a character whose code point is >= 256 would collide with
    the special ids or fall outside ``vocab_size`` -- confirm the cleaned
    columns cannot contain such characters.
    """
    return [sos, *map(ord, text), eos]
def decode(enc:list)->str:
    """Turn a sequence of token ids back into text (inverse of ``encode``).

    ``encode`` emits Unicode code points plus the sos/eos markers, so every
    ordinary id is mapped back with ``chr``. The special ids (sos, eos, pad)
    carry no text and are dropped.

    enc: iterable of integer token ids (Python or numpy integers).
    Returns the decoded string.
    """
    special = (sos, eos, pad)
    return ''.join(chr(c) for c in enc if c not in special)

In [None]:
def getBatch():
    """Draw ``batch_size`` random sentence pairs and return padded id arrays.

    Returns a tuple ``(X, Y, target)`` of numpy arrays:
      X      -- encoded 'fr_clean' sentences, right-padded with ``pad`` to the
                longest one in the batch.
      Y      -- encoded 'eng_clean' sentences, padded the same way.
      target -- each row of Y without its first id (the sos marker), padded
                to the same width as Y.
    """
    rows = data.sample(n=batch_size)
    X = [encode(sentence) for sentence in rows['fr_clean']]
    Y = [encode(sentence) for sentence in rows['eng_clean']]
    target = [y[1:] for y in Y]

    max_x = max(map(len, X), default=0)
    max_y = max(map(len, Y), default=0)

    def right_pad(seq, width):
        # Append pad ids until the sequence reaches `width`.
        return seq + [pad] * (width - len(seq))

    X = np.array([right_pad(x, max_x) for x in X]) #B*T
    Y = np.array([right_pad(y, max_y) for y in Y]) #B*T
    target = np.array([right_pad(t, max_y) for t in target]) #B*T
    return X, Y, target

def positional_encoding(T,n_embd):
    """Build the sinusoidal position table for ``T`` positions.

    Even columns hold the sine and odd columns the cosine of
    ``pos / 10000**(2*(col//2)/n_embd)``.

    Returns a float32 tensor of shape 1*T*n_embd.
    """
    positions = np.arange(T)[:, np.newaxis]                  # T*1
    pair_index = np.arange(n_embd)[np.newaxis, :] // 2       # 1*C
    table = positions / np.power(10000, 2 * pair_index / n_embd)  # T*C
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [None]:
# B = Batchsize
# T = Number of tokens
# C = Embedding dimension
# N = Number of Heads
# K = Head Size

    

class EncoderHead():
    """A single head of encoder self-attention with key-padding masking."""
    def __init__(self):
        # Bias-free linear projections of the input to head_size features.
        self.key = Dense(head_size,use_bias=False)
        self.query = Dense(head_size,use_bias=False)
        self.value = Dense(head_size,use_bias=False)
        self.drop =  tf.keras.layers.Dropout(dropout)

    def __call__(self,x,pad_mask,training):
        """Attend over ``x`` (B*T*C), ignoring keys flagged in ``pad_mask``.

        pad_mask: B*T array holding 1 where the token is padding.
        training: forwarded to the dropout layer.
        Returns a B*T*K tensor.
        """
        _batch, n_tokens, _channels = x.shape
        keys = self.key(x) #B*T*K
        queries = self.query(x) #B*T*K
        values = self.value(x) #B*T*K
        scale = keys.shape[-1]**0.5
        scores = tf.matmul(queries, tf.transpose(keys, perm=[0,2,1]))/scale #B*T*T
        # Repeat the per-key padding flags for every query row.
        key_is_pad = tf.tile(pad_mask[:,tf.newaxis,:],[1,n_tokens,1]) #B*T*T
        # -inf before softmax gives padded keys zero attention weight.
        scores = tf.where(key_is_pad==1,float('-inf'),scores) #B*T*T
        attention = self.drop(softmax(scores),training=training) #B*T*T
        return tf.matmul(attention,values) #B*T*K
    
class EncoderMultiHead():
    """Runs ``num_heads`` EncoderHead instances and projects their concatenation."""
    def __init__(self):
        self.heads = [EncoderHead() for _ in range(num_heads)]
        self.project = Dense(n_embd)
        self.drop =  tf.keras.layers.Dropout(dropout)

    def __call__(self,x,pad_mask,training):
        """Return the B*T*C multi-head self-attention output for ``x``."""
        per_head = []
        for head in self.heads:
            per_head.append(head(x,pad_mask,training)) #B*T*K each
        merged = tf.concat(per_head,axis=-1) #B*T*(K*N)
        projected = self.project(merged) #B*T*C
        return self.drop(projected,training=training)

class FeedForward():
    """Two-layer position-wise network: expand to 4*n_embd with ReLU, project back."""
    def __init__(self):
        expand = Dense(4*n_embd, activation='relu')
        contract = Dense(n_embd)
        self.net =  tf.keras.Sequential([expand, contract])
        self.drop = tf.keras.layers.Dropout(dropout)

    def __call__(self,x,training):
        """Apply the network and dropout to ``x``; returns B*T*C."""
        hidden = self.net(x) #B*T*C
        return self.drop(hidden,training=training)

class EncodeBlock():
    """One encoder layer: self-attention then feed-forward, each followed by
    a residual connection and layer normalisation."""
    def __init__(self):
        self.sa = EncoderMultiHead()
        self.frwd = FeedForward()
        self.lln1 = tf.keras.layers.LayerNormalization(axis=[-1])
        self.lln2 = tf.keras.layers.LayerNormalization(axis=[-1])

    def __call__(self,x,pad_mask,training):
        """Transform ``x`` (B*T*C); ``pad_mask`` is passed to the attention layer."""
        attended = self.sa(x,pad_mask,training) #B*T*C
        x = self.lln1(x+attended) #B*T*C
        transformed = self.frwd(x,training) #B*T*C
        return self.lln2(x+transformed) #B*T*C
        
class Encoder():
    """Token embedding plus positional encoding followed by ``n_layer`` EncodeBlocks."""
    def __init__(self):
        self.tokenEmbedding = Embedding(vocab_size,n_embd)
        self.blocks = [EncodeBlock() for _ in range(n_layer)]

    def __call__(self,idx,pad_mask,training):
        """Encode a B*T array of token ids into a B*T*C tensor.

        pad_mask: B*T array holding 1 at padding positions.
        training: forwarded to every block (controls dropout).
        """
        _batch, n_tokens = idx.shape
        hidden = self.tokenEmbedding(idx) + positional_encoding(n_tokens,n_embd) #B*T*C (position table broadcasts over B)
        for layer in self.blocks:
            hidden = layer(hidden,pad_mask,training) #B*T*C
        return hidden

In [None]:

# t = Number of tokens in encoder input
class DecoderHead():
    """A single cross-attention head: queries come from the decoder stream,
    keys and values from the encoder output."""
    def __init__(self):
        # Bias-free linear projections to head_size features.
        self.key = Dense(head_size,use_bias=False)
        self.query = Dense(head_size,use_bias=False)
        self.value = Dense(head_size,use_bias=False)
        self.drop =  tf.keras.layers.Dropout(dropout)

    def __call__(self,y,encoder_output,encoder_pad_mask,training):
        """Attend from ``y`` (B*T*C) over ``encoder_output`` (B*t*C).

        encoder_pad_mask: B*t array holding 1 where the encoder token is padding.
        training: forwarded to the dropout layer.
        Returns a B*T*K tensor.
        """
        _batch, n_queries, _channels = y.shape
        _n_source = encoder_pad_mask.shape[1]  # t; read but not otherwise used
        keys = self.key(encoder_output) #B*t*K
        queries = self.query(y) #B*T*K
        values = self.value(encoder_output) #B*t*K
        scale = keys.shape[-1]**0.5
        scores = tf.matmul(queries, tf.transpose(keys, perm=[0,2,1]))/scale #B*T*t
        # Repeat the encoder padding flags for every decoder query row.
        source_is_pad = tf.tile(encoder_pad_mask[:,tf.newaxis,:],[1,n_queries,1]) #B*T*t
        # -inf before softmax gives padded encoder positions zero weight.
        scores = tf.where(source_is_pad==1,float('-inf'),scores) #B*T*t
        attention = self.drop(softmax(scores),training=training) #B*T*t
        return tf.matmul(attention,values) #B*T*K
    
class DecoderMultiHead():
    """Runs ``num_heads`` cross-attention heads and projects their concatenation."""
    def __init__(self):
        self.heads = [DecoderHead() for _ in range(num_heads)]
        self.project = Dense(n_embd)
        self.drop =  tf.keras.layers.Dropout(dropout)

    def __call__(self,y,encoder_output, encoder_pad_mask,training):
        """Return the B*T*C cross-attention output for decoder stream ``y``."""
        per_head = []
        for head in self.heads:
            per_head.append(head(y,encoder_output, encoder_pad_mask,training)) #B*T*K each
        merged = tf.concat(per_head,axis=-1) #B*T*(K*N)
        projected = self.project(merged) #B*T*C
        return self.drop(projected,training=training)
class DecoderMaskHead():
    """A single head of masked (causal) decoder self-attention."""
    def __init__(self):
        # Bias-free linear projections to head_size features.
        self.key = Dense(head_size,use_bias=False)
        self.query = Dense(head_size,use_bias=False)
        self.value = Dense(head_size,use_bias=False)
        self.drop =  tf.keras.layers.Dropout(dropout)
    def createDecoderMask(self,relevantToken:int,totalToken:int):
        """Build a T*T int32 mask with 1 where attention is allowed.

        A query may look at keys at or before its own position (lower
        triangle), and never at a key whose index is >= ``relevantToken``.
        """
        causal = tf.linalg.band_part(tf.ones((totalToken,totalToken),dtype=tf.int32), -totalToken, 0) #T*T
        # One flag per key column; it broadcasts across the query rows below.
        column_blocked = tf.range(totalToken) >= relevantToken #T
        return tf.where(column_blocked, 0, causal) #T*T
    def __call__(self,y,decoder_pad_mask,training):
        """Self-attend over ``y`` (B*T*C) under the causal + padding mask.

        decoder_pad_mask: B*T array holding 1 at padding positions. The
        number of pads per row is used as the count of blocked trailing
        columns, so padding is presumably expected at the end of each row.
        training: forwarded to the dropout layer.
        Returns a B*T*K tensor.
        """
        _batch, n_tokens, _channels = y.shape
        pads_per_row = np.sum(decoder_pad_mask,axis=1) #B
        row_masks = []
        for n_pad in pads_per_row:
            row_masks.append(self.createDecoderMask(n_tokens-n_pad,n_tokens)) #T*T
        attention_mask = tf.stack(row_masks) #B*T*T
        keys = self.key(y) #B*T*K
        queries = self.query(y) #B*T*K
        values = self.value(y) #B*T*K
        scale = keys.shape[-1]**0.5
        scores = tf.matmul(queries, tf.transpose(keys, perm=[0,2,1]))/scale #B*T*T
        # -inf before softmax gives disallowed positions zero weight.
        scores = tf.where(attention_mask==0,float('-inf'),scores) #B*T*T
        attention = self.drop(softmax(scores),training=training) #B*T*T
        return tf.matmul(attention,values) #B*T*K
    
class DecoderMaskMultiHead():
    """Runs ``num_heads`` masked self-attention heads and projects their concatenation."""
    def __init__(self):
        self.heads = [DecoderMaskHead() for _ in range(num_heads)]
        self.project = Dense(n_embd)
        self.drop =  tf.keras.layers.Dropout(dropout)

    def __call__(self,y,decoder_pad_mask,training):
        """Return the B*T*C masked self-attention output for ``y``."""
        per_head = []
        for head in self.heads:
            per_head.append(head(y,decoder_pad_mask,training)) #B*T*K each
        merged = tf.concat(per_head,axis=-1) #B*T*(K*N)
        projected = self.project(merged) #B*T*C
        return self.drop(projected,training=training)
      
class DecodeBlock():
    """One decoder layer with three sub-layers: masked self-attention,
    cross-attention over the encoder output, and a feed-forward network.

    Each sub-layer is wrapped in a residual connection followed by its own
    layer normalisation (lln1, lln2, lln3 respectively), so the three
    normalisations learn independent scale/offset parameters.
    """
    def __init__(self):
        self.self = DecoderMaskMultiHead()   # masked self-attention
        self.cross = DecoderMultiHead()      # attention over the encoder output
        self.frwd = FeedForward()
        self.lln1 = tf.keras.layers.LayerNormalization(axis=[-1])
        self.lln2 = tf.keras.layers.LayerNormalization(axis=[-1])
        self.lln3 = tf.keras.layers.LayerNormalization(axis=[-1])

    def __call__(self,y,decoder_pad_mask,encoder_output,encoder_pad_mask,training):
        """Transform the decoder stream ``y`` (B*T*C) and return B*T*C.

        decoder_pad_mask: B*T padding flags for the decoder input.
        encoder_output: B*t*C tensor produced by the encoder.
        encoder_pad_mask: B*t padding flags for the encoder input.
        training: forwarded to the sub-layers (controls dropout).
        """
        y = self.lln1(y+self.self(y,decoder_pad_mask,training)) #B*T*C
        y = self.lln2(y+self.cross(y,encoder_output, encoder_pad_mask,training)) #B*T*C
        y = self.lln3(y+self.frwd(y,training)) #B*T*C
        return y
    

    

class Decoder():
    """Token embedding plus positional encoding followed by ``n_layer`` DecodeBlocks."""
    def __init__(self):
        self.tokenEmbedding = Embedding(vocab_size,n_embd)
        self.blocks = [DecodeBlock() for _ in range(n_layer)]

    def __call__(self,decoder_input,decoder_pad_mask,encoder_output,encoder_pad_mask,training):
        """Decode a B*T array of token ids against the encoder output.

        decoder_pad_mask: B*T padding flags for ``decoder_input``.
        encoder_output: B*t*C tensor from the encoder.
        encoder_pad_mask: B*t padding flags for the encoder input.
        training: forwarded to every block (controls dropout).
        Returns a B*T*C tensor.
        """
        _batch, _n_source, _channels = encoder_output.shape  # encoder output must be rank 3
        n_tokens = decoder_input.shape[1]
        hidden = self.tokenEmbedding(decoder_input) + positional_encoding(n_tokens,n_embd) #B*T*C (position table broadcasts over B)
        for layer in self.blocks:
            hidden = layer(hidden,decoder_pad_mask,encoder_output,encoder_pad_mask,training) #B*T*C
        return hidden

In [None]:
#R = relevant tokens for loss
class Transformer():
    """Encoder-decoder model with a linear head that produces vocabulary logits.

    ``loss`` computes the teacher-forced training loss for one batch;
    ``translate`` samples an output sequence token by token for one input.
    """
    def __init__(self):
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.lm_head = Dense(vocab_size)

    def loss(self,encoder_input,decoder_input,targets,training):
        """Return the mean cross-entropy over the non-padding target tokens.

        encoder_input: B*t numpy array of source token ids, padded with ``pad``.
        decoder_input: B*T numpy array of decoder input ids, padded with ``pad``.
        targets: B*T array of the ids to predict at each decoder position,
            padded with ``pad``.
        training: forwarded to the encoder/decoder (controls dropout).
        """
        encoder_pad_mask  = (encoder_input == pad).astype(int) # B*t
        decoder_pad_mask  = (decoder_input == pad).astype(int) #B*T
        encoder_output = self.encoder(encoder_input,encoder_pad_mask,training) #B*t*C
        decoder_output = self.decoder(decoder_input,decoder_pad_mask,encoder_output,encoder_pad_mask,training) #B*T*C
        logits = self.lm_head(decoder_output) #B*T*V
        # Score every position whose target is a real token. This keeps the
        # sos position (whose target is the first output token, which
        # translate() has to predict) and drops all positions whose target
        # is padding.
        relevant = tf.not_equal(targets, pad) #B*T
        filtered_logits = tf.boolean_mask(logits, relevant) #R*V
        filtered_targets = tf.boolean_mask(targets, relevant) #R
        # The sparse loss takes integer class ids directly, so no one-hot
        # matrix has to be materialised.
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        return loss_fn(filtered_targets, filtered_logits)

    def translate(self,encoder_input):
        """Sample an output sequence for a single encoded sentence.

        encoder_input: 1*T numpy array of source token ids.
        Returns a list of token ids that starts with ``sos``. Generation
        stops once ``eos`` is sampled or after 2*T sampled tokens.
        """
        T = encoder_input.shape[1]
        encoder_pad_mask  = (encoder_input == pad).astype(int) # 1*T
        encoder_output = self.encoder(encoder_input,encoder_pad_mask,training=False) #1*T*C
        output = [sos]
        for _ in range(2*T):
            if output[-1] == eos:
                break
            decoder_input = np.array([output]) #1*(current output length)
            decoder_pad_mask  = (decoder_input == pad).astype(int) #1*(current output length)
            decoder_output = self.decoder(decoder_input,decoder_pad_mask,encoder_output,encoder_pad_mask,training=False) #1*(current output length)*C
            logits = self.lm_head(decoder_output) #1*(current output length)*V
            last_logits = logits[:,-1,:] #1*V
            # Sample the next id from the distribution at the last position.
            idx_next = int(tf.random.categorical(last_logits,1)[0][0].numpy())
            output.append(idx_next)

        return output
            


In [None]:
# Training settings for this cell.
max_iters = 1000       # optimisation steps
log_interval = 100     # print the loss every this many steps
learning_rate = 3e-4

model = Transformer()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

for step in range(max_iters):
    xb, yb, target = getBatch()
    with tf.GradientTape() as tape:
        loss = model.loss(xb, yb, target, training=True)
    # The model classes are plain Python objects without a
    # trainable_variables attribute, so use the variables the tape recorded
    # during the forward pass.
    variables = tape.watched_variables()
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    if step % log_interval == 0:
        print(step, float(loss))

# Translate one sentence from a freshly drawn batch.
sample_x, sample_y, sample_target = getBatch()
generated = model.translate(sample_x[:1])  # keep the leading batch axis: 1*T
# Ids below `sos` are character code points (see encode); sos/eos/pad carry no text.
print(''.join(chr(token) for token in generated if token < sos))