In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd

In [None]:
import tensorflow_datasets as tfds

In [None]:
from transformers import AutoTokenizer

# Load the pretrained tokenizer published under "ai4bharat/indic-bert".
# The same tokenizer instance is used further down for both the Hindi and
# the English sentences.
tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert")

# Creating embeddings and positional embeddings

In [None]:
# Parquet shards of the cfilt/iitb-english-hindi dataset, keyed by split name.
splits = {
    'train': 'data/train-00000-of-00001.parquet',
    'validation': 'data/validation-00000-of-00001.parquet',
    'test': 'data/test-00000-of-00001.parquet',
}

# Read the training shard, keep the first 100_000 rows, and convert the
# resulting frame to a NumPy array for positional indexing in later cells.
df = np.array(
    pd.read_parquet("hf://datasets/cfilt/iitb-english-hindi/" + splits["train"])[:100_000]
)

## Tokenizing inputs

In [None]:
# Parallel lists: hi_text[n] and en_text[n] come from the same dataset row.
hi_text, en_text = [], []

max_len = 0
for i in range(10000):
  # Column 0 of each row holds a mapping with "hi" and "en" entries.
  pair = df[i][0]
  hi_text.append(pair["hi"])
  en_text.append(pair["en"])

In [None]:
# Both languages must contribute the same number of sentences.
assert len(hi_text) == len(en_text), "length words in input language does not match length of words in output language"

# Tokenize each language separately; padding=True pads every sentence in a
# call to the longest one in that call, and only the token ids are kept.
hi_encoded = tokenizer(hi_text, padding=True, return_tensors='tf')
en_encoded = tokenizer(en_text, padding=True, return_tensors='tf')
hi_token = hi_encoded["input_ids"]
en_token = en_encoded["input_ids"]

## Creating embeddings of tokens

In [None]:
vocab_size = tokenizer.vocab_size

# One embedding table (128-dimensional vectors) shared by both languages;
# the table has vocab_size + 1 rows.
embedding_layer = tf.keras.layers.Embedding(
    input_dim=vocab_size + 1,
    output_dim=128,
)

# Embed the Hindi ids first, then the English ids.
hi_embedding, en_embedding = map(embedding_layer, (hi_token, en_token))

In [None]:
# Scratch check of extended slicing: a step of 3 keeps indices 0 and 3.
arr = list(range(4, 8))
arr[::3]

## Positional encoding

In [None]:
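# Earlier prototype of the positional-encoding table with hard-coded sizes
# (58 positions x 128 dimensions); kept commented out for reference.
# pos_embed_arr = []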
# for pos in range(58):
#       for i in range(128):
#         if i % 2 == 0:
#           pe = np.sin(pos/10000**(2*i/128))
#           pos_embed_arr.append(pe)
#         elif i % 2 != 0:
#           pe = np.cos(pos/10000**(2*i/128))
#           pos_embed_arr.append(pe)

# pos_embed_arr = np.array(pos_embed_arr)
# pos_embed_arr = pos_embed_arr.reshape(58,128)
# pos_embed_arr

In [None]:
# Axis 1 of each embedding batch is the (padded) token count and axis 2 is
# the embedding width; English feeds the encoder, Hindi the decoder.
max_token_encoder, d_model_encoder = en_embedding.shape[1], en_embedding.shape[2]
max_token_decoder, d_model_decoder = hi_embedding.shape[1], hi_embedding.shape[2]

In [None]:
class PositionalEncoder(tf.keras.layers.Layer):
  """Adds fixed sinusoidal positional encodings to token embeddings.

  The table follows the formulation in "Attention Is All You Need" (sec. 3.5):
  dimensions ``2k`` and ``2k + 1`` share the rate ``1 / 10000**(2k / d_model)``;
  the even dimension stores the sine and the odd dimension the cosine.

  Args:
    max_token: number of positions (rows) to precompute.
    d_model: embedding width (columns); must be even.
    dtype: dtype forwarded to the base Keras layer.
    **kwargs: forwarded to ``tf.keras.layers.Layer``.

  Attributes:
    pos_embed_arr: NumPy array of shape ``(max_token, d_model)`` holding the
      precomputed encodings.

  Raises:
    AssertionError: if ``d_model`` is odd.
  """
  def __init__(self,max_token,d_model,dtype = np.float32,**kwargs):
    super().__init__(dtype = dtype,**kwargs)
    assert d_model % 2 == 0, "d_model should be even"
    self.d_model = d_model
    self.max_token = max_token

    # Column vector of positions: shape (max_token, 1).
    positions = np.arange(self.max_token, dtype=np.float64)[:, np.newaxis]
    # Both members of a (sin, cos) pair must use the same frequency, so the
    # exponent is driven by the pair index (column // 2), not the raw column.
    pair_index = np.arange(self.d_model) // 2
    angle_rates = 1.0 / np.power(10000.0, 2 * pair_index / self.d_model)
    angles = positions * angle_rates  # shape (max_token, d_model)

    table = np.empty((self.max_token, self.d_model), dtype=np.float64)
    table[:, 0::2] = np.sin(angles[:, 0::2])  # even columns -> sine
    table[:, 1::2] = np.cos(angles[:, 1::2])  # odd columns  -> cosine
    self.pos_embed_arr = table

  def call(self,embedding_vector):
      """Return ``embedding_vector`` plus the positional table.

      The table is cast to the input's dtype and trimmed to the input's
      second-to-last axis length, so inputs shorter than ``max_token``
      positions are accepted as well.
      """
      seq_len = tf.shape(embedding_vector)[-2]
      pos_table = tf.cast(self.pos_embed_arr, embedding_vector.dtype)
      return embedding_vector + pos_table[:seq_len]

In [None]:
# Build the encoder-side positional encoder and apply it by calling the layer
# itself (not its .call method) so Keras runs its regular layer-call logic.
pos_embedding_encoder = PositionalEncoder(max_token_encoder,d_model_encoder)
final_en_embedding = pos_embedding_encoder(en_embedding)
# Show only the shape; echoing the whole tensor floods the cell output.
final_en_embedding.shape

# Add & Norm Layer

In [None]:
class Add_Norm(tf.keras.layers.Layer):
  """Residual addition followed by layer normalization ("Add & Norm").

  Args:
    **kwargs: forwarded to ``tf.keras.layers.Layer``.
  """
  def __init__(self,**kwargs):
    super().__init__(dtype=np.float32,**kwargs)
    # Normalization runs over the last axis of its input.
    self.norm_layer = tf.keras.layers.LayerNormalization(epsilon=0.001,axis=-1)

  def call(self,sublayer_output,residual_input):
    """Return the layer-normalized sum of the two inputs."""
    summed = sublayer_output + residual_input
    return self.norm_layer(summed)

# Multi-Head Attention / Scaled Dot Product Attention

In [None]:
class ScaledDotProduct_Attention(tf.keras.layers.Layer):
  """Computes ``softmax(q @ k^T / sqrt(d_k)) @ v``.

  Args:
    d_k: value whose square root divides the raw attention scores.
    **kwargs: forwarded to ``tf.keras.layers.Layer``.
  """
  def __init__(self,d_k:int,**kwargs):
    super().__init__(dtype=np.float32,**kwargs)
    self.d_k = d_k

  def call(self,q,k,v):
    """Return the attention-weighted combination of ``v``.

    ``k`` is transposed on its last two axes before being multiplied with
    ``q``; the softmax is taken over the last axis of the scaled scores.
    """
    scores = tf.matmul(q,k,transpose_b=True)
    scale = tf.sqrt(tf.cast(self.d_k,dtype=tf.float32))
    weights = tf.nn.softmax(scores / scale,axis = -1)
    return tf.matmul(weights,v)

In [None]:
class MultiHead_Attention(tf.keras.layers.Layer):
  """Multi-head self-attention followed by a residual Add & Norm step.

  Each head owns three bias-free Dense projections (query, key, value) of
  width ``d_model // heads``. The head outputs are concatenated on the last
  axis, passed through a bias-free Dense layer of width ``d_model``, added
  to the layer input and layer-normalized.

  Args:
    d_model: output width of the final projection; must be divisible by
      ``heads``.
    heads: number of attention heads.
    **kwargs: forwarded to ``tf.keras.layers.Layer``.

  Raises:
    AssertionError: if ``d_model`` is not divisible by ``heads``.
  """
  def __init__(self,d_model:int,heads:int,**kwargs):
    super().__init__(dtype=np.float32,**kwargs)
    self.d_model = d_model
    self.heads = heads
    assert d_model % heads == 0, "d_model must be perfectly divisible by heads"
    self.d_k = d_model // heads
    self.attention_fuction = ScaledDotProduct_Attention(self.d_k)
    # One projection per head for each of query, key and value.
    self.wq = [tf.keras.layers.Dense(self.d_k,use_bias=False) for _ in range(self.heads)]
    self.wk = [tf.keras.layers.Dense(self.d_k,use_bias=False) for _ in range(self.heads)]
    self.wv = [tf.keras.layers.Dense(self.d_k,use_bias=False) for _ in range(self.heads)]
    self.final_linear_transform = tf.keras.layers.Dense(self.d_model,use_bias=False)
    self.add_norm_layer = Add_Norm()

  def call(self,initial_embedding:tf.Tensor):
    """Apply self-attention to ``initial_embedding`` and return the result.

    The input is used as query, key and value source for every head and is
    also the residual branch, so its last axis is expected to match
    ``d_model``.
    """
    head_outputs = []
    for wq, wk, wv in zip(self.wq, self.wk, self.wv):
      q = wq(initial_embedding)
      k = wk(initial_embedding)
      v = wv(initial_embedding)
      # Invoke sub-layers by calling them (not via .call) so Keras applies
      # its regular layer-call handling.
      head_outputs.append(self.attention_fuction(q,k,v))
    concat_output = tf.concat(head_outputs,axis=-1)
    mha_output = self.final_linear_transform(concat_output)
    return self.add_norm_layer(mha_output,initial_embedding)

# Feed Forward Network

In [None]:
class FFN(tf.keras.layers.Layer):
  """Two-layer feed-forward block followed by a residual Add & Norm step.

  The network is ``Dense(4 * d_model, relu)`` followed by ``Dense(d_model)``;
  its output is added to the block input and layer-normalized.

  Args:
    d_model: width of the block output.
    **kwargs: forwarded to ``tf.keras.layers.Layer``.
  """
  def __init__(self,d_model:int,**kwargs):
    super().__init__(dtype=np.float32,**kwargs)
    self.d_model = d_model
    self.add_norm_layer = Add_Norm()
    self.network = tf.keras.Sequential([
        tf.keras.layers.Dense(self.d_model * 4,activation='relu'),
        tf.keras.layers.Dense(self.d_model)
    ])

  def call(self,mha_output):
    """Return ``Add_Norm(network(mha_output), mha_output)``."""
    nn_output = self.network(mha_output)
    # Call the sub-layer itself (not .call) so Keras applies its regular
    # layer-call handling.
    return self.add_norm_layer(nn_output,mha_output)

# **Encoder**

In [None]:
class Encoder(tf.keras.layers.Layer):
  """Single encoder block: multi-head self-attention, then feed-forward.

  Both sub-layers apply their own residual Add & Norm step internally.

  Args:
    d_model: model width passed to both sub-layers.
    heads: number of attention heads.
    **kwargs: forwarded to ``tf.keras.layers.Layer``.
  """
  def __init__(self, d_model:int, heads:int,**kwargs):
    super().__init__(dtype=np.float32,**kwargs)
    self.d_model = d_model
    self.heads = heads
    self.mha = MultiHead_Attention(d_model=self.d_model,heads = self.heads)
    self.ffn = FFN(d_model=self.d_model)

  def call(self,input_embedding):
    """Run ``input_embedding`` through attention and the feed-forward block."""
    # Call the sub-layers themselves (not .call) so Keras applies its regular
    # layer-call handling to each of them.
    first_sublayer_out = self.mha(input_embedding)
    encoder_output = self.ffn(first_sublayer_out)
    return encoder_output

In [None]:
# Instantiate a 4-head encoder block and apply it by calling the layer itself
# (not its .call method) so Keras runs its regular layer-call logic.
encoder = Encoder(d_model=d_model_encoder,heads=4)
encoder_output = encoder(final_en_embedding)
# Show only the shape; echoing the whole tensor floods the cell output.
encoder_output.shape