<a href="https://colab.research.google.com/github/JuanDavid1217/Generative-AI-from-Scratch/blob/main/Transformers/Embedding_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Primero se necesita entender cómo funcionan los Embeddings**

In [None]:
import numpy as np
from pickle import load

In [None]:
class Activation_ReLU:
  """Rectified linear unit activation: ``output = max(0, input)``.

  Attributes:
    input: The value most recently passed to ``forward``.
    output: Result of the last forward pass.
    dinputs: Gradient with respect to the input, set by ``backward``.
  """

  def __init__(self):
    self.input=[]
    self.output=[]
    self.dinputs=[]

  def forward(self, input):
    """Store ``input`` and write its element-wise ReLU to ``self.output``."""
    self.input = input
    self.output = np.maximum(0, input)

  def backward(self, dvalues):
    """Compute ``self.dinputs`` from the upstream gradient ``dvalues``.

    ReLU acts element by element, so the gradient is passed through where the
    forward output was positive and zeroed elsewhere. The gradient is NOT
    reduced over the first axis: every sample keeps its own row.
    """
    passthrough = self.output > 0
    self.dinputs = np.asarray(dvalues) * passthrough

class Activation_Sigmoid:
  """Logistic sigmoid activation: ``output = 1 / (1 + exp(-input))``.

  Attributes:
    input: The value most recently passed to ``forward``.
    output: Result of the last forward pass.
    dinputs: Gradient with respect to the input, set by ``backward``.
  """

  def __init__(self):
    self.input=[]
    self.output=[]
    self.dinputs=[]

  def forward(self, input):
    """Store ``input`` and write its element-wise sigmoid to ``self.output``."""
    # Keep the input, like the other layers in this file do.
    self.input = input
    # asarray lets plain Python lists be negated as well.
    self.output = 1 / (1 + np.exp(-np.asarray(input)))

  def backward(self, dvalues):
    """Compute ``self.dinputs`` from the upstream gradient ``dvalues``.

    Uses the sigmoid derivative ``output * (1 - output)`` evaluated on the
    output of the last forward pass.
    """
    self.dinputs = dvalues * (1 - self.output) * self.output



In [None]:
class Dense:
  """Fully connected layer: ``output = input @ weights (+ biases)``.

  Args:
    input_dim: Number of input features.
    neurons: Number of output features.
    bias: When true the layer owns a ``biases`` row vector and ``backward``
      fills ``dbiases``; otherwise neither attribute holds a gradient.

  Attributes:
    weights: Array of shape ``(input_dim, neurons)``, small random values.
    biases: Array of shape ``(1, neurons)``; only present when ``bias`` is true.
    dweights, dbiases, dinputs: Gradients written by ``backward``.
  """

  def __init__(self, input_dim, neurons, bias=False):
    self.weights=0.1*np.random.randn(input_dim, neurons)
    self.bias=bias
    if self.bias:
      self.biases=np.zeros((1, neurons))
      self.dbiases=0
    self.input=[]
    self.output=[]
    self.dweights=0
    self.dinputs=0

  def forward(self, input):
    """Store ``input`` and compute the affine transform into ``self.output``."""
    self.input=input
    if self.bias:
      self.output = np.dot(self.input, self.weights)+self.biases
    else:
      self.output=np.dot(self.input, self.weights)

  def backward(self, dvalues):
    """Compute ``dweights``, ``dinputs`` and, with bias, ``dbiases``.

    ``dvalues`` and the stored input may be 1-D vectors, single rows or
    batches; both are viewed as ``(samples, features)`` so the same code
    handles every case. ``self.input`` itself is left untouched.
    """
    input_dim, neurons = self.weights.shape
    # Reshape by feature count: a 1-D or column vector becomes one row, while
    # a real batch keeps one row per sample.
    dvalues = np.asarray(dvalues).reshape(-1, neurons)
    inputs = np.asarray(self.input).reshape(-1, input_dim)

    if self.bias:
      # The bias is shared by all samples, so its gradient is their sum;
      # keepdims keeps the (1, neurons) shape of self.biases.
      self.dbiases = np.sum(dvalues, axis=0, keepdims=True)

    self.dweights = np.dot(inputs.T, dvalues)
    self.dinputs = np.dot(dvalues, self.weights.T)


