In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import re
from collections import Counter
import pandas as pd
from pathlib import Path
import numpy as np

In [3]:
def load_tokens(path):
    """Read a UTF-8 text file and return its whitespace-separated tokens.

    Args:
        path: Location of the corpus file (str or path-like). A leading
            ``~`` is expanded to the current user's home directory.

    Returns:
        list[str]: Tokens in file order, split on runs of whitespace.
        An empty or all-whitespace file yields an empty list.
    """
    corpus_file = Path(path).expanduser()
    raw_text = corpus_file.read_text(encoding='utf-8')
    # split() without a separator already discards leading/trailing whitespace.
    return raw_text.split()

# Tokenise the corpus file and report how many tokens it holds.
words = load_tokens("w2v/text8")
print(f"Total tokens: {len(words)}")

# Count occurrences of each word. Ids are assigned in the Counter's
# iteration order, i.e. the order in which words first appear in the corpus.
vocab = Counter(words)
idx_to_word = dict(enumerate(vocab))
word_to_idx = {word: idx for idx, word in idx_to_word.items()}
vocab_size = len(word_to_idx)

# Re-express the corpus as a list of vocabulary ids.
corpus = [word_to_idx[token] for token in words]

Total tokens: 17005207


In [3]:
def generate_skipgram_pairs(corpus, window_size=5):
    """Build (center, context) training pairs for skip-gram.

    For every position one window radius is drawn uniformly from
    ``1..window_size`` (inclusive); each in-bounds neighbour within that
    radius, other than the center itself, contributes one pair.

    Args:
        corpus: Indexable sequence of tokens (vocabulary ids in this notebook).
        window_size: Largest radius that may be drawn for a position.

    Returns:
        list[tuple]: ``(corpus[center], corpus[neighbour])`` tuples, ordered
        by center position and then by neighbour position.
    """
    n_tokens = len(corpus)
    pairs = []
    for center_pos, center_token in enumerate(corpus):
        # Exactly one draw per center position, whatever the clipping below does.
        radius = random.randint(1, window_size)
        first = max(center_pos - radius, 0)
        last = min(center_pos + radius, n_tokens - 1)
        pairs.extend(
            (center_token, corpus[context_pos])
            for context_pos in range(first, last + 1)
            if context_pos != center_pos
        )
    return pairs

# Raw occurrence count of every word, laid out so that position i holds the
# count of the word whose vocabulary id is i.
word_freq = np.array([vocab[idx_to_word[idx]] for idx in range(vocab_size)])

# Negative-sampling distribution: counts raised to the 3/4 power, then
# rescaled so the entries sum to one.
unigram_dist = np.power(word_freq, 0.75)
unigram_dist = unigram_dist / unigram_dist.sum()

In [4]:
class Word2Vec(nn.Module):
    """Skip-gram model trained with negative sampling.

    Holds two embedding tables of shape ``[vocab_size, embed_dim]``:
    ``in_embed`` is looked up for center words, ``out_embed`` for context
    words and sampled negatives.
    """

    def __init__(self, vocab_size, embed_dim):
        """Create the center-word and context-word embedding tables.

        Args:
            vocab_size: Number of rows in each embedding table.
            embed_dim: Length of each embedding vector.
        """
        super().__init__()
        self.in_embed = nn.Embedding(vocab_size, embed_dim)
        self.out_embed = nn.Embedding(vocab_size, embed_dim)

    def forward(self, center, context, negatives):
        """Return the mean negative-sampling loss for one batch.

        Args:
            center: LongTensor ``[B]`` of center-word ids.
            context: LongTensor ``[B]`` of observed context-word ids.
            negatives: LongTensor ``[B, K]`` of sampled negative-word ids.

        Returns:
            Scalar tensor: the batch mean of
            ``-(log sigmoid(center . context) + sum_k log sigmoid(-center . negative_k))``.
        """
        center_embed = self.in_embed(center)         # [B, D]
        context_embed = self.out_embed(context)      # [B, D]
        neg_embed = self.out_embed(negatives)        # [B, K, D]

        pos_score = torch.sum(center_embed * context_embed, dim=1)  # [B]
        # logsigmoid stays finite where log(sigmoid(x)) would underflow to -inf.
        pos_loss = F.logsigmoid(pos_score)

        # Drop only the trailing singleton dim: a bare squeeze() would also
        # collapse B == 1 or K == 1 and break the sum over dim=1 below.
        neg_score = torch.bmm(neg_embed, center_embed.unsqueeze(2)).squeeze(2)  # [B, K]
        neg_loss = torch.sum(F.logsigmoid(-neg_score), dim=1)                   # [B]

        return -torch.mean(pos_loss + neg_loss)

from torch.utils.data import Dataset, DataLoader

