In [None]:
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Reshape, Embedding, LayerNormalization, Softmax, ReLU
from keras.optimizers import Adam
from keras.losses import SparseCategoricalCrossentropy
from keras.initializers import GlorotUniform
from keras.regularizers import L1L2
import tensorflow as tf
import math

In [None]:
def generate_vocab(sentences):
  """
  Build a word-level vocabulary with reserved "[PAD]" (id 0) and "[UNK]" (id 1) entries.

    sentences: (list of str) sentences
    unique_words: (list of str) unique words in the training examples
    V: (int) Size of Vocabulary
    word_to_id: (dict) mapping of words to ids
    id_to_word: (dict) mapping of ids to words

  Returns the tuple (V, word_to_id, id_to_word).
  """
  special_tokens = ["[PAD]", "[UNK]"]

  # dict.fromkeys de-duplicates while keeping first-seen order, so the ids are
  # identical on every run (iteration order of a set of str changes with the
  # interpreter's hash seed).
  corpus_words = dict.fromkeys(" ".join(sentences).split())

  # A corpus token that spells a reserved token must not get a second entry:
  # that would inflate V and move "[PAD]" away from id 0.
  unique_words = special_tokens + [w for w in corpus_words if w not in special_tokens]
  V = len(unique_words)

  word_to_id = {w: i for i, w in enumerate(unique_words)}
  id_to_word = dict(enumerate(unique_words))
  return V, word_to_id, id_to_word

In [None]:
def tokenization(sentences, d=32, padding_size=4, decoder=False):
  """
  Wrap every sentence in "<START>"/"<END>", build the vocabulary and map the
  words to fixed-width rows of ids (truncated / "[PAD]"-padded to padding_size).

    sentences: (list of str) sentences
    padding_size: (int) Size of padding
    d: (int) embedding dimension (passed through unchanged)
    decoder: (bool) also build the decoder-input ids
    V: (int) Size of Vocabulary
    word_to_id: (dict) mapping of word to ids
    id_to_word: (dict) mapping of ids to words
    m: (int) number of training examples
    n: (int) sequence length in a single example
    word_ids: (numpy array) tokenized word IDs of shape (m, n)
    word_ids_shifted: (numpy array, optional) decoder-input IDs of shape (m, n-1):
      each row is the sentence without "<END>", padded with "[PAD]"

  Returns (word_to_id, id_to_word, V, d, n, m, word_ids), or
  (word_to_id, id_to_word, V, d, n, m, word_ids_shifted, word_ids) when decoder is True.
  """
  sentences = ["<START> "+s+" <END>" for s in sentences]

  V, word_to_id, id_to_word = generate_vocab(sentences)
  m = len(sentences)
  n = padding_size
  split_sentences = [sentence.split() for sentence in sentences]

  pad_id = word_to_id["[PAD]"]
  unk_id = word_to_id["[UNK]"]
  word_ids = np.full((m, n), pad_id, dtype=int)

  # The decoder input gets its own buffer. A slice such as word_ids[:, :-1] is a
  # view: writes to it would land in word_ids and be overwritten when the full
  # row is stored, leaving "<END>" in the decoder input.
  shifted_len = max(n - 1, 0)
  word_ids_shifted = np.full((m, shifted_len), pad_id, dtype=int) if decoder else None

  for i, words in enumerate(split_sentences):
    truncated = words[:padding_size]

    mapped = [word_to_id.get(word, unk_id) for word in truncated]
    word_ids[i, :len(mapped)] = mapped

    if decoder:
      # Drop "<END>"; clip to the buffer width in case truncation already cut
      # "<END>" off and n tokens are left.
      truncated_shifted = [word for word in truncated if word != '<END>'][:shifted_len]
      mapped_shifted = [word_to_id.get(word, unk_id) for word in truncated_shifted]
      word_ids_shifted[i, :len(mapped_shifted)] = mapped_shifted

  if decoder:
    return word_to_id, id_to_word, V, d, n, m, word_ids_shifted, word_ids
  return word_to_id, id_to_word, V, d, n, m, word_ids

In [None]:
# Toy parallel corpus: English source sentences and their Spanish translations.
sentences_eng = [
    'the dog is not sitting',
    'the cat is sitting',
    'a cat is playing',
    'the dog is not playing',
    'a cat is not sitting',
]

sentences_spa = [
    'el perro no esta sentado',
    'el gato esta sentado',
    'un gato esta jugando',
    'el perro no esta jugando',
    'un gato no esta sentado',
]

