<a href="https://colab.research.google.com/github/AdnanSakal/transformer/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
import os
import zipfile

In [None]:
!wget 'http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip'

In [None]:
# Unpack the downloaded archive into the current working directory.
# The context manager closes the archive even if extraction raises.
with zipfile.ZipFile("/content/spa-eng.zip") as zip_ref:
  zip_ref.extractall()

In [None]:
# Read the whole corpus into memory. The encoding is given explicitly so the
# accented Spanish characters decode the same way regardless of the platform's
# default locale.
with open("/content/spa-eng/spa.txt","r",encoding="utf-8") as f:
  text = f.read()

In [None]:
# Number of lines in the raw corpus text.
len(text.splitlines())

118964

In [None]:
# One entry per corpus line, then each line broken into its tab-separated fields.
all_text = text.splitlines()
all_text_ = [line.split("\t") for line in all_text]

In [None]:
# Every row must hold exactly two fields: the first becomes the context
# sentence, the second the target sentence.
context = np.array([first for first, second in all_text_])
target = np.array([second for first, second in all_text_])

In [None]:
# Peek at the first context/target pair.
context[0],target[0]

('Go.', 'Ve.')

In [None]:
pip install tensorflow_text

In [None]:
import tensorflow_text as tf_text

In [None]:
def split(context,target,train_fraction=0.8,shuffle=False,seed=None):
  """Split two aligned sequences into a leading train part and a trailing test part.

  Args:
    context: source-side sequence (anything sliceable that supports ``len``).
    target: target-side sequence aligned index-by-index with ``context``.
    train_fraction: share of the pairs, in [0, 1], that goes to the train part.
    shuffle: when True, one shared random permutation is applied to both
      sequences before cutting (inputs are converted with ``np.asarray``).
      Useful when the corpus is stored in a meaningful order.
    seed: seed for the permutation; only used when ``shuffle`` is True.

  Returns:
    ``(train_data, test_data, train_label, test_label)``.

  Raises:
    ValueError: if the two sequences differ in length or ``train_fraction``
      lies outside [0, 1].
  """
  if len(context) != len(target):
    raise ValueError(
        f"context and target must have the same length, got {len(context)} and {len(target)}")
  if not 0.0 <= train_fraction <= 1.0:
    raise ValueError(f"train_fraction must be in [0, 1], got {train_fraction}")

  if shuffle:
    # The same permutation is used for both sides so pairs stay aligned.
    order = np.random.default_rng(seed).permutation(len(context))
    context = np.asarray(context)[order]
    target = np.asarray(target)[order]

  train_size = int(len(context)*train_fraction)
  train_data = context[:train_size]
  test_data = context[train_size:]
  train_label = target[:train_size]
  test_label = target[train_size:]
  return train_data,test_data,train_label,test_label

In [None]:
# First 80% of the pairs for training, the remainder for testing.
(train_context, test_context,
 train_label, test_label) = split(context, target)

In [None]:
# Shuffle the training pairs (buffer covers the whole split) so batches are not
# served in corpus order and each epoch sees a different batch composition.
# The test split is left in order: evaluation does not depend on ordering.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_context,train_label))
    .shuffle(max(len(train_context), 1))
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_context,test_label))
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
def preprocess(data):
  """Standardize raw sentences and wrap them in [START] / [END] markers.

  Steps, in order: NFKD-normalize, lowercase, drop every character that is not
  a space, a-z or one of ``. ? ! , ¿``, surround each kept punctuation mark
  with spaces, strip outer whitespace, then add the two marker tokens.

  Args:
    data: string tensor (or value convertible to one) of raw sentences.

  Returns:
    String tensor of standardized sentences.
  """
  cleaned = tf.strings.lower(tf_text.normalize_utf8(data,"NFKD"))
  cleaned = tf.strings.regex_replace(cleaned,"[^ a-z.?!,¿]","")
  # Pad punctuation with spaces so whitespace splitting treats it as a token.
  spaced = tf.strings.regex_replace(cleaned,"[.?!,¿]",r" \0 ")
  trimmed = tf.strings.strip(spaced)
  return tf.strings.join(["[START]",trimmed,"[END]"],separator=" ")

In [None]:
from tensorflow.keras.layers import TextVectorization

