In [6]:
import random
import string
import pickle
import numpy as np
import tensorflow as tf
from tqdm import tqdm
from typing import List, Tuple, Dict
from itertools import compress
from collections import Counter, deque
from sklearn.base import BaseEstimator, TransformerMixin
from datasets import load_dataset, load_from_disk

To-Do:
- Fix Skipgram
- Fix out of vocab_size index issues
- Build test method/function for use when imported
- Test word2vec embeddings with wiki dataset

In [None]:
#path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

In [None]:
# Read the corpus into a flat list of whitespace-separated tokens.
# NOTE(review): `path_to_file` is only assigned in the commented-out download cell above;
# define it (or re-enable that cell) before running this one on a fresh kernel.
# The file is streamed line by line rather than loaded whole with readlines(), and the
# encoding is pinned so decoding does not depend on the platform's locale default.
# Assumes the corpus is UTF-8 (true for plain-ASCII text) -- TODO confirm for other corpora.
with open(path_to_file, encoding='utf-8') as f:
    words = [word for line in f for word in line.split()]

print(f'Number of words: {len(words)}')

In [7]:
def preparePickleData(pickle_file_path: str, vocab_size: int = 50000) -> Tuple[np.ndarray, List[Tuple[str, int]], Dict[str, int], Dict[int, str]]:
    """
    Prepares the data for word vectorization by converting words to indices and creating dictionaries for word-to-index and index-to-word mappings.

    The pickle file is read twice, one pickled object at a time, so the raw corpus never has to
    fit in memory: the first pass counts word frequencies, the second pass encodes the corpus.
    Every word is lower-cased, stripped of string.punctuation characters, and kept only if the
    result is purely alphabetic.

    Parameters:
    pickle_file_path: The path to the pickle file containing the corpus of words to be processed.
                      The file is read as a sequence of consecutively pickled iterables of words.
    vocab_size:       The maximum size of the vocabulary. Default is 50,000.

    Returns:
    Tuple[np.ndarray, List[Tuple[str, int]], dict, dict]:
        - data:        The corpus converted to a NumPy int32 array of word indices.
        - count:       A list whose first entry is the mutable pair ['<unk>', n_unknown] followed by
                       (word, frequency) tuples for the most common words.
        - dictionary:  A dictionary mapping words to their corresponding indices.
        - reverse_dictionary:  A dictionary mapping indices to their corresponding words.
    """
    translator = str.maketrans('', '', string.punctuation)

    def iterCleanedWords(description: str):
        """Yield one cleaned word list per pickled object, advancing a tqdm bar by bytes read."""
        with open(pickle_file_path, "rb") as file:
            file.seek(0, 2)  # Move to the end of the file so tell() reports its size
            file_size = file.tell()
            file.seek(0)  # Move back to the start of the file
            with tqdm(total=file_size, desc=description) as pbar:
                while True:
                    try:
                        # NOTE(security): pickle.load can execute arbitrary code while unpickling;
                        # only point this function at corpus files from a trusted source.
                        words = pickle.load(file)
                    except EOFError:
                        # Reading past the last pickled object signals the end of the corpus
                        break
                    words = [word.lower().translate(translator) for word in words]
                    yield [word for word in words if word.isalpha()]
                    # pbar.n is the progress so far, so this advances the bar to the current offset
                    pbar.update(file.tell() - pbar.n)

    # First pass: count every cleaned word
    word_counts = Counter()
    for cleaned_words in iterCleanedWords("Counting words"):
        word_counts.update(cleaned_words)

    # Rare words are replaced by <unk> token (index 0); its count is filled in after the second pass
    count = [['<unk>', -1]]
    count.extend(word_counts.most_common(vocab_size - 1))

    # Word -> index follows the order of `count`; the reverse mapping is its inverse
    dictionary = {word: idx for idx, (word, _) in enumerate(count)}
    reverse_dictionary = {idx: word for word, idx in dictionary.items()}

    # Exact number of cleaned words seen in the first pass; sizes the output array
    total_words = sum(word_counts.values())
    data = np.zeros(total_words, dtype=np.int32)

    # Second pass: convert words to indices, writing one slice per pickled object
    index = 0
    unk_count = 0
    for cleaned_words in iterCleanedWords("Converting words to indices"):
        if not cleaned_words:
            continue
        indices = [dictionary.get(word, 0) for word in cleaned_words]
        data[index:index + len(indices)] = indices
        # '<unk>' contains punctuation and can never survive cleaning, so index 0 here
        # always means "word not in the vocabulary".
        unk_count += indices.count(0)
        index += len(indices)

    count[0][1] = unk_count

    return data, count, dictionary, reverse_dictionary