In [None]:
class Optimizer_Adam:
  """Adam optimizer with optional inverse-time learning-rate decay.

  Per-parameter state (momentums and caches) is stored on the layer objects
  themselves the first time a layer is passed to ``update_params``.

  Expected call order per training step: ``pre_update_params``, then
  ``update_params`` for every layer, then ``post_update_params``.
  """

  def __init__(self, learning_rate=0.01, decay=0., epsilon=1e-7,beta_1=0.9, beta_2=0.999):
    # Hyper-parameters.
    self.learning_rate = learning_rate
    self.decay = decay
    self.epsilon = epsilon
    self.beta_1 = beta_1
    self.beta_2 = beta_2
    # Running state.
    self.current_learning_rate = learning_rate
    self.iterations = 0

  def pre_update_params(self):
    """Refresh ``current_learning_rate`` when a decay rate is configured."""
    if not self.decay:
      return
    shrink = 1. / (1. + self.decay * self.iterations)
    self.current_learning_rate = self.learning_rate * shrink

  def update_params(self, layer):
    """Apply one Adam step to ``layer.weights`` and, if present, ``layer.biases``.

    Reads ``layer.dweights`` (and ``layer.dbiases``) as the current gradients.
    """
    with_biases = hasattr(layer, 'biases')

    # Lazily create the optimizer state on first contact with this layer.
    if not hasattr(layer, 'weight_cache'):
      layer.weight_momentums = np.zeros_like(layer.weights)
      layer.weight_cache = np.zeros_like(layer.weights)
      if with_biases:
        layer.bias_momentums = np.zeros_like(layer.biases)
        layer.bias_cache = np.zeros_like(layer.biases)

    # self.iterations is 0 on the first pass, but the bias correction needs
    # exponents that start at 1.
    step = self.iterations + 1
    momentum_correction = 1 - self.beta_1 ** step
    cache_correction = 1 - self.beta_2 ** step

    # --- weights ---
    layer.weight_momentums = self.beta_1 * layer.weight_momentums + (1 - self.beta_1) * layer.dweights
    layer.weight_cache = self.beta_2 * layer.weight_cache + (1 - self.beta_2) * layer.dweights**2
    weight_m_hat = layer.weight_momentums / momentum_correction
    weight_v_hat = layer.weight_cache / cache_correction
    layer.weights += -self.current_learning_rate * weight_m_hat / (np.sqrt(weight_v_hat) + self.epsilon)

    if not with_biases:
      return

    # --- biases (same recipe) ---
    layer.bias_momentums = self.beta_1 * layer.bias_momentums + (1 - self.beta_1) * layer.dbiases
    layer.bias_cache = self.beta_2 * layer.bias_cache + (1 - self.beta_2) * layer.dbiases**2
    bias_m_hat = layer.bias_momentums / momentum_correction
    bias_v_hat = layer.bias_cache / cache_correction
    layer.biases += -self.current_learning_rate * bias_m_hat / (np.sqrt(bias_v_hat) + self.epsilon)

  def post_update_params(self):
    """Advance the step counter; call once after all layers were updated."""
    self.iterations += 1

In [None]:
class LossBinaryCrossEntropy():
  """Binary cross-entropy loss and its gradient."""

  def forward(self, y_pred, y_true):
    """Return the element-wise binary cross-entropy (no averaging is applied).

    Predictions are clipped to ``[1e-7, 1 - 1e-7]`` so the logarithms stay
    finite.
    """
    eps = 1e-7
    probs = np.clip(y_pred, eps, 1 - eps)

    positive_term = y_true * np.log(probs)
    negative_term = (1 - y_true) * np.log(1 - probs)
    return -(positive_term + negative_term)

  def backward(self, dvalues, y_true):
    """Store in ``self.dinputs`` the loss gradient w.r.t. the predictions.

    ``dvalues`` holds the predictions; the gradient is divided by the number
    of outputs per sample and by the number of samples.
    """
    n_samples = len(dvalues)
    n_outputs = len(dvalues[0])

    probs = np.clip(dvalues, 1e-7, 1 - 1e-7)

    per_output = -(y_true / probs - (1 - y_true) / (1 - probs)) / n_outputs
    self.dinputs = per_output / n_samples