In [None]:
# Vectorizer for the context sentences: `preprocess` is the standardizer,
# the vocabulary is capped at 5000 entries and every sentence is padded or
# truncated to 13 token ids.
context_vect = TextVectorization(
    standardize=preprocess,
    max_tokens=5000,
    output_sequence_length=13,
)

In [None]:
# Build the context vocabulary from the context half of the training pairs.
context_vect.adapt(train_dataset.map(lambda source, translation: source))

In [None]:
# Whitespace-token count of every context sentence.
len_ = [len(sentence.split()) for sentence in context]

In [None]:
# Whitespace-token count of every target sentence.
len__ = [len(sentence.split()) for sentence in target]

In [None]:
# 98th percentile of the whitespace-token counts of the context sentences.
np.percentile(len_,98)

13.0

In [None]:
# 98th percentile of the whitespace-token counts of the target sentences.
np.percentile(len__,98)

13.0

In [None]:
# First ten entries of the adapted context vocabulary.
context_vect.get_vocabulary()[:10]

['', '[UNK]', '[START]', '[END]', '.', 'i', 'to', 'you', 'the', 'tom']

In [None]:
# Vectorizer for the target sentences, configured like the context one:
# `preprocess` standardizer, 5000-entry vocabulary cap, 13 ids per sentence.
target_vect = TextVectorization(
    standardize=preprocess,
    max_tokens=5000,
    output_sequence_length=13,
)

In [None]:
# Build the target vocabulary from the target half of the training pairs.
target_vect.adapt(train_dataset.map(lambda source, translation: translation))

In [None]:
def another_perprocess(context,target):
  """Turn a batch of raw sentence pairs into model inputs and labels.

  Both sides are vectorized; the target ids are then split into the decoder
  input (all but the last position) and the label (all but the first
  position), i.e. the label is the input shifted by one step.

  Returns:
    ``((context_ids, target_in), target_out)``.
  """
  context_ids = context_vect(context)
  target_ids = target_vect(target)
  decoder_input, decoder_label = target_ids[:,:-1], target_ids[:,1:]
  return (context_ids,decoder_input),decoder_label

In [None]:
# Vectorize the training batches and derive decoder inputs/labels.
new_train_dataset = train_dataset.map(map_func=another_perprocess)

In [None]:
# Vectorize the test batches and derive decoder inputs/labels.
new_test_dataset = test_dataset.map(map_func=another_perprocess)

In [None]:
# Print the vectorized context ids of the first training batch.
for i,l in new_train_dataset.take(1):
  context_ids = i[0]
  print(context_ids)


