In [1]:
import io
import re
import string
import tqdm

import numpy as np

import tensorflow as tf
from tensorflow.keras import layers

### Word2Vec


We'll be using Skip-Gram for this example. <br/>
The objective of skip-gram is to maximize the probability of predicting the context of the given target word. 

In [2]:
SEED = 42
AUTOTUNE = tf.data.AUTOTUNE

In [3]:
# let tokenize this sentence 
sentence = "The quick brown fox jumps over the lazy dogs"
tokens = list(sentence.lower().split())

In [4]:
# create a vocabulary to save the mappings from tokens to their integer indices

vocab = {}
index = 1
vocab['<pad>'] = 0 # add a padding token

for token in tokens:
    if token not in vocab:
        vocab[token] = index
        index += 1
vocab_size = len(vocab)
print(vocab_size, vocab)

9 {'<pad>': 0, 'the': 1, 'quick': 2, 'brown': 3, 'fox': 4, 'jumps': 5, 'over': 6, 'lazy': 7, 'dogs': 8}


In [5]:
# generate skip-grams for one sentence
# skip grams are basically combinations of target word and surrounding words in the sliding window 

example_sequence = [vocab[word] for word in tokens]
window_size = 2

positive_skip_grams, _ = tf.keras.preprocessing.sequence.skipgrams(
    example_sequence, vocabulary_size=vocab_size, window_size=window_size, negative_samples=0
)

print(positive_skip_grams)
print(len(positive_skip_grams))

for target, context in positive_skip_grams[:5]:
    print(f"({target}, {context}): ({tokens[target]}, {tokens[context]})")

[[4, 5], [2, 4], [5, 1], [1, 5], [5, 4], [5, 6], [3, 1], [6, 1], [7, 1], [7, 8], [2, 1], [8, 7], [6, 7], [6, 4], [3, 5], [1, 7], [5, 3], [1, 2], [2, 3], [7, 6], [1, 6], [1, 3], [4, 2], [3, 4], [4, 6], [3, 2], [8, 1], [1, 8], [6, 5], [4, 3]]
30
(4, 5): (jumps, over)
(2, 4): (brown, jumps)
(5, 1): (over, quick)
(1, 5): (quick, over)
(5, 4): (over, jumps)


In [6]:
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

with open(path_to_file) as f:
  lines = f.read().splitlines()


In [7]:
text_ds = tf.data.TextLineDataset(path_to_file).filter(lambda x: tf.cast(tf.strings.length(x), bool))

In [8]:
# vectorize sentences from the corpus

# custom standardizing function to convert text to lowercase and remove punctuations
def custom_std(input_data):
    lowercase = tf.strings.lower(input_data)
    return tf.strings.regex_replace(lowercase, '[%s]' % re.escape(string.punctuation), '')

vocab_size = 4096
sequence_length = 10

vectorize_layer = layers.TextVectorization(
    standardize=custom_std,
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length
)

vectorize_layer.adapt(text_ds.batch(1024))

In [9]:
inverse_vocab = vectorize_layer.get_vocabulary()
print(inverse_vocab[:20])

['', '[UNK]', 'the', 'and', 'to', 'i', 'of', 'you', 'my', 'a', 'that', 'in', 'is', 'not', 'for', 'with', 'me', 'it', 'be', 'your']


In [10]:
# Vectorize the data in text_ds.
text_vector_ds = text_ds.batch(1024).prefetch(AUTOTUNE).map(vectorize_layer).unbatch()

In [11]:
sequences = list(text_vector_ds.as_numpy_iterator())
print(len(sequences))

32777


In [12]:
for seq in sequences[:5]:
  print(f"{seq} => {[inverse_vocab[i] for i in seq]}")