In [None]:
class Embedding:
  """Two-layer network that learns word embeddings from context windows.

  The network receives the sum of the one-hot vectors of a word's neighbours
  and is trained to reproduce the word's own one-hot vector (similar to the
  CBOW setup of word2vec). The ReLU output of the first layer is what
  ``getEmbedding`` returns as the embedding of a word.

  Args:
    vocab_size: Number of distinct tokens; length of every one-hot vector.
    sentence_len: Stored on the instance; no method of this class reads it.
    latent_dim: Dimension of the learned embeddings.
  """

  def __init__(self, vocab_size, sentence_len, latent_dim):
    self.vocab_size = vocab_size
    self.sentence_len = sentence_len
    self.layer1 = Dense(vocab_size, latent_dim, True)
    self.activationLayer1 = Activation_ReLU()
    self.layer2 = Dense(latent_dim, vocab_size, True)
    self.activationLayer2 = Activation_Sigmoid()
    self.optimizer = Optimizer_Adam()
    self.trainableLayers = [self.layer1, self.layer2]
    self.lossFunction = LossBinaryCrossEntropy()

  def forward(self, input):
    """Run ``input`` through both layers and return the sigmoid output."""
    self.layer1.forward(input)
    self.activationLayer1.forward(self.layer1.output)
    self.layer2.forward(self.activationLayer1.output)
    self.activationLayer2.forward(self.layer2.output)
    return self.activationLayer2.output

  def backward(self, loss):
    """Back-propagate ``loss`` (gradient at the sigmoid output) to layer 1."""
    self.activationLayer2.backward(loss)
    self.layer2.backward(self.activationLayer2.dinputs)
    self.activationLayer1.backward(self.layer2.dinputs)
    self.layer1.backward(self.activationLayer1.dinputs)

  def generateOneHotVector(self, input):
    """Return one one-hot vector of length ``vocab_size`` per id in ``input``."""
    oneHotVectors = []
    for position in input:
      oneHot = np.zeros(self.vocab_size)
      oneHot[position] = 1
      oneHotVectors.append(oneHot)
    return oneHotVectors

  def get_SumContext(self, vector, window=2, verbose=True):
    """Return, for each position, the sum of its neighbouring vectors.

    Args:
      vector: Sequence of equally sized vectors (one per word of a sentence).
      window: Number of neighbours taken on each side of the position.
      verbose: When true, print ``vector`` (debug output).

    Returns:
      A list with one freshly allocated array per position. A position with
      no neighbours (single-word sentence) gets an all-zero vector.
    """
    y_labels = []
    for i in range(len(vector)):
      start = max(0, i - window)
      contexts = list(vector[start:i]) + list(vector[i+1:i+1+window])
      if contexts:
        # np.sum allocates a new array. Accumulating with "+=" into one of
        # the neighbours would modify the caller's vectors, which are reused
        # as training targets.
        context = np.sum(contexts, axis=0)
      else:
        context = np.zeros_like(vector[i])
      y_labels.append(context)
    if verbose:
      print(f"\nx_labels: {vector}")
    return y_labels

  def train(self, input, epoch, verbose=True):
    """Train the network on ``input`` for ``epoch`` passes.

    Args:
      input: Sequence of sentences, each a sequence of token ids.
      epoch: Number of passes over ``input``.
      verbose: When true, print targets and predictions for every step. The
        per-epoch loss is always printed.

    The reported loss is the mean absolute error, averaged over the words of
    each sentence and then over the sentences.
    """
    for _ in range(epoch):
      lossByEpoch = 0
      for sentence in input:
        x_labels = self.generateOneHotVector(sentence)
        if not x_labels:
          # Nothing to learn from an empty sentence (and no length to divide by).
          continue
        if verbose:
          print(f"\nx_labels: {x_labels}")
        y_labels = self.get_SumContext(x_labels, verbose=verbose)

        lossBySentence = 0
        for position, (context, target) in enumerate(zip(y_labels, x_labels)):
          prediction = self.forward(context)
          if verbose:
            print("prediction: ", prediction)
            print(f"waited {position}: {target}")
          # The raw error is used directly as the gradient at the sigmoid output.
          loss = prediction - target
          # Accumulate over the words; the sum is averaged per sentence below.
          lossBySentence += np.mean(np.abs(loss))
          self.backward(loss[0])
          self.optimizer.pre_update_params()
          for layer in self.trainableLayers:
            self.optimizer.update_params(layer)
          self.optimizer.post_update_params()
        lossByEpoch += lossBySentence/len(x_labels)
      print("Loss: ", lossByEpoch/max(len(input), 1))

  def getEmbedding(self, input):
    """Return the first-layer ReLU activation for every token id in ``input``."""
    oneHotVectors = self.generateOneHotVector(input)
    embeddings = []
    for oneHotVector in oneHotVectors:
      self.layer1.forward(oneHotVector)
      self.activationLayer1.forward(self.layer1.output)
      embeddings.append(self.activationLayer1.output)
    return embeddings