class Word2VecDataset(Dataset):
    """Map-style dataset yielding (center, context, negatives) id tensors.

    Each item is one skip-gram pair plus ``neg_sample_count`` negative word
    ids drawn, with replacement, from ``unigram_dist``.
    """

    def __init__(self, pairs, unigram_dist, neg_sample_count):
        """Store the pairs and precompute the negative-sampling CDF.

        Args:
            pairs: Indexable collection of ``(center_id, context_id)`` pairs.
            unigram_dist: 1-D array of non-negative sampling weights, one per
                vocabulary id. It is normalised here, so it need not sum to 1.
            neg_sample_count: Number of negative ids to draw per item.

        Raises:
            ValueError: If ``unigram_dist`` is empty, not 1-D, contains
                negative or non-finite entries, or sums to zero.
        """
        self.pairs = pairs
        self.unigram_dist = unigram_dist
        self.neg_sample_count = neg_sample_count

        probs = np.asarray(unigram_dist, dtype=np.float64)
        if probs.ndim != 1 or probs.size == 0:
            raise ValueError("unigram_dist must be a non-empty 1-D array")
        if not np.all(np.isfinite(probs)) or np.any(probs < 0):
            raise ValueError("unigram_dist must contain finite, non-negative values")
        if probs.sum() <= 0:
            raise ValueError("unigram_dist must have a positive sum")

        # The cumulative distribution is the same for every item, so build it
        # once here instead of reprocessing the full probability vector on
        # each __getitem__ call. It is NOT refreshed if self.unigram_dist is
        # reassigned or mutated afterwards.
        cdf = np.cumsum(probs)
        cdf /= cdf[-1]
        self._neg_cdf = cdf

    def __len__(self):
        """Return the number of (center, context) pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return the pair at ``idx`` together with freshly drawn negatives.

        Returns:
            tuple: ``(center, context, negatives)`` as ``torch.long`` tensors;
            the first two are scalars, ``negatives`` has shape
            ``[neg_sample_count]``.
        """
        center, context = self.pairs[idx]
        # Inverse-transform sampling: uniform draws in [0, 1) are mapped to
        # ids by binary search on the cached CDF.
        draws = np.random.random_sample(self.neg_sample_count)
        negatives = np.searchsorted(self._neg_cdf, draws, side='right')
        return (
            torch.tensor(center, dtype=torch.long),
            torch.tensor(context, dtype=torch.long),
            torch.tensor(negatives, dtype=torch.long)
        )

In [5]:
# --- Hyperparameters ---
embed_dim = 300           # Higher dimensionality for richer semantic space
window_size = 5           # Standard context size; 5 is a good default
neg_sample_count = 10     # 5–20 is typical; 10 is a balanced choice
batch_size = 1024         # Larger batch size for GPU acceleration (adjust if on CPU)
epochs = 10               # For meaningful learning without overfitting

# --- Reproducibility ---
# Seed every RNG this pipeline draws from before the first random call:
# `random` picks the window radii, NumPy draws the negative samples, and
# torch initialises the embeddings and shuffles the DataLoader.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# --- Training data ---
pairs = generate_skipgram_pairs(corpus, window_size)
dataset = Word2VecDataset(pairs, unigram_dist, neg_sample_count)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# --- Model and optimiser ---
model = Word2Vec(vocab_size, embed_dim)
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)

# Probe words for the per-epoch nearest-neighbour check. Words missing from
# the vocabulary are reported as such by get_similar_words.
test_words = ['king', 'money', 'computer', '.', '$', 'fitness']  # choose words relevant to your domain

In [None]:
def get_similar_words(target_word, model, word_to_idx, idx_to_word, top_n=5):
    """Return the words whose input embeddings are closest to ``target_word``.

    Closeness is cosine similarity between rows of ``model.in_embed.weight``.

    Args:
        target_word: Word to look up.
        model: Object exposing ``in_embed.weight`` as a 2-D tensor with one
            row per vocabulary id.
        word_to_idx: Mapping from word to vocabulary id.
        idx_to_word: Mapping from vocabulary id to word.
        top_n: Maximum number of neighbours to return.

    Returns:
        list[str]: Up to ``top_n`` words, most similar first, excluding the
        target itself. Fewer are returned when the vocabulary is too small.
        If ``target_word`` is not in ``word_to_idx``, a message string is
        returned instead of a list.
    """
    if target_word not in word_to_idx:
        return f"'{target_word}' not in vocabulary."

    with torch.no_grad():
        embed_weights = model.in_embed.weight
        norm_embed = F.normalize(embed_weights, dim=1)  # Normalize for cosine similarity

        target_idx = word_to_idx[target_word]
        target_vec = norm_embed[target_idx]  # [embed_dim]

        similarity = torch.matmul(norm_embed, target_vec)
        # Request one extra hit because the target itself is filtered out
        # below, but never more than there are rows: topk raises if k exceeds
        # the size of the dimension.
        k = min(top_n + 1, similarity.size(0))
        topk = torch.topk(similarity, k)
        similar_idxs = topk.indices.tolist()

        similar_words = [idx_to_word[i] for i in similar_idxs if i != target_idx][:top_n]
        return similar_words


In [None]:
# Sanity check: pull the first batch from the loader, print it, and stop
# iterating. Nothing is printed if the loader yields no batches.
for batch in dataloader:
    print(f"Batch: {batch}")
    break

In [None]:
for epoch in range(epochs):
    # Running sum of the per-batch losses; reported once per epoch.
    total_loss = 0
    for center, context, negatives in dataloader:
        # Clear gradients left over from the previous step, then run the
        # forward pass, backpropagate, and update the parameters.
        optimizer.zero_grad()
        loss = model(center, context, negatives)
        loss.backward()
        optimizer.step()
        total_loss = total_loss + loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}")

    # Qualitative check after each epoch: nearest neighbours of the probe words.
    print("Sample similar words:")
    for word in test_words:
        similar = get_similar_words(word, model, word_to_idx, idx_to_word, top_n=5)
        print(f"  {word} → {similar}")

In [None]:
# Extract the center-word embedding table as a NumPy array
# (row i is the vector for vocabulary id i).
embeddings = model.in_embed.weight.detach().cpu().numpy()

# Write the matrix to disk in .npy format.
np.save("word2vec_embeddings.npy", embeddings)