tf.Tensor(
[[   2   45    4    3    0    0    0    0    0    0    0    0    0]
 [   2   45    4    3    0    0    0    0    0    0    0    0    0]
 [   2   45    4    3    0    0    0    0    0    0    0    0    0]
 [   2   45    4    3    0    0    0    0    0    0    0    0    0]
 [   2 1840    4    3    0    0    0    0    0    0    0    0    0]
 [   2  415   90    3    0    0    0    0    0    0    0    0    0]
 [   2  415    4    3    0    0    0    0    0    0    0    0    0]
 [   2   89   10    3    0    0    0    0    0    0    0    0    0]
 [   2  399   90    3    0    0    0    0    0    0    0    0    0]
 [   2  399   90    3    0    0    0    0    0    0    0    0    0]
 [   2  399   90    3    0    0    0    0    0    0    0    0    0]
 [   2   88   90    3    0    0    0    0    0    0    0    0    0]
 [   2   88   90    3    0    0    0    0    0    0    0    0    0]
 [   2   88   90    3    0    0    0    0    0    0    0    0    0]
 [   2 1682   90    3    0    0    0 

In [None]:
def positional_encoder(length,depth):
  """Build a fixed sinusoidal position table of shape ``(length, depth)``.

  The first ``depth // 2`` columns hold sines and the remaining columns hold
  cosines of ``position / 10000**(k / (depth // 2))``.

  Args:
    length: number of positions (rows) to generate.
    depth: width of each encoding; must be even so the sine and cosine halves
      add up to exactly ``depth`` columns.

  Returns:
    A ``tf.float32`` tensor of shape ``(length, depth)``.

  Raises:
    ValueError: if ``depth`` is odd (the table would otherwise come out wider
      than requested and fail later when added to the embeddings).
  """
  if depth % 2 != 0:
    raise ValueError(f"depth must be even, got {depth}")
  half_depth = depth // 2

  position = np.arange(length)[:,np.newaxis]      # (length, 1)

  depths = np.arange(half_depth)/half_depth       # (half_depth,)
  angle_rates = 1 / 10000**depths

  angle_rads = position * angle_rates             # (length, half_depth)

  pos_encoding = np.concatenate([np.sin(angle_rads),np.cos(angle_rads)],axis=-1)

  return tf.cast(pos_encoding,dtype= tf.float32)

In [None]:
class positional_encoding(tf.keras.layers.Layer):
  """Token embedding scaled by sqrt(d_model) plus a fixed sinusoidal position table.

  Args:
    vocab_size: number of rows in the embedding table.
    d_model: embedding width (also the width of the position table).
    max_length: number of positions precomputed in the position table;
      inputs longer than this cannot be encoded.
  """
  def __init__(self,vocab_size=5000,d_model=128,max_length=2048):
    super().__init__()
    self.d_model = d_model
    self.embedding = tf.keras.layers.Embedding(input_dim=vocab_size,
                                               output_dim = d_model
                                               )
    self.position_encoder = positional_encoder(length = max_length,depth = d_model)
  def call(self,input):
    """Embed token ids and add the position signal for the first ``seq_len`` positions."""
    length = tf.shape(input)[1]
    x = self.embedding(input)
    # Scale the embeddings before adding the position table.
    x = x * tf.sqrt(tf.cast(self.d_model,dtype =tf.float32))
    x = x + self.position_encoder[tf.newaxis,:length,:]
    return x

In [None]:
class BaseAttention(tf.keras.layers.Layer):
  """Shared building blocks for the attention sub-layers.

  Holds a multi-head attention layer plus the Add and LayerNormalization
  layers that subclasses combine in their ``call``. All keyword arguments are
  forwarded unchanged to ``tf.keras.layers.MultiHeadAttention``.
  """
  def __init__(self,**mha_kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**mha_kwargs)
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

In [None]:
class CrossAttention(BaseAttention):
  """Attention where ``input`` supplies the queries and ``context`` the keys/values."""
  def call(self,input,context):
    attended = self.mha(query =input,key = context,value = context)
    # Residual connection followed by layer normalization.
    return self.layer_norm(self.add([input,attended]))

In [None]:
class GlobalAttention(BaseAttention):
  """Self-attention: ``input`` is used as query, key and value, with no causal mask."""
  def call(self,input):
    attended = self.mha(query =input,key = input,value = input)
    # Residual connection followed by layer normalization.
    return self.layer_norm(self.add([input,attended]))

In [None]:
class MaskAttention(BaseAttention):
  """Self-attention over ``input`` with ``use_causal_mask = True``."""
  def call(self,input):
    attended = self.mha(query =input,
                        key = input,
                        value = input,
                        use_causal_mask = True)
    # Residual connection followed by layer normalization.
    return self.layer_norm(self.add([input,attended]))

In [None]:
class FeedForward(tf.keras.layers.Layer):
  """Two-layer feed-forward block with dropout, residual connection and layer norm.

  Args:
    d_model: output width of the block; must equal the width of the input so
      the residual addition is valid.
    dff: width of the hidden ReLU layer.
    dropout_rate: dropout rate applied after the second dense layer.
  """
  def __init__(self,d_model=128,dff=512,dropout_rate=0.1):
    super().__init__()
    self.dense = tf.keras.Sequential([
        tf.keras.layers.Dense(dff,activation = "relu"),
        tf.keras.layers.Dense(d_model),
        tf.keras.layers.Dropout(dropout_rate)
    ])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()
  def call(self,input):
    """Apply the dense stack, add the input back, then normalize."""
    x = self.dense(input)
    x = self.add([input,x])
    x = self.layer_norm(x)
    return x

In [None]:
class Encoder_layer(tf.keras.layers.Layer):
  """One encoder block: self-attention followed by the feed-forward block."""
  def __init__(self):
    super().__init__()
    self.global_attention = GlobalAttention(num_heads = 8,key_dim = 128,dropout = 0.1)
    self.fnn = FeedForward()
  def call(self,input):
    attended = self.global_attention(input)
    return self.fnn(attended)

In [None]:
class Encoder(tf.keras.layers.Layer):
  """Embeds the context token ids and runs them through a stack of encoder layers.

  Args:
    num_layers: number of ``Encoder_layer`` blocks in the stack.
  """
  def __init__(self,num_layers=4):
    super().__init__()
    self.positional_encoding = positional_encoding()
    self.encoder_layer = [Encoder_layer() for _ in range(num_layers)]
  def call(self,input):
    """Return the output of the last encoder layer for the given token ids."""
    x = self.positional_encoding(input)
    # Iterate the list itself so the loop can never disagree with its length.
    for layer in self.encoder_layer:
      x = layer(x)
    return x

In [None]:
class Decoder_Layer(tf.keras.layers.Layer):
  """One decoder block: causal self-attention, cross-attention, feed-forward."""
  def __init__(self):
    super().__init__()
    self.mask_attention = MaskAttention(num_heads= 8,key_dim = 128,dropout = 0.1)
    self.cross_attention = CrossAttention(num_heads= 8,key_dim = 128,dropout = 0.1)
    self.fnn = FeedForward()
  def call(self,input,context_):
    self_attended = self.mask_attention(input)
    # Queries come from the decoder stream, keys/values from ``context_``.
    cross_attended = self.cross_attention(input = self_attended,context = context_)
    return self.fnn(cross_attended)

In [None]:
class Decoder(tf.keras.layers.Layer):
  """Embeds the target token ids and runs them through a stack of decoder layers.

  Args:
    num_layers: number of ``Decoder_Layer`` blocks in the stack.
  """
  def __init__(self,num_layers=4):
    super().__init__()
    self.position = positional_encoding()
    self.decoder_layer = [Decoder_Layer() for _ in range(num_layers)]
  def call(self,input,context):
    """Return the last decoder layer's output; ``context`` is passed to every layer."""
    x = self.position(input)
    # Iterate the list itself so the loop can never disagree with its length.
    for layer in self.decoder_layer:
      x = layer(x,context)
    return x

In [None]:
class Transformer(tf.keras.Model):
  """Encoder-decoder model ending in a 5000-unit dense layer that emits logits."""
  def __init__(self):
    super().__init__()
    self.Encoder = Encoder()
    self.Decoder = Decoder()
    self.final_layer= tf.keras.layers.Dense(5000)
  def call(self,input):
    """``input`` is a pair ``(context_ids, target_ids)``; returns per-position logits."""
    source_ids,target_ids = input
    encoded = self.Encoder(source_ids)
    decoded = self.Decoder(target_ids,encoded)
    return self.final_layer(decoded)

In [None]:
class custom_lr(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Warm-up then inverse-square-root learning-rate schedule.

  ``lr(step) = d_model**-0.5 * min(step**-0.5, step * warm_up**-1.5)``

  Args:
    d_model: model width used for the overall scale factor.
    warm_up: number of warm-up steps; must be positive.

  Raises:
    ValueError: if ``warm_up`` is not positive.
  """
  def __init__(self,d_model=128,warm_up=4000):
    super().__init__()
    if warm_up <= 0:
      raise ValueError(f"warm_up must be positive, got {warm_up}")
    # Keep the plain Python value around so get_config stays serializable.
    self._d_model_value = d_model
    self.d_model = tf.cast(d_model,dtype = tf.float32)
    self.warm_up = warm_up
  def __call__(self,step):
    step = tf.cast(step,dtype = tf.float32)
    arg_1 = tf.math.rsqrt(step)
    arg_2 = step * (self.warm_up**-1.5)
    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg_1,arg_2)
  def get_config(self):
    """Return the constructor arguments so the schedule can be serialized."""
    return {"d_model": self._d_model_value,"warm_up": self.warm_up}


In [None]:
# Adam driven by the warm-up schedule defined in custom_lr.
learning_rate = custom_lr()
optimizer = tf.keras.optimizers.Adam(
    learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

In [None]:
# Build the model; weights are created on first call.
transformer = Transformer()

In [None]:
def masked_loss(label, pred):
  """Sparse cross-entropy (from logits) averaged over non-padding positions only.

  Label id 0 is the padding entry of the target vocabulary, so positions with
  label 0 are excluded from both the sum and the normalizer.
  """
  mask = label != 0
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits = True, reduction = "none")
  loss = loss_object(label, pred)

  mask = tf.cast(mask, dtype = loss.dtype)
  loss *= mask

  return tf.reduce_sum(loss) / tf.reduce_sum(mask)


def masked_accuracy(label, pred):
  """Fraction of non-padding positions whose arg-max prediction equals the label."""
  pred = tf.argmax(pred, axis = 2)
  label = tf.cast(label, pred.dtype)
  match = label == pred

  mask = label != 0
  match = match & mask

  match = tf.cast(match, dtype = tf.float32)
  mask = tf.cast(mask, dtype = tf.float32)
  return tf.reduce_sum(match) / tf.reduce_sum(mask)


# Plain cross-entropy / "accuracy" would also score the padded tail of every
# sequence, which is trivially predictable and inflates the reported numbers.
transformer.compile(loss = masked_loss,
                    optimizer = optimizer,
                    metrics =[masked_accuracy]
                    )
transformer.fit(new_train_dataset.prefetch(tf.data.AUTOTUNE),
            epochs = 10,
            )

Epoch 1/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m121s[0m 42ms/step - accuracy: 0.5648 - loss: 4.8587
Epoch 2/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.8139 - loss: 1.1042
Epoch 3/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.8489 - loss: 0.7895
Epoch 4/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.8623 - loss: 0.6745
Epoch 5/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.8837 - loss: 0.5346
Epoch 6/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.8921 - loss: 0.4789
Epoch 7/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.9016 - loss: 0.4251
Epoch 8/10
[1m1488/1488[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 32ms/step - accuracy: 0.9084 - loss: 0.3888
Epoch 9

<keras.src.callbacks.history.History at 0x7d1051a52d40>

In [None]:
# Compute the compiled loss and metrics on the vectorized test batches.
transformer.evaluate(new_test_dataset)

[1m372/372[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 26ms/step - accuracy: 0.7200 - loss: 1.4344


[1.9026317596435547, 0.6459006667137146]

In [None]:
# Forward lookup (word -> id) over the adapted target vocabulary.
new_word_to_id = tf.keras.layers.StringLookup(
    vocabulary=target_vect.get_vocabulary(),
    mask_token="",
    oov_token='[UNK]',
)

In [None]:
# Inverse lookup (id -> word) over the same target vocabulary.
new_id_to_word = tf.keras.layers.StringLookup(
    vocabulary=target_vect.get_vocabulary(),
    mask_token="",
    oov_token='[UNK]',
    invert=True,
)

In [None]:
def translate(text,max_length=100):
  """Greedily decode a translation for a single sentence.

  Args:
    text: one raw sentence, either a string or a one-element list/tensor of
      strings. Exactly one sentence is supported per call.
    max_length: maximum number of tokens to generate after ``[START]``.

  Returns:
    A ``tf.TensorArray`` of int64 token ids (one entry per step, starting with
    the ``[START]`` id); call ``.stack()`` on it to get a tensor.
  """
  # Pass the raw text straight to the vectorizer: `preprocess` is already its
  # `standardize` callback. Standardizing twice would strip the brackets from
  # the first pass's [START]/[END] markers and leave stray "start"/"end" words
  # in the encoder input.
  context_ids = context_vect(tf.reshape(tf.convert_to_tensor(text),[1]))

  start = new_word_to_id("[START]")[np.newaxis]
  end = new_word_to_id("[END]")[np.newaxis]
  output_array = tf.TensorArray(dtype = tf.int64,size=0,dynamic_size= True)
  # TensorArray.write returns the updated array; the result must be kept.
  output_array = output_array.write(0,start)
  for i in range(max_length):
    output = tf.transpose(output_array.stack())
    predictions = transformer([context_ids,output])
    # Only the logits of the last generated position decide the next token.
    pred = predictions[:,-1:,:]
    pred_id = tf.argmax(pred,axis=-1)

    output_array = output_array.write(i+1, pred_id[0])

    if pred_id == end:
      break
  return output_array

In [None]:
# Decode one sentence and stack the generated token ids into a single tensor.
x = translate(["what is your name?"]).stack()

In [None]:
# Map the ids back to words and join them with spaces into one sentence.
tf.strings.reduce_join(new_id_to_word(x.numpy().T),axis=1,separator = " ")

<tf.Tensor: shape=(1,), dtype=string, numpy=
array([b'[START] \xc2\xbf cual es tu nombre de [UNK] ? [END]'],
      dtype=object)>