Manejo de datos

In [None]:
# Load the training set.
filename = './english-spanish.pkl'

# The pickled corpus at `filename` is not read here; loading it is disabled:
#   dataset = load(open(filename, 'rb'))
#   print(dataset[120000,0])
#   print(dataset[120000,1])
# A toy corpus of four (Spanish, English) sentence pairs is used instead.
dataset = np.array([
  ["el rey de Inglaterra es un hombre", "the king of England is a man"],
  ["la reina de Inglaterra es una mujer", "the queen of England is a woman"],
  ["Carlos es un rey", "Carlos is a king"],
  ["Andrea es una reina", "Andrea is a queen"],
])
print(dataset[0][1])

the king of England is a man


In [None]:
# Crear "tokens"
source_tokens = []
for sentence in dataset[:,0]:
  source_tokens.append(sentence.split(' '))
print(source_tokens[0])

target_tokens = []
for sentence in dataset[:,1]:
  target_tokens.append(sentence.split(' '))
print(target_tokens[0])

['el', 'rey', 'de', 'Inglaterra', 'es', 'un', 'hombre']
['the', 'king', 'of', 'England', 'is', 'a', 'man']


In [None]:
def build_token_dict(token_list):
  """Map every distinct token to an integer id in order of first appearance.

  Args:
    token_list: Iterable of token sequences (one per sentence).

  Returns:
    A dict ``token -> id`` whose ids are consecutive and start at 0.
  """
  # Special tokens are currently disabled; when enabled they would take the
  # first ids: '<PAD>': 0, '<START>': 1, '<END>': 2
  token_dict = {}
  for tokens in token_list:
    for token in tokens:
      # setdefault leaves known tokens alone; a new token gets the next free id.
      token_dict.setdefault(token, len(token_dict))
  return token_dict

In [None]:
# Forward vocabularies (token -> id), one per language.
source_token_dict = build_token_dict(source_tokens)
target_token_dict = build_token_dict(target_tokens)

# Reverse lookups (id -> token).
source_token_dict_inv = {index: token for token, index in source_token_dict.items()}
target_token_dict_inv = {index: token for token, index in target_token_dict.items()}

print(source_token_dict)
print(target_token_dict)
print(target_token_dict_inv)

{'el': 0, 'rey': 1, 'de': 2, 'Inglaterra': 3, 'es': 4, 'un': 5, 'hombre': 6, 'la': 7, 'reina': 8, 'una': 9, 'mujer': 10, 'Carlos': 11, 'Andrea': 12}
{'the': 0, 'king': 1, 'of': 2, 'England': 3, 'is': 4, 'a': 5, 'man': 6, 'queen': 7, 'woman': 8, 'Carlos': 9, 'Andrea': 10}
{0: 'the', 1: 'king', 2: 'of', 3: 'England', 4: 'is', 5: 'a', 6: 'man', 7: 'queen', 8: 'woman', 9: 'Carlos', 10: 'Andrea'}


