Importar librerias  

In [None]:
import io
import re
import string
import tqdm

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

SEED = 42
AUTOTUNE = tf.data.AUTOTUNE

Tokenizacion

In [None]:
sentence = "The wide road shimmered in the hot sun"
tokens = list(sentence.lower().split())
print(len(tokens))

8


In [None]:
#Crear vocabulario
vocab, index = {}, 1
vocab['pad']= 0  #padding
for token in tokens:
  if token not in vocab:
    vocab[token] = index
    index +=1

vocab_size=len(vocab)
print(vocab)


{'pad': 0, 'the': 1, 'wide': 2, 'road': 3, 'shimmered': 4, 'in': 5, 'hot': 6, 'sun': 7}


In [None]:
# inversa del vocabulario
inverse_vocab = {index: token for token, index in vocab.items()}
print(inverse_vocab)

{0: 'pad', 1: 'the', 2: 'wide', 3: 'road', 4: 'shimmered', 5: 'in', 6: 'hot', 7: 'sun'}


In [None]:
example_sequence = [vocab[word] for word in tokens]
print(example_sequence)

[1, 2, 3, 4, 5, 1, 6, 7]


Generar skip-grams

In [None]:
window_size = 2
positive_skip_grams, _ = tf.keras.preprocessing.sequence.skipgrams(
    example_sequence,
    vocabulary_size=vocab_size,
    window_size= window_size,
    negative_samples=0)
print(len(positive_skip_grams))

for target, context in positive_skip_grams[:5]:
  print(f"({target}, {context}): ({inverse_vocab[target]},{inverse_vocab[context]}))")

26
(3, 5): (road,in))
(7, 6): (sun,hot))
(5, 3): (in,road))
(5, 1): (in,the))
(7, 1): (sun,the))


Generar muestras negativas

In [None]:
target_word, context_word = positive_skip_grams[0]
num_ns = 4 #Numero de muestras negativas

context_class = tf.reshape(tf.constant(context_word, dtype="int64"), (1,1))

negative_sampling_candidates, _ , _ = tf.random.log_uniform_candidate_sampler(
    true_classes=context_class,
    num_true=1,
    num_sampled = num_ns,
    unique = True,
    range_max = vocab_size,
    seed = SEED,
    name = "negative_sampling"
)

print(negative_sampling_candidates)
print([inverse_vocab[index.numpy()] for index in negative_sampling_candidates])

tf.Tensor([2 1 4 3], shape=(4,), dtype=int64)
['wide', 'the', 'shimmered', 'road']


In [None]:
squeezed_context_class = tf.squeeze(context_class, 1)
context = tf.concat([squeezed_context_class, negative_sampling_candidates], 0)
label = tf.constant([1] + [0] * num_ns, dtype="int64")
target = target_word

print(f"target_index     :{target}")
print(f"target_word      :{inverse_vocab[target_word]}")
print(f"context_indices  :{context}")
print(f"context_words    :{[inverse_vocab[c.numpy()] for c in context]}")
print(f"label            :{label}")

target_index     :3
target_word      :road
context_indices  :[5 2 1 4 3]
context_words    :['in', 'wide', 'the', 'shimmered', 'road']
label            :[1 0 0 0 0]


Generar datos de entrenamiento

In [None]:
def generate_training_data(sequences, window_size, num_ns, vocab_size, seed):
  targets, contexts, labels =[],[],[]

  sampling_table = tf.keras.preprocessing.sequence.make_sampling_table(vocab_size)

  for sequence in tqdm.tqdm(sequences):

    #Generar skip_gram positivo de cada palabra
    positive_skip_grams, _ = tf.keras.preprocessing.sequence.skipgrams(
      sequence,
      vocabulary_size= vocab_size,
      sampling_table=sampling_table,
      window_size=window_size,
      negative_samples=0)

    #Generar muestras negativas
    for target_word, context_word in positive_skip_grams:

      context_class = tf.expand_dims(
          tf.constant([context_word], dtype="int64"), 1)

      negative_sampling_candidates, _, _ =tf.random.log_uniform_candidate_sampler(
          true_classes=context_class,
          num_true=1,
          num_sampled=num_ns,
          unique=True,
          range_max=vocab_size,
          seed = seed,
          name = "negative_sampling")

      # Concatenar y etiquetas
      context = tf.concat([tf.squeeze(context_class,1), negative_sampling_candidates], 0)
      label = tf.constant([1] + [0]*num_ns, dtype="int64")

      #Agregar a la lista
      targets.append(target_word)
      contexts.append(context)
      labels.append(label)

  return targets, contexts, labels

Descargar documento (Text Corpus)

In [None]:
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


In [None]:
text_ds = tf.data.TextLineDataset(path_to_file).filter(lambda x: tf.cast(tf.strings.length(x),bool))

Estandarizacion