# Encoder side: vocabulary and padded ids for the English sentences.
(
    word_to_id_en,
    id_to_word_en,
    vocab_size_en,
    dimension,
    sequence_length_en,
    batch_size,
    word_ids_en,
) = tokenization(sentences_eng, d=8, padding_size=7)

# Decoder side: same call with decoder=True, which additionally returns the
# decoder-input ids ahead of the full id matrix.
(
    word_to_id_dec,
    id_to_word_dec,
    vocab_size_dec,
    dimension,
    sequence_length_dec,
    batch_size,
    word_ids_shifted,
    target,
) = tokenization(sentences_spa, d=8, padding_size=7, decoder=True)

In [None]:
# Model hyper-parameters.
heads = 2                        # attention heads per attention layer
expansion_dim = 4 * dimension    # hidden width of the feed-forward sub-layer
num_layers_enc_dec = 6           # encoder blocks and decoder blocks stacked

In [None]:
class CustomEmbedding(tf.keras.layers.Layer):
  """Thin wrapper around a Keras Embedding that maps token ids to vectors."""

  def __init__(self, dimension, vocab_size):
    """
      dimension: (int) size of each embedding vector
      vocab_size: (int) number of rows in the embedding table
    """
    super().__init__()
    # mask_zero=True makes the Embedding layer treat id 0 as padding.
    self.embeddings = Embedding(
        vocab_size,
        dimension,
        embeddings_initializer=GlorotUniform(),
        embeddings_regularizer=L1L2(),
        mask_zero=True,
    )

  def call(self, x):
    """Look up the embedding vector of every id in x."""
    embedded = self.embeddings(x)
    return embedded

In [None]:
class Position_Encoding(tf.keras.layers.Layer):
  """
  Adds fixed sinusoidal position encodings to a sequence of embeddings.

  For position pos and channel pair i (channels 2i and 2i+1):
    PE[pos, 2i]   = sin(pos / 10000 ** (2i / dimension))
    PE[pos, 2i+1] = cos(pos / 10000 ** (2i / dimension))
  """

  def __init__(self, dimension, seq_len):
    """
      dimension: (int) embedding dimension
      seq_len: (int) largest sequence length the table is built for
    """
    super().__init__()
    position = np.arange(seq_len)[:, np.newaxis]

    # Channels 2i and 2i+1 share one frequency, hence the integer division.
    pair_index = np.arange(dimension) // 2
    div_term = 10000 ** (2 * pair_index / dimension)
    angles = position / div_term

    pos_enc = np.zeros(angles.shape)
    pos_enc[:, 0::2] = np.sin(angles[:, 0::2])
    # Odd channels use cosine; using sine on both halves loses the phase-shifted
    # component of every sin/cos pair.
    pos_enc[:, 1::2] = np.cos(angles[:, 1::2])
    self.pos_enc = tf.convert_to_tensor(pos_enc, dtype=tf.float32)

  def call(self, x):
    """Add the first x.shape[1] rows of the table to x (broadcast over the leading axis)."""
    return x + self.pos_enc[:x.shape[1]]

