# Hand coded Word2Vec

1. Take a fake problem
2. Solve it using neural network
3. As a side effect, you get word embeddings

Our fake problem: try to fill in a missing word in a sentence 

In [1]:
import nltk
from nltk.corpus import brown

import re
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

In [2]:
class Config:
    context_window = 5
    count_words = 0

    num_epochs = 20
    batch_size = 128
    embedding_dim = 100
    lr=0.001
    loss_function = nn.CrossEntropyLoss()
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Pytorch is running on {DEVICE}")


cfg = Config()

Pytorch is running on cuda


In [4]:
nltk.download('brown')

[nltk_data] Downloading package brown to C:\Users\pc-de-
[nltk_data]     caselli\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\brown.zip.


True

In [3]:
sentences = brown.sents()
print(f"Loaded {len(sentences)} sentences from the Brown Corpus.")

Loaded 57340 sentences from the Brown Corpus.


In [8]:
all_texts = [" ".join(s).lower() for s in sentences]

In [11]:
pattern = r'[^\W\d_]+|[0-9]+'
final_str_set = set()

for text in all_texts:
    # re.UNICODE is good practice
    tokens = re.findall(pattern, text, re.UNICODE) 
    final_str_set.update(tokens)

In [22]:
cfg.count_words = len(final_str_set)
word_to_int = {w: i for i, w in enumerate(final_str_set)}
int_to_word = {i: w for w, i in word_to_int.items()}

In [13]:
def generate_cbow_training_data(texts: list, word_to_int: dict):
    pattern = r'[^\W\d_]+|[0-9]+'
    
    all_training_pairs = []

    for text in texts:
        text = text.lower()
        tokens = re.findall(pattern, text, re.UNICODE) 
        
        if not tokens:
            continue
        
        token_count = len(tokens)

        for i in range(token_count):
            center_word = tokens[i]
            center_word_index = word_to_int.get(center_word) 

            if center_word_index is None:
                continue

            context_indices = []
            
            start_idx = max(0, i - cfg.context_window)
            end_idx = min(token_count, i + 1 + cfg.context_window)

            for j in range(start_idx, end_idx):
                if i == j:
                    continue
                context_word = tokens[j]
                context_word_index = word_to_int.get(context_word)
                
                if context_word_index is not None:
                    context_indices.append(context_word_index)
            
            if not context_indices:
                continue
            
            all_training_pairs.append((context_indices, center_word_index))
            
    return all_training_pairs

generate_cbow_training_data(["I try and avoid this sort of conflict"], word_to_int)

[([18321, 36068, 14995, 6263, 27484], 22991),
 ([22991, 36068, 14995, 6263, 27484, 28784], 18321),
 ([22991, 18321, 14995, 6263, 27484, 28784, 19156], 36068),
 ([22991, 18321, 36068, 6263, 27484, 28784, 19156], 14995),
 ([22991, 18321, 36068, 14995, 27484, 28784, 19156], 6263),
 ([22991, 18321, 36068, 14995, 6263, 28784, 19156], 27484),
 ([18321, 36068, 14995, 6263, 27484, 19156], 28784),
 ([36068, 14995, 6263, 27484, 28784], 19156)]

In [141]:
def generate_positional_training_data(texts: list, word_to_int: dict):
    pattern = r'[^\W\d_]+|[0-9]+'
    
    all_training_pairs = []

    for text in texts:
        text = text.lower()
        tokens = re.findall(pattern, text, re.UNICODE) 
        
        if not tokens:
            continue
        
        token_count = len(tokens)

        for i in range(token_count):
            center_word = tokens[i]
            center_word_index = word_to_int.get(center_word) 

            if center_word_index is None:
                continue

            context_positions = {}
            
            start_idx = max(0, i - cfg.context_window)
            end_idx = min(token_count, i + 1 + cfg.context_window)

            for j in range(start_idx, end_idx):
                if i == j:
                    continue
                    
                context_word = tokens[j]
                context_word_index = word_to_int.get(context_word)
                
                if context_word_index is not None:
                    position = j - i
                    
                    context_positions.setdefault(context_word_index, []).append(position)
            
            if not context_positions:
                continue
                
            word_emb_vector = np.zeros(shape=(cfg.count_words,))
            
            for word_index, positions_list in context_positions.items():
                avg_position = np.mean(positions_list)
                word_emb_vector[word_index] = avg_position
            
            all_training_pairs.append((word_emb_vector, center_word_index))
            
    return all_training_pairs