In [None]:
def custom_stadardization(input_data):
  lowercase = tf.strings.lower(input_data)
  return tf.strings.regex_replace(lowercase,
                                  '[%s]' % re.escape(string.punctuation), '')

vocab_size = 4096
sequence_length = 10

vectorize_layer = layers.TextVectorization(
    standardize = custom_stadardization,
    max_tokens = vocab_size,
    output_mode = 'int',
    output_sequence_length = sequence_length)

vectorize_layer.adapt(text_ds.batch(1024))

In [None]:
inverse_vocab = vectorize_layer.get_vocabulary()
print(inverse_vocab[:20])

['', '[UNK]', 'the', 'and', 'to', 'i', 'of', 'you', 'my', 'a', 'that', 'in', 'is', 'not', 'for', 'with', 'me', 'it', 'be', 'your']


In [None]:
text_vector_ds = text_ds.batch(1024).prefetch(AUTOTUNE).map(vectorize_layer).unbatch()

In [None]:
sequences = list(text_vector_ds.as_numpy_iterator())
print(len(sequences))
for seq in sequences[:10]:
  print(f"{seq} => {[inverse_vocab[i] for i in seq]}")

32777
[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']
[138  36 982 144 673 125  16 106   0   0] => ['before', 'we', 'proceed', 'any', 'further', 'hear', 'me', 'speak', '', '']
[34  0  0  0  0  0  0  0  0  0] => ['all', '', '', '', '', '', '', '', '', '']
[106 106   0   0   0   0   0   0   0   0] => ['speak', 'speak', '', '', '', '', '', '', '', '']
[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']
[   7   41   34 1286  344    4  200   64    4 3690] => ['you', 'are', 'all', 'resolved', 'rather', 'to', 'die', 'than', 'to', 'famish']
[34  0  0  0  0  0  0  0  0  0] => ['all', '', '', '', '', '', '', '', '', '']
[1286 1286    0    0    0    0    0    0    0    0] => ['resolved', 'resolved', '', '', '', '', '', '', '', '']
[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']
[  89    7   93 1187  225   12 2442  592    4    2] => ['first', 'you', 'kno

Generar datos de entrenamiento

In [None]:
targets, contexts, labels = generate_training_data(
    sequences = sequences,
    window_size= 2,
    num_ns=4,
    vocab_size=vocab_size,
    seed=SEED
)

targets = np.array(targets)
contexts = np.array(contexts)
labels= np.array(labels)

print('\n')
print(f"targets.shape: {targets.shape}")
print(f"contexts.shape: {contexts.shape}")
print(f"labels.shape: {labels.shape}")

100%|██████████| 32777/32777 [00:58<00:00, 562.17it/s]




targets.shape: (64721,)
contexts.shape: (64721, 5)
labels.shape: (64721, 5)


In [None]:
BATCH_SIZE = 1024
BUFFER_SIZE = 10000

dataset = tf.data.Dataset.from_tensor_slices(((targets,contexts),labels))
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder = True)

print(dataset)

<_BatchDataset element_spec=((TensorSpec(shape=(1024,), dtype=tf.int64, name=None), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None)), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None))>


In [None]:
dataser = dataset.cache().prefetch(buffer_size=AUTOTUNE)

Modelo de la RED

In [None]:
class Word2Vec(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim):
    super(Word2Vec, self).__init__()
    self.target_embedding = layers.Embedding(vocab_size,
                                             embedding_dim,
                                             input_length = 1,
                                             name="W2v_embedding")
    self.context_embedding = layers.Embedding(vocab_size,
                                             embedding_dim,
                                             input_length = 5)
  def call(self,pair):
    target, context = pair
    if len(target.shape) == 2:
      target = tf.squeeze(target,axis=1)

    word_emb = self.target_embedding(target)

    context_emb = self.context_embedding(context)
    dots = tf.einsum('be,bce->bc', word_emb, context_emb)

    return dots

Definir función de perdida y compilar

In [None]:
def custom_loss(x_logit,y_true):
  return tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=y_true)

In [None]:
embedding_dim=128
word2vec = Word2Vec(vocab_size, embedding_dim)
word2vec.compile(optimizer = 'adam',
                 loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                 metrics = ['accuracy'])

Entrenamiento de la red

In [None]:
word2vec.fit(dataset, epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x7ca59a637f10>

Descargar pesos

In [None]:
weight = word2vec.get_layer('W2v_embedding').get_weights()[0]
vocab = vectorize_layer.get_vocabulary()

out_v = io.open('vectors.tsv', 'w', encoding='utf-8')
out_m = io.open('metadata.tsv', 'w', encoding='utf-8')

for index, word in enumerate(vocab):
  if index == 0:
    continue
  vec = weight[index]
  out_v.write('\t'.join([str(x) for x in vec]) + "\n")
  out_m.write(word + "\n")
out_v.close()
out_m.close()