In [None]:
# Token lists fed to the encoder / decoder and expected as output.
# Adding '<START>' / '<END>' markers and padding every sentence with '<PAD>'
# up to the maximum length is currently disabled, so these are plain shallow
# copies of the token lists.
encoder_tokens = list(source_tokens)
decoder_tokens = list(target_tokens)
output_tokens = list(target_tokens)

# Length of the longest sentence on each side.
source_max_len = max(len(tokens) for tokens in encoder_tokens)
target_max_len = max(len(tokens) for tokens in decoder_tokens)

In [None]:
# Show the tokens of the second source sentence.
print(encoder_tokens[1])

['la', 'reina', 'de', 'Inglaterra', 'es', 'una', 'mujer']


In [None]:
# Replace every token by its vocabulary id.
encoder_input = [[source_token_dict[token] for token in tokens] for tokens in encoder_tokens]
decoder_input = [[target_token_dict[token] for token in tokens] for tokens in decoder_tokens]
# Same ids as decoder_input, but each id is wrapped in its own one-element list.
output_decoded = [[[target_token_dict[token]] for token in tokens] for tokens in output_tokens]

In [None]:
# Show the id-encoded form of the second source sentence.
print(encoder_input[1])

[7, 8, 2, 3, 4, 9, 10]


Entrenamos el Embedding

In [None]:
# Size of the source vocabulary (number of distinct source tokens).
vocab_size = len(source_token_dict)
print(vocab_size)
# Longest target sentence and number of training sentences.
print(target_max_len)
print(len(decoder_input))

13
7
4


In [None]:
# Build the embedding model; the last argument (latent_dim) makes the embeddings 3-dimensional.
embedding = Embedding(vocab_size, source_max_len, 3)

In [None]:
# Train on the id-encoded source sentences for 800 epochs.
embedding.train(encoder_input, 800)

[1;30;43mSe truncaron las últimas líneas 5000 del resultado de transmisión.[0m

x_labels: [array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.]), array([0., 0., 0., 0., 1., 2., 0., 0., 0., 0., 0., 0., 0.]), array([0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.]), array([0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])]
prediction:  [[1.17169675e-14 2.95406755e-37 5.39085154e-22 9.03836208e-15
  8.77136147e-07 2.60834392e-05 1.55670983e-13 8.42730801e-11
  1.15918790e-45 1.37138535e-04 5.60737184e-15 1.37273693e-14
  4.71413292e-15]]
waited 0: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
prediction:  [[9.17098582e-04 4.82705569e-29 1.51439786e-23 1.51855782e-11
  9.99983402e-01 9.99999997e-01 2.54041428e-10 1.19892284e-15
  1.43674086e-52 8.20655803e-04 9.23404750e-12 1.54200380e-10
  2.24080447e-11]]