t = generate_positional_training_data(["I try and avoid and sort of conflict"], word_to_int)

In [15]:
class CBOWDataset(Dataset):
    def __init__(self, training_data):
        self.data = training_data
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        context, target = self.data[idx]
        return torch.tensor(context, dtype=torch.long), torch.tensor(target, dtype=torch.long)
    
class CBOWModel(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int):
        super(CBOWModel, self).__init__()

        self.embeddings = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=0
        )
        self.output_layer = nn.Linear(embedding_dim, vocab_size)

    def forward(self, context_indices: torch.Tensor):
        context_vectors = self.embeddings(context_indices)
        
        avg_context_vector = torch.mean(context_vectors, dim=1)
        logits = self.output_layer(avg_context_vector)
        return logits

def collate_cbow(batch):
    context_list = []
    target_list = []
    
    for context, target in batch:
        context_list.append(context)
        target_list.append(target)

    padded_contexts = pad_sequence(context_list, batch_first=True, padding_value=0)
    
    stacked_targets = torch.stack(target_list)
    
    return padded_contexts, stacked_targets

In [23]:
training_data = generate_cbow_training_data(all_texts, word_to_int)

dataset = CBOWDataset(training_data)

data_loader = DataLoader(
    dataset=dataset,
    batch_size=cfg.batch_size,
    shuffle=True,
    collate_fn=collate_cbow
)

model = CBOWModel(cfg.count_words, cfg.embedding_dim)
optimizer = torch.optim.Adam(model.parameters(), lr=cfg.lr)

model.to(cfg.DEVICE)
model.train()

print("--- Starting Training ---")
for epoch in range(cfg.num_epochs):
    
    total_loss = 0
    
    for context_batch, target_batch in data_loader:
        context_batch = context_batch.to(cfg.DEVICE)
        target_batch = target_batch.to(cfg.DEVICE)

        optimizer.zero_grad()
        logits = model(context_batch)
        loss = cfg.loss_function(logits, target_batch)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    print(f"Epoch {epoch + 1}/{cfg.num_epochs}, Loss: {total_loss / len(data_loader)}")
print("--- Training Complete ---")


--- Starting Training ---
Epoch 1/128, Loss: 7.424872215427421
Epoch 2/128, Loss: 6.728880291415159
Epoch 3/128, Loss: 6.437106705279015
Epoch 4/128, Loss: 6.2266344074805975
Epoch 5/128, Loss: 6.057589807583627
Epoch 6/128, Loss: 5.914442331573535
Epoch 7/128, Loss: 5.7885090966881565
Epoch 8/128, Loss: 5.67693282597365
Epoch 9/128, Loss: 5.57589150845591
Epoch 10/128, Loss: 5.482726123335341
Epoch 11/128, Loss: 5.39740704279116
Epoch 12/128, Loss: 5.317301862658074
Epoch 13/128, Loss: 5.242506398022825
Epoch 14/128, Loss: 5.17200976911456
Epoch 15/128, Loss: 5.104766934785191
Epoch 16/128, Loss: 5.040821721461649
Epoch 17/128, Loss: 4.980457327174479
Epoch 18/128, Loss: 4.922334843749924
Epoch 19/128, Loss: 4.866926189722462
Epoch 20/128, Loss: 4.81305114264767
--- Training Complete ---


In [24]:
model.eval()

word_embeddings = model.embeddings.weight.data.cpu()

print(f"\nShape of our final embedding matrix: {word_embeddings.shape}")
torch.save(word_embeddings, "my_word_embeddings.pt")


Shape of our final embedding matrix: torch.Size([42325, 100])