# Example usage
# A forward slash is accepted by open() on Windows as well as on POSIX systems, whereas the
# previous 'Resources\\corpus.pkl' only resolved to a file inside Resources/ on Windows.
pickle_file_path = 'Resources/corpus.pkl'
data, count, dictionary, reverse_dictionary = preparePickleData(pickle_file_path, vocab_size=500000)
print(f"\nTotal number of words: {len(data)}")
print(f"Total number of unique words: {len(dictionary)}")
print(f"Most common words: {count[:5]}")
print(f"\nSample data: {data[:10]}")
print(f"Decoded sample data: {[reverse_dictionary[idx] for idx in data[:10]]}")

Counting words: 100%|██████████| 19845159647/19845159647 [26:03<00:00, 12690909.99it/s]
Converting words to indices: 100%|██████████| 19845159647/19845159647 [30:52<00:00, 10713579.64it/s]


Total number of unique words: 500000
Most common words: [['<unk>', 23882695], ('the', 197426772), ('of', 102327621), ('in', 86486718), ('and', 82223696), ('a', 56513157), ('to', 55008144), ('was', 33149692), ('is', 24429821), ('for', 24130351), ('on', 23603557), ('as', 23495232), ('by', 21107004), ('with', 19885853), ('from', 16931502), ('he', 16594540), ('at', 16368794), ('that', 14803061), ('his', 13090739), ('it', 11529692)]
Sample data: [22110     8     5   248  2146     4   837    17     8 15268]
Decoded sample data: ['anarchism', 'is', 'a', 'political', 'philosophy', 'and', 'movement', 'that', 'is', 'skeptical']


In [None]:
def prepareData(words: List[str], vocab_size: int = 50000):
    """
    Encode an in-memory corpus as vocabulary indices and build the lookup tables.

    Each word is lower-cased and stripped of string.punctuation characters; words that are
    not purely alphabetic afterwards are dropped. The (vocab_size - 1) most frequent words
    get indices 1, 2, ... in frequency order, and every other word is encoded as 0 (<unk>).

    Parameters:
    words:      The corpus of words to be processed.
    vocab_size: The maximum size of the vocabulary. Default is 50,000.

    Returns:
    Tuple[List[int], List, dict, dict]:
        - data:        The cleaned corpus as a list of word indices.
        - count:       ['<unk>', n_unknown] followed by (word, frequency) tuples.
        - dictionary:  Mapping from word to index.
        - reverse_dictionary:  Mapping from index to word.
    """
    # Clean the corpus in a single sweep
    strip_punctuation = str.maketrans('', '', string.punctuation)
    tokens = []
    for raw_word in words:
        token = raw_word.lower().translate(strip_punctuation)
        if token.isalpha():
            tokens.append(token)

    # Slot 0 is reserved for <unk>; it stays a mutable list so its count can be filled in below
    unk_entry = ['<unk>', -1]
    count = [unk_entry, *Counter(tokens).most_common(vocab_size - 1)]

    # Indices follow the order of `count`
    dictionary = {entry[0]: position for position, entry in enumerate(count)}
    reverse_dictionary = {position: token for token, position in dictionary.items()}

    # Encode the corpus; anything outside the vocabulary maps to 0
    data = [dictionary.get(token, 0) for token in tokens]
    unk_entry[1] = sum(1 for token in tokens if token not in dictionary)

    return data, count, dictionary, reverse_dictionary