In [None]:
class SelfAttention(tf.keras.layers.Layer):
  """
  Multi-head scaled dot-product attention followed by an output projection,
  a residual connection to the query input, and layer normalization.
  """

  def __init__(self, heads, dimension):
    """
      heads: (int) number of attention heads
      dimension: (int) model dimension; must be divisible by heads

    Raises ValueError when dimension is not a multiple of heads.
    """
    super().__init__()
    if dimension % heads != 0:
      raise ValueError(
          f"dimension ({dimension}) must be divisible by heads ({heads})")
    self.heads = heads
    self.head_dim = dimension // heads
    self.dimension = dimension

    # Project the full model dimension first and split into heads afterwards,
    # so every head has its own slice of the projection weights. Projecting
    # after the split with Dense(head_dim) would reuse one small kernel for all
    # heads and restrict each head to a fixed slice of the raw input.
    self.Q = Dense(self.dimension)
    self.K = Dense(self.dimension)
    self.V = Dense(self.dimension)

    self.softmax = Softmax()

    self.linear = Dense(self.dimension)
    self.norm = LayerNormalization()

  def get_mask(self, attention_scores):
    """
    Lower-triangular (causal) mask for scores of shape (batch, heads, query_len, key_len).

    Returns a (1, 1, query_len, key_len) tensor: 1 where a query may attend, 0 elsewhere.
    """
    query_len = attention_scores.shape[-2]
    key_len = attention_scores.shape[-1]
    mask = tf.experimental.numpy.tril(tf.ones((query_len, key_len)))
    mask = tf.expand_dims(tf.expand_dims(mask, axis=0), axis=1)
    return mask

  def call(self, q, k, v, m=False):
    """
      q: query input, reshaped here as (batch, q_len, dimension)
      k: key input, reshaped here as (batch, k_len, dimension)
      v: value input, reshaped here as (batch, v_len, dimension); v_len must equal k_len
      m: (bool) apply the causal mask from get_mask

    Returns a tensor shaped like q.
    """
    q_len = q.shape[1]
    k_len = k.shape[1]
    v_len = v.shape[1]

    # -1 lets the batch axis be inferred, so this also works when the static
    # batch size is unknown.
    Q = tf.reshape(self.Q(q), [-1, q_len, self.heads, self.head_dim])
    K = tf.reshape(self.K(k), [-1, k_len, self.heads, self.head_dim])
    V = tf.reshape(self.V(v), [-1, v_len, self.heads, self.head_dim])

    # Scale by sqrt of the per-head key size, the length of the vectors that
    # are actually dotted together.
    dot_Q_K = tf.einsum("bqhd,bkhd->bhqk", Q, K) / math.sqrt(self.head_dim)

    if m:
      mask = self.get_mask(dot_Q_K)
      dot_Q_K = tf.where(mask == 0, tf.fill(tf.shape(dot_Q_K), float('-inf')), dot_Q_K)

    softmaxx = self.softmax(dot_Q_K)

    # The key axis of the weights and the sequence axis of V must share the
    # same einsum label. With two different labels each is summed on its own
    # and every query receives the unweighted sum of V.
    attention = tf.einsum("bhqk,bkhd->bqhd", softmaxx, V)

    multi_heads = tf.reshape(attention, [-1, q_len, self.dimension])

    linear = self.linear(multi_heads)

    addnorm = self.norm(linear + q)

    return addnorm

In [None]:
class FeedForward(tf.keras.layers.Layer):
  """Two-layer position-wise feed-forward block with a residual add and layer norm."""

  def __init__(self, dimension, expansion_dim):
    """
      dimension: (int) width of the block's output projection
      expansion_dim: (int) width of the hidden ReLU layer
    """
    super().__init__()
    self.linear1 = Dense(expansion_dim, activation='relu')
    self.linear2 = Dense(dimension)
    self.norm = LayerNormalization()

  def call(self, x):
    """Return norm(x + linear2(linear1(x)))."""
    hidden = self.linear1(x)
    projected = self.linear2(hidden)
    return self.norm(x + projected)

In [None]:
class Encoder(tf.keras.layers.Layer):
  """One encoder block: attention followed by the feed-forward sub-layer."""

  def __init__(self, heads, dimension, expansion_dim):
    """
      heads: (int) number of attention heads
      dimension: (int) model dimension
      expansion_dim: (int) hidden width of the feed-forward sub-layer
    """
    super().__init__()
    self.attention = SelfAttention(heads, dimension)
    self.ff = FeedForward(dimension, expansion_dim)

  def call(self, q, k, v):
    """Run attention over (q, k, v) and pass the result through the feed-forward sub-layer."""
    return self.ff(self.attention(q, k, v))

In [None]:
class Decoder(tf.keras.layers.Layer):
  """One decoder block: masked attention, attention over encoder tensors, feed-forward."""

  def __init__(self, heads, dimension, expansion_dim):
    """
      heads: (int) number of attention heads
      dimension: (int) model dimension
      expansion_dim: (int) hidden width of the feed-forward sub-layer
    """
    super().__init__()
    self.masked_attention = SelfAttention(heads, dimension)
    self.attention = SelfAttention(heads, dimension)
    self.ff = FeedForward(dimension, expansion_dim)

  def call(self, q, k_dec, v_dec, k_en, v_en):
    """
      q, k_dec, v_dec: decoder-side query / key / value for the masked attention
      k_en, v_en: key / value used by the second attention
    """
    # First attention runs with m=True (masked).
    causal_out = self.masked_attention(q, k_dec, v_dec, m=True)
    # Its output is the query of the second attention.
    cross_out = self.attention(causal_out, k_en, v_en)
    return self.ff(cross_out)