In [25]:
word_embeddings

tensor([[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
          0.0000e+00,  0.0000e+00],
        [ 1.8574e-01, -3.2374e-02, -7.4229e-01,  ...,  6.1075e-01,
          2.2968e-01,  1.5072e+00],
        [ 2.1158e+00, -1.8626e-01,  1.0257e+00,  ...,  1.5958e+00,
         -1.1144e-01, -6.1884e-01],
        ...,
        [ 6.9596e-01,  3.9360e-02,  1.8186e+00,  ..., -3.1483e-03,
          1.5269e+00,  7.6351e-01],
        [ 3.8152e+00,  1.5229e+00,  1.2885e+00,  ..., -2.0685e-01,
         -7.1479e-01,  1.0372e+00],
        [-8.8064e-01,  8.2757e-01,  6.3314e-01,  ...,  5.4394e-01,
         -1.1798e+00,  7.5773e-01]])

In [4]:
import torch
import torch.nn.functional as F

def get_analogy(word_a: str, word_b: str, word_c: str, embeddings: torch.Tensor, cfg: Config):
    w2i = word_to_int
    i2w = int_to_word
    
    for word in [word_a, word_b, word_c]:
        if word not in w2i:
            print(f"Error: Word '{word}' is not in the vocabulary.")
            return
            
    vec_a = embeddings[w2i[word_a]]
    vec_b = embeddings[w2i[word_b]]
    vec_c = embeddings[w2i[word_c]]
    
    target_vec = vec_a - vec_b + vec_c
    
    all_similarities = F.cosine_similarity(target_vec.unsqueeze(0), embeddings)
    
    for word in [word_a, word_b, word_c]:
        all_similarities[w2i[word]] = -float('inf')
        
    top_5_scores, top_5_indices = torch.topk(all_similarities, 5)
    
    print(f"Analogy: {word_a} - {word_b} + {word_c} = ?\n")
    print("Top 5 results:")
    for i in range(5):
        word = i2w[top_5_indices[i].item()]
        score = top_5_scores[i].item()
        print(f"  {i+1}. {word} (Similarity: {score:.4f})")
        
print("\n--- Analogy Test 1 ---")
get_analogy("king", "man", "woman", word_embeddings, cfg)

print("\n--- Analogy Test 2 ---")
get_analogy("queen", "woman", "man", word_embeddings, cfg)

print("\n--- Analogy Test 3 ---")
get_analogy("image", "images", "cell", word_embeddings, cfg)


--- Analogy Test 1 ---


NameError: name 'word_embeddings' is not defined

In [27]:
final_str_set

{'retirements',
 'observation',
 'uneconomic',
 'committees',
 'collecting',
 'cropping',
 'swelling',
 'zu',
 'guilty',
 'reservoirs',
 'sufferers',
 'ym',
 'quaver',
 'woodshed',
 'hurtling',
 'mandamus',
 'bechhofer',
 'predictably',
 'ziraldo',
 'flamboyant',
 'de',
 'provdied',
 'stern',
 'jilted',
 'revolutionaries',
 'geary',
 'rifled',
 'triumphant',
 'impoundments',
 'sameness',
 'bacteria',
 'rendering',
 'steinkerque',
 'resorcinol',
 'keyboarding',
 'bruises',
 'stubbs',
 'pizzicato',
 'tardiness',
 'hammarskjold',
 'ra',
 'pondering',
 'chum',
 'fantasist',
 'knee',
 'python',
 'dine',
 'diamond',
 'stanchest',
 'troupes',
 'qui',
 'devotional',
 'mastery',
 'patents',
 'domesday',
 'vallee',
 'english',
 'publique',
 'photocathode',
 'affix',
 'imperturbable',
 'aerobic',
 'omnipotence',
 'archaeology',
 'drains',
 'dingo',
 'influx',
 'identifies',
 'nominee',
 'unbroken',
 'manley',
 'ultrasonically',
 'rates',
 'contemplating',
 'clan',
 'noncommittal',
 'doctrinally',