In [None]:
def skipgram (data: List[int], batch_size: int, num_skips: int, skip_window: int, data_index: int = 0):
    """
    Build one batch of (input word, context word) pairs for the skip-gram model.

    A window of 2 * skip_window + 1 consecutive words slides over `data`, starting at
    `data_index` and wrapping around at the end. For every window position the centre
    word is emitted `num_skips` times, each time paired with a distinct, randomly chosen
    other word from the same window.

    Parameters:
    data:        List of word indices.
    batch_size:  Number of (input, label) pairs in the batch; must be a multiple of num_skips.
    num_skips:   How many pairs to generate per centre word (at most 2 * skip_window).
    skip_window: How many words to consider left and right.
    data_index:  Index to start with in the data list. Default is 0.

    Returns:
    Tuple[np.ndarray, np.ndarray]: int32 inputs of shape (batch_size,) and int32 labels of
    shape (batch_size, 1).
    """
    assert batch_size % num_skips == 0
    assert num_skips <= 2 * skip_window

    inputs = np.empty(batch_size, dtype=np.int32)
    targets = np.empty((batch_size, 1), dtype=np.int32)
    span = 2 * skip_window + 1
    cursor = data_index

    # Prime the sliding window with the first `span` words
    window = deque(maxlen=span)
    for _ in range(span):
        window.append(data[cursor])
        cursor = (cursor + 1) % len(data)

    row = 0
    for _ in range(batch_size // num_skips):
        centre_word = window[skip_window]
        # The centre position is pre-marked as used so it is never drawn as its own context
        used_positions = [skip_window]
        position = skip_window

        for _ in range(num_skips):
            # Rejection-sample until an unused window position comes up
            while position in used_positions:
                position = random.randint(0, span - 1)
            used_positions.append(position)
            inputs[row] = centre_word
            targets[row, 0] = window[position]
            row += 1

        # Slide the window one word to the right (maxlen drops the oldest word)
        window.append(data[cursor])
        cursor = (cursor + 1) % len(data)

    return inputs, targets

In [None]:
def cbow(data: List[int], batch_size: int, num_skips: int, skip_window: int, data_index: int = 0):
    """
    Generate a batch of data for the CBOW model.

    Parameters:
    data:        List of word indices.
    batch_size:  Number of words in each batch.
    num_skips:   How many times to reuse an input to generate a label.
    skip_window: How many words to consider left and right.
    data_index:  Index to start with in the data list. Default is 0.

    Returns:
    Tuple[np.ndarray, np.ndarray]: Batch of context words and corresponding labels.
    """    
    assert batch_size % num_skips == 0
    assert num_skips <= 2 * skip_window

    batch = np.ndarray(shape=(batch_size, num_skips), dtype=np.int32)
    labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
    window_size = 2 * skip_window + 1
    
    # Create a buffer to store the data
    buffer = deque(maxlen=window_size)
    for _ in range(window_size):
        buffer.append(data[data_index])
        data_index = (data_index + 1) % len(data)

    # Generates the batch of context words and labels
    for i in range(batch_size):
        mask = [1] * window_size
        mask[skip_window] = 0
        batch[i] = list(compress(buffer, mask))
        labels[i, 0] = buffer[skip_window]

        # Move the window
        buffer.append(data[data_index])
        data_index = (data_index + 1) % len(data)
        
    return batch, labels

In [None]:
## Debug cell: sanity-check the encoded corpus and both batch generators.
## (The code below can be made into a debug function.)
## NOTE(review): skip-gram was reported here as "not functioning"; the skip-gram display loop
## below used to decode the whole batch on every row, which is corrected in this cell.

data, count, dictionary, reverse_dictionary = prepareData(words)

# Batch-generator settings shared by the CBOW and skip-gram checks
batch_size = 8
num_skips = 4
skip_window = 2

# Print metrics
print(f"Corpus length: {len(data)}")
print(f"Encoded corpus sample: {data[:20]}")
print(f"Decoded corpus sample: {[reverse_dictionary[i] for i in data[:20]]}")

print(f"\nVocabulary size: {len(count)}")
print(f"Index Map size: {len(dictionary)}")
print(f"\nMost common words:\n{sorted(count, key=lambda x: x[1], reverse=True)[:5]}")
print("Index Map Examples:\n", {k: reverse_dictionary[k] for k in list(reverse_dictionary)[:5]})

# Print the first 5 examples of CBOW batches
print("\nExamples of CBOW batches:")
cbow_batch, cbow_labels = cbow(data, batch_size, num_skips, skip_window)
print(f"Batch shape: {cbow_batch.shape}")
print(f"Labels shape: {cbow_labels.shape}")
for i in range(5):
    context_words = [reverse_dictionary[idx] for idx in cbow_batch[i]]
    target_word = reverse_dictionary[cbow_labels[i, 0]]
    print(f"Context words: {context_words}, Target word: {target_word}")

# Print the first 5 examples of Skip-gram batches
print("\nExamples of Skip-gram batches:")
skipgram_batch, skipgram_labels = skipgram(data, batch_size, num_skips, skip_window)
print(f"Batch shape: {skipgram_batch.shape}")
print(f"Labels shape: {skipgram_labels.shape}")
for i in range(5):
    # Decode only the i-th (input, output) pair so each printed row shows one training example
    input_word = reverse_dictionary[skipgram_batch[i]]
    output_word = reverse_dictionary[skipgram_labels[i, 0]]
    print(f"Input word: {input_word}, Output word: {output_word}")

In [None]:
class Word2Vec(BaseEstimator, TransformerMixin):
    """
    Word2Vec embedding trainer (skip-gram or CBOW) built on TensorFlow.

    Parameters:
    vocab_size:     Number of rows in the embedding and output-weight matrices; also the
                    vocabulary cap passed to prepareData.
    batch_size:     Number of examples passed to each training step.
    embedding_size: Dimensionality of each embedding vector.
    architecture:   'skip-gram' or 'cbow'; selects the batch generator and how inputs are embedded.
    num_skips:      Forwarded to the batch generator; for CBOW also the number of context
                    columns that are averaged.
    skip_window:    Forwarded to the batch generator (words considered left and right).
    loss_type:      'sampled_softmax_loss' or 'nce_loss'.
    n_neg_samples:  Passed to the loss as num_sampled.
    optimizer:      'adagrad' or 'SGD'. A non-string value is kept as-is and used as the optimizer.
    learning_rate:  Learning rate handed to the optimizer.
    n_steps:        Number of training steps run by fit().
    valid_size:     Number of validation word indices sampled.
    valid_window:   Validation indices are sampled from range(valid_window).
    """

    def __init__(self, vocab_size: int = 50000,
                 batch_size: int = 128,
                 embedding_size: int = 128,
                 architecture: str = 'skip-gram',
                 num_skips: int = 2,
                 skip_window: int = 1,
                 loss_type: str = 'sampled_softmax_loss',
                 n_neg_samples: int = 64,
                 optimizer: str = 'adagrad',
                 learning_rate: float = 1.0,
                 n_steps: int = 10001,
                 valid_size: int = 16,
                 valid_window: int = 100):
        self.vocab_size = vocab_size
        self.batch_size = batch_size
        self.embedding_size = embedding_size
        self.architecture = architecture
        self.num_skips = num_skips
        self.skip_window = skip_window
        self.loss_type = loss_type
        self.n_neg_samples = n_neg_samples
        self.optimizer = optimizer
        self.learning_rate = learning_rate
        self.n_steps = n_steps
        self.valid_size = valid_size
        self.valid_window = valid_window

        self.chooseSamples()
        self.chooseGenerator()
        self.__init__model()

    def chooseSamples(self):
        """Sample `valid_size` distinct word indices from range(valid_window) for similarity checks."""
        valid_examples = np.array(random.sample(range(self.valid_window), self.valid_size))
        self.valid_examples = valid_examples

    def chooseGenerator(self):
        """Select the batch generator matching `architecture`; raise ValueError for anything else."""
        if self.architecture == 'skip-gram':
            self.generator = skipgram
        elif self.architecture == 'cbow':
            self.generator = cbow
        else:
            raise ValueError("Architecture must be either 'skip-gram' or 'cbow'.")

    def tokenMapping(self, words):
        """Encode `words` with prepareData, store the resulting lookup tables, and return the encoded corpus."""
        data, count, dictionary, reverse_dictionary = prepareData(words, self.vocab_size)
        self.data = data
        self.count = count
        self.dictionary = dictionary
        self.reverse_dictionary = reverse_dictionary
        return data

    def __init__model(self):
        """Create the trainable variables, the optimizer, and the initial similarity tensors."""
        self.embeddings = tf.Variable(tf.random.uniform([self.vocab_size, self.embedding_size], -1.0, 1.0))
        self.weights = tf.Variable(tf.random.truncated_normal([self.vocab_size, self.embedding_size], stddev=1.0 / np.sqrt(self.embedding_size)))
        self.biases = tf.Variable(tf.zeros([self.vocab_size]))

        if self.optimizer == 'adagrad':
            self.optimizer = tf.optimizers.Adagrad(learning_rate=self.learning_rate)
        elif self.optimizer == 'SGD':
            self.optimizer = tf.optimizers.SGD(learning_rate=self.learning_rate)
        elif isinstance(self.optimizer, str):
            # An unrecognised name used to be left as a plain string and only failed later
            # when train_step tried to call apply_gradients on it.
            raise ValueError("Optimizer must be either 'adagrad' or 'SGD'.")

        self.valid_dataset = tf.constant(self.valid_examples, dtype=tf.int32)
        self._refresh_normalized_embeddings()

    def _refresh_normalized_embeddings(self):
        """
        Recompute the L2-normalised embeddings and the validation similarity matrix.

        These tensors are computed eagerly, so they are snapshots of `self.embeddings` at
        call time and must be refreshed after training for them to reflect the learned values.
        """
        # Compute the similarity distance metrics between individual embeddings
        norm = tf.sqrt(tf.reduce_sum(tf.square(self.embeddings), 1, keepdims=True))
        self.normalized_embeddings = self.embeddings / norm
        self.valid_embeddings = tf.nn.embedding_lookup(self.normalized_embeddings, self.valid_dataset)
        self.similarity = tf.matmul(self.valid_embeddings, self.normalized_embeddings, transpose_b=True)

    @tf.function
    def train_step(self, batch_data, batch_labels):
        """
        Run one optimisation step on a batch and return the mean loss.

        Parameters:
        batch_data:   Batch produced by the selected generator (word indices; for CBOW one
                      column per context word).
        batch_labels: Labels produced by the selected generator.

        Raises:
        ValueError: If `loss_type` is not 'sampled_softmax_loss' or 'nce_loss'.
        """
        with tf.GradientTape() as tape:

            if self.architecture == 'skip-gram':
                embed = tf.nn.embedding_lookup(self.embeddings, batch_data)
            elif self.architecture == 'cbow':
                # Average the embeddings of the num_skips context columns
                embed = tf.zeros([self.batch_size, self.embedding_size])
                for j in range(self.num_skips):
                    embed += tf.nn.embedding_lookup(self.embeddings, batch_data[:, j])
                embed /= self.num_skips

            if self.loss_type == 'sampled_softmax_loss':
                loss = tf.nn.sampled_softmax_loss(weights=self.weights,
                                                  biases=self.biases,
                                                  labels=batch_labels,
                                                  inputs=embed,
                                                  num_sampled=self.n_neg_samples,
                                                  num_classes=self.vocab_size)
            elif self.loss_type == 'nce_loss':
                loss = tf.nn.nce_loss(weights=self.weights,
                                      biases=self.biases,
                                      labels=batch_labels,
                                      inputs=embed,
                                      num_sampled=self.n_neg_samples,
                                      num_classes=self.vocab_size)
            else:
                # Without this branch `loss` would be undefined below
                raise ValueError("Loss type must be either 'sampled_softmax_loss' or 'nce_loss'.")
            loss = tf.reduce_mean(loss)

        gradients = tape.gradient(loss, [self.embeddings, self.weights, self.biases])
        self.optimizer.apply_gradients(zip(gradients, [self.embeddings, self.weights, self.biases]))
        return loss

    def fit(self, words):
        """
        Encode `words`, train for `n_steps` steps, and store the normalised embeddings.

        Parameters:
        words: The corpus as a list of word strings.

        Returns:
        self, with `final_embeddings` set to the L2-normalised trained embedding matrix.

        Raises:
        ValueError: If no word of the corpus survives pre-processing.
        """
        self.data = self.tokenMapping(words)
        if len(self.data) == 0:
            raise ValueError("Cannot fit Word2Vec: the corpus contains no usable words.")

        # The skip-gram generator slides its window once per num_skips rows, the CBOW generator
        # once per row. Advancing the start index by that amount between steps walks through the
        # corpus; without it every step would be trained on the identical first batch.
        if self.architecture == 'skip-gram':
            centres_per_batch = self.batch_size // self.num_skips
        else:
            centres_per_batch = self.batch_size
        data_index = 0
        average_loss = 0

        for step in range(self.n_steps):
            batch_data, batch_labels = self.generator(self.data, self.batch_size, self.num_skips, self.skip_window, data_index)
            data_index = (data_index + centres_per_batch) % len(self.data)
            loss = self.train_step(batch_data, batch_labels)
            average_loss += loss
            if step % 2000 == 0:
                if step > 0:
                    average_loss /= 2000
                print(f'Average loss at step {step}: {average_loss}')
                average_loss = 0

        # Re-normalise from the trained variable; the tensors built in the constructor still
        # hold the randomly initialised values.
        self._refresh_normalized_embeddings()
        self.final_embeddings = self.normalized_embeddings.numpy()
        return self

    def get_embedding(self, word):
        """Return the trained embedding vector of `word`; raise ValueError if it is not in the dictionary."""
        if word not in self.dictionary:
            raise ValueError(f"Word '{word}' not in dictionary")
        word_index = self.dictionary[word]
        return self.final_embeddings[word_index]

    def similar_by_word(self, word, top_n=5):
        """
        Return the `top_n` dictionary words with the highest cosine similarity to `word`.

        The query word itself is part of the ranking. Only embedding rows that correspond to
        an entry of the built dictionary are ranked: the embedding matrix has `vocab_size`
        rows, which can exceed the number of words actually found in the corpus, and those
        extra rows have no word attached.

        Raises:
        ValueError: If `word` is not in the dictionary.
        """
        word_vector = self.get_embedding(word)
        known_embeddings = self.final_embeddings[:len(self.reverse_dictionary)]
        similarities = np.dot(known_embeddings, word_vector) / (np.linalg.norm(known_embeddings, axis=1) * np.linalg.norm(word_vector))
        similar_indices = np.argsort(-similarities)[:top_n]

        return [self.reverse_dictionary[int(idx)] for idx in similar_indices]

In [None]:
# Train a Word2Vec model with the CBOW architecture on the loaded corpus
batch_size, num_skips, skip_window = 128, 4, 2

word2vec = Word2Vec(
    batch_size=batch_size,
    num_skips=num_skips,
    skip_window=skip_window,
    architecture='cbow',
)
model = word2vec.fit(words)

In [None]:
# Look up the embedding vector of a word; an unknown word is reported instead of raising
try:
    embedding = model.get_embedding('king')
except ValueError as e:
    print(e)
else:
    print(f"Embedding for 'king': {embedding}")

# List the words closest to it; an unknown word is reported instead of raising
try:
    similar_words = model.similar_by_word('king')
except ValueError as e:
    print(e)
else:
    print(f"Words similar to 'king': {similar_words}")