In [None]:
class Transformer(tf.keras.layers.Layer):
  """
  Encoder-decoder stack: embeddings + position encodings on both sides,
  N Encoder blocks, N Decoder blocks and a final Dense projection onto the
  decoder vocabulary.
  """

  def __init__(self, dimension, vocab_size_en, vocab_size_dec, sequence_length_en, sequence_length_dec, heads, expansion_dim, N):
    """
      dimension: (int) embedding / model dimension
      vocab_size_en: (int) encoder-side vocabulary size
      vocab_size_dec: (int) decoder-side vocabulary size (also the output width)
      sequence_length_en: (int) length of the encoder position-encoding table
      sequence_length_dec: (int) length of the decoder position-encoding table
      heads: (int) attention heads per attention layer
      expansion_dim: (int) hidden width of the feed-forward sub-layers
      N: (int) number of encoder blocks and of decoder blocks
    """
    super().__init__()
    self.embedding_enc = CustomEmbedding(dimension, vocab_size_en)
    self.embedding_dec = CustomEmbedding(dimension, vocab_size_dec)
    self.pos_enc_en = Position_Encoding(dimension, sequence_length_en)
    self.pos_enc_dec = Position_Encoding(dimension, sequence_length_dec)
    self.encoder = [Encoder(heads, dimension, expansion_dim) for _ in range(N)]
    self.decoder = [Decoder(heads, dimension, expansion_dim) for _ in range(N)]
    self.linear = Dense(vocab_size_dec)

  def call(self, inn):
    """
      inn: pair (encoder token ids, decoder token ids)

    Returns the output of the final Dense layer: one vector of size
    vocab_size_dec per decoder position.
    """
    # The debugging print of the input shape that used to sit here was removed.
    word_ids_en, word_ids_dec = inn[0], inn[1]

    enc_out = self.pos_enc_en(self.embedding_enc(word_ids_en))
    dec_out = self.pos_enc_dec(self.embedding_dec(word_ids_dec))

    for enc in self.encoder:
      enc_out = enc(enc_out, enc_out, enc_out)

    # Every decoder block reads keys/values from the final encoder output.
    for dec in self.decoder:
      dec_out = dec(dec_out, dec_out, dec_out, enc_out, enc_out)

    return self.linear(dec_out)

In [None]:
# Encoder input: English token ids.
word_ids_en_tensor = tf.convert_to_tensor(word_ids_en, dtype=tf.int64)
# Decoder input (teacher forcing): Spanish token ids, one column narrower than `target`.
word_ids_dec_tensor = tf.convert_to_tensor(word_ids_shifted, dtype=tf.int64)

# Labels: the full Spanish ids moved one step to the left, so decoder position t
# is trained to predict token t+1. Using the decoder input itself as the label
# would only teach the model to copy its input.
target_label_tensor = tf.convert_to_tensor(target[:, 1:], dtype=tf.int64)

In [None]:
# Wrap the Transformer layer in a Sequential model so it can be compiled and fit.
transformerr = Sequential([
    Transformer(
        dimension=dimension,
        vocab_size_en=vocab_size_en,
        vocab_size_dec=vocab_size_dec,
        sequence_length_en=sequence_length_en,
        sequence_length_dec=sequence_length_dec,
        heads=heads,
        expansion_dim=expansion_dim,
        N=num_layers_enc_dec,
    )
])

# The final Dense layer has no activation, hence from_logits=True.
transformerr.compile(
    optimizer=Adam(learning_rate=1e-4),
    loss=SparseCategoricalCrossentropy(from_logits=True),
)

In [None]:
# A progress line per epoch for 5000 epochs floods the cell output, so train
# silently, keep the History object and show only the final loss.
history = transformerr.fit(
    (word_ids_en_tensor, word_ids_dec_tensor),
    target_label_tensor,
    batch_size=5,
    epochs=5000,
    verbose=0,
)
history.history["loss"][-1]

Epoch 1/5000
(5, 7)




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch 2501/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 71ms/step - loss: 1.8871
Epoch 2502/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 147ms/step - loss: 1.8870
Epoch 2503/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 76ms/step - loss: 1.8870
Epoch 2504/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 78ms/step - loss: 1.8869
Epoch 2505/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 147ms/step - loss: 1.8868
Epoch 2506/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 143ms/step - loss: 1.8868
Epoch 2507/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step - loss: 1.8867
Epoch 2508/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 79ms/step - loss: 1.8867
Epoch 2509/5000
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 148ms/step - loss: 1.8866
Epoc

<keras.src.callbacks.history.History at 0x79121513ac20>