waited 1: [0. 0. 0. 0. 1. 2. 0. 0. 0. 0. 0. 0. 0.]
prediction:  [[8.28664480e-08 1.83994435e-36 3.34060143e-26 2.35786641e-16
  1.27685176e-04 9.99566941e-01 1.11387985e-14 2.3

In [None]:
# Round-trip check of the source vocabulary: token -> id -> token.
clave=source_token_dict['Andrea']
print(clave)
valor=source_token_dict_inv[clave]
print(valor)
#print(source_token_dict['women'])
#print(source_token_dict['queen'])

12
Andrea


In [None]:
# Embedding of every source token, keyed by token id. getEmbedding takes a
# list of ids and returns a list, so each value is a one-element list.
embedding_dic = {
  token_id: embedding.getEmbedding([token_id])
  for token_id in range(len(source_token_dict))
}
print(len(embedding_dic))


13


In [None]:
# Look the id up by token instead of hard-coding it, so this keeps working
# when the vocabulary order changes.
embedding_of_rey = embedding_dic[source_token_dict['rey']]
print(embedding_of_rey)

[array([[0.43811475, 7.39692599, 2.19951235]])]


In [None]:
# Look the id up by token instead of hard-coding it, so this keeps working
# when the vocabulary order changes.
embedding_of_hombre = embedding_dic[source_token_dict['hombre']]
print(embedding_of_hombre)

[array([[0.24362699, 9.6184082 , 0.        ]])]


In [None]:
# Look the id up by token instead of hard-coding it, so this keeps working
# when the vocabulary order changes.
embedding_of_mujer = embedding_dic[source_token_dict['mujer']]
print(embedding_of_mujer)

[array([[9.55433612, 3.05476335, 0.        ]])]


In [None]:
# Word analogy "rey - hombre + mujer". Every embedding_dic entry is a
# one-element list, so [0] selects the embedding array itself.
analogia = (embedding_of_rey[0]-embedding_of_hombre[0])+embedding_of_mujer[0]
print(analogia)
# Score every vocabulary entry against the analogy vector.
# NOTE(review): cosine_similarity is defined in a later cell, so that cell has
# to be run first. It returns a sum of absolute differences (a distance), so
# smaller values mean closer vectors.
distances={}
for key, value in embedding_dic.items():
  distance=cosine_similarity(analogia, value)
  distances[source_token_dict_inv[key]]=distance
# Ascending sort puts the closest word first.
sorted_similarities = sorted(distances.items(), key=lambda x: x[1])
print(sorted_similarities[0])
#print(source_token_dict_inv[item])
# NOTE(review): 8 is presumably the id of 'reina' (see the dictionary printed earlier) - confirm.
print(embedding_dic[8])

[[9.74882388 0.83328114 2.19951235]]
('reina', 2.405833143970405)
[array([[8.3246671 , 0.98033764, 1.36489249]])]


In [None]:
def cosine_similarity(v1, v2):
  """Return the Manhattan (L1) distance between ``v1`` and ``v2``.

  NOTE: despite its name this is a distance, not a cosine similarity: 0 means
  identical vectors and larger values mean less similar ones. The name and
  the behaviour are kept because the callers sort the results in ascending
  order. A true cosine similarity would be
  ``(v1 @ v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))``.

  Args:
    v1, v2: Array-likes that broadcast against each other.

  Returns:
    The sum of the absolute element-wise differences.
  """
  # np.abs is used instead of sqrt(x**2): squaring underflows to 0 for tiny
  # differences and overflows to inf for huge ones.
  difference = np.asarray(v1, dtype=float) - np.asarray(v2, dtype=float)
  return np.sum(np.abs(difference))

In [None]:
def most_similar(word, word_dict, top_k=4):
  """Return the ``top_k`` vocabulary entries closest to ``word``.

  Relies on the module-level ``embedding_dic`` (id -> one-element list holding
  a 2-D embedding array) and on ``cosine_similarity``, which returns a
  distance: smaller means closer.

  Args:
    word: Token to look up.
    word_dict: Mapping ``token -> id`` for the vocabulary of ``embedding_dic``.
    top_k: Maximum number of neighbours to return.

  Returns:
    A list of ``(token, distance)`` tuples sorted by increasing distance; the
    query word itself is excluded.

  Raises:
    ValueError: If ``word`` is not a key of ``word_dict``.
  """
  if word not in word_dict:
    raise ValueError(f"{word} not found in the dictionary")

  key = word_dict[word]
  # Invert the dictionary that was passed in, so the returned tokens always
  # belong to the same vocabulary as the query.
  id_to_token = {index: token for token, index in word_dict.items()}
  # [0][0] unwraps the one-element list and the leading axis of the array.
  queryVector = embedding_dic[key][0][0]

  similarities = {}
  for key2, value in embedding_dic.items():
    if key2 == key:
      continue
    similarities[id_to_token[key2]] = cosine_similarity(queryVector, value[0][0])

  sorted_similarities = sorted(similarities.items(), key=lambda item: item[1])
  return sorted_similarities[:top_k]

In [None]:
# Four closest source tokens to 'rey' (smallest distance first).
most_similar('rey', source_token_dict, 4)

[('hombre', 4.615482320223763),
 ('Carlos', 5.788760441115224),
 ('una', 6.911265368515236),
 ('un', 8.33205470704645)]