[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']
[138  36 982 144 673 125  16 106   0   0] => ['before', 'we', 'proceed', 'any', 'further', 'hear', 'me', 'speak', '', '']
[34  0  0  0  0  0  0  0  0  0] => ['all', '', '', '', '', '', '', '', '', '']
[106 106   0   0   0   0   0   0   0   0] => ['speak', 'speak', '', '', '', '', '', '', '', '']
[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']


In [13]:
# Generates skip-gram pairs with negative sampling for a list of sequences
# (int-encoded sentences) based on window size, number of negative samples
# and vocabulary size.
def generate_training_data(sequences, window_size, num_ns, vocab_size, seed):
  # Elements of each training example are appended to these lists.
  targets, contexts, labels = [], [], []

  # Build the sampling table for `vocab_size` tokens.
  sampling_table = tf.keras.preprocessing.sequence.make_sampling_table(vocab_size)

  # Iterate over all sequences (sentences) in the dataset.
  for sequence in tqdm.tqdm(sequences):

    # Generate positive skip-gram pairs for a sequence (sentence).
    positive_skip_grams, _ = tf.keras.preprocessing.sequence.skipgrams(
          sequence,
          vocabulary_size=vocab_size,
          sampling_table=sampling_table,
          window_size=window_size,
          negative_samples=0)

    # Iterate over each positive skip-gram pair to produce training examples
    # with a positive context word and negative samples.
    for target_word, context_word in positive_skip_grams:
      context_class = tf.expand_dims(
          tf.constant([context_word], dtype="int64"), 1)
      negative_sampling_candidates, _, _ = tf.random.log_uniform_candidate_sampler(
          true_classes=context_class,
          num_true=1,
          num_sampled=num_ns,
          unique=True,
          range_max=vocab_size,
          seed=seed,
          name="negative_sampling")

      # Build context and label vectors (for one target word)
      context = tf.concat([tf.squeeze(context_class,1), negative_sampling_candidates], 0)
      label = tf.constant([1] + [0]*num_ns, dtype="int64")

      # Append each element from the training example to global lists.
      targets.append(target_word)
      contexts.append(context)
      labels.append(label)

  return targets, contexts, labels

In [14]:
targets, contexts, labels = generate_training_data(
    sequences=sequences,
    window_size=2,
    num_ns=4,
    vocab_size=vocab_size,
    seed=SEED)

targets = np.array(targets)
contexts = np.array(contexts)
labels = np.array(labels)

print('\n')
print(f"targets.shape: {targets.shape}")
print(f"contexts.shape: {contexts.shape}")
print(f"labels.shape: {labels.shape}")

100%|██████████| 32777/32777 [00:17<00:00, 1917.94it/s]




targets.shape: (65176,)
contexts.shape: (65176, 5)
labels.shape: (65176, 5)


In [15]:
# the tf.data.Dataset helps to operform efficient batching for large number of training examples
BATCH_SIZE = 1024
BUFFER_SIZE = 10000
dataset = tf.data.Dataset.from_tensor_slices(((targets, contexts), labels))
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
print(dataset)

<_BatchDataset element_spec=((TensorSpec(shape=(1024,), dtype=tf.int64, name=None), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None)), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None))>


In [16]:
dataset = dataset.cache().prefetch(buffer_size=AUTOTUNE)
print(dataset)

<_PrefetchDataset element_spec=((TensorSpec(shape=(1024,), dtype=tf.int64, name=None), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None)), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None))>


In [17]:
class Word2Vec(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim):
    super(Word2Vec, self).__init__()
    self.target_embedding = layers.Embedding(vocab_size,
                                      embedding_dim,
                                      name="w2v_embedding")
    self.context_embedding = layers.Embedding(vocab_size,
                                       embedding_dim)

  def call(self, pair):
    target, context = pair
    # target: (batch, dummy?)  # The dummy axis doesn't exist in TF2.7+
    # context: (batch, context)
    if len(target.shape) == 2:
      target = tf.squeeze(target, axis=1)
    # target: (batch,)
    word_emb = self.target_embedding(target)
    # word_emb: (batch, embed)
    context_emb = self.context_embedding(context)
    # context_emb: (batch, context, embed)
    dots = tf.einsum('be,bce->bc', word_emb, context_emb)
    # dots: (batch, context)
    return dots

In [18]:
def custom_loss(x_logit, y_true):
    return tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=y_true)

In [19]:
embedding_dim = 128
word2vec = Word2Vec(vocab_size, embedding_dim)
word2vec.compile(optimizer='adam', 
                 loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                 metrics=['accuracy'])

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='logs')

word2vec.fit(dataset, epochs=20, callbacks=[tensorboard_callback])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x169ba9da920>

In [23]:
from sklearn.metrics.pairwise import cosine_similarity

In [25]:
# Grab the learned embedding matrix (target side)
embedding_matrix = word2vec.target_embedding.weights[0].numpy()   # shape: (vocab_size, embedding_dim)


# map word to index
word_to_id = {w: i for i, w in enumerate(inverse_vocab)}   # `inverse_vocab` from the vectorizer
id_to_word = {i: w for w, i in word_to_id.items()}

def get_embedding(word: str) -> np.ndarray:
    """Return the embedding vector for `word` (or raise if OOV)."""
    idx = word_to_id.get(word)
    if idx is None:
        raise ValueError(f"'{word}' not in vocabulary")
    return embedding_matrix[idx]


# Find top-k similar words (cosine similarity)
def most_similar(query: str, top_k: int = 5):
    query_vec = get_embedding(query).reshape(1, -1)
    sims = cosine_similarity(query_vec, embedding_matrix)[0]          # similarity to every vocab entry
    # Exclude the query word itself
    sims[word_to_id[query]] = -np.inf
    top_ids = np.argsort(sims)[-top_k:][::-1]                        # descending order
    return [(id_to_word[i], sims[i]) for i in top_ids]


In [34]:
word = "heart"
for w, score in most_similar(word):
    print(f"{w:>12}  (cosine ≈ {score:.4f})")

      excuse  (cosine ≈ 0.5144)
    shepherd  (cosine ≈ 0.4980)
    marrying  (cosine ≈ 0.4657)
   authority  (cosine ≈ 0.4626)
   signories  (cosine ≈ 0.4623)
