Cell 1 – Imports & basic config

In [15]:
import re
from collections import Counter
from pathlib import Path

import numpy as np
from tqdm import tqdm

import json
import pickle

Cell 2 – Tokenizer & corpus loader

In [16]:
def simple_tokenize(text: str):
    """
    Very basic tokenizer:
    - lowercase
    - keep only alphabetic characters and spaces
    - split on whitespace
    """
    text = text.lower()
    text = re.sub(r"[^a-zA-Z\\s]", " ", text)
    return text.split()


def load_corpus(corpus_path: Path):
    """
    Load corpus as list of token lists (one per line).
    """
    sentences = []
    with corpus_path.open("r", encoding="utf-8", errors="ignore") as f:
        for i, line in enumerate(f):
            line = line.strip()
            if not line:
                continue
            tokens = simple_tokenize(line)
            if tokens:
                sentences.append(tokens)
    print(f"Loaded {len(sentences)} sentences.")
    return sentences


Cell 3 – Word2VecScratch class (batched skip-gram + negative sampling)

In [17]:
class Word2VecScratch:
    def __init__( # default hardcoded values
        self,
        embedding_dim=100,
        window_size=2,          # up to 2 words on each side
        min_count=1,
        negative_samples=5,
        lr=0.025,
        epochs=3,
        batch_size=4,           # mini-batch size
        seed=42,
    ):
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.min_count = min_count
        self.negative_samples = negative_samples
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.rng = np.random.default_rng(seed)

        # will be set in build_vocab
        self.word2idx = {}
        self.idx2word = []
        self.vocab_size = 0

        # will be set in init_weights
        self.W_in = None   # shape (vocab_size, embedding_dim)
        self.W_out = None  # shape (vocab_size, embedding_dim)

        # negative sampling distribution
        self.neg_sampling_probs = None

    # ---------- PREP: VOCAB + TRAINING PAIRS ----------

    def build_vocab(self, sentences):
        """
        Build vocabulary from list of token lists.
        """
        counts = Counter()
        for sent in sentences:
            counts.update(sent)

        # filter by min_count
        filtered = [w for w, c in counts.items() if c >= self.min_count]

        self.idx2word = sorted(filtered)
        self.word2idx = {w: i for i, w in enumerate(self.idx2word)}
        self.vocab_size = len(self.idx2word)
        print(f"Vocab size: {self.vocab_size}")

        # build negative sampling distribution: P(w) is proportional to count(w)^0.75
        freqs = np.array([counts[w] for w in self.idx2word], dtype=np.float64)
        freqs = freqs ** 0.75
        self.neg_sampling_probs = freqs / freqs.sum()

    def sentences_to_indices(self, sentences):
        """
        Map tokens to indices, dropping out-of-vocab words.
        """
        idx_sentences = []
        for sent in sentences:
            idxs = [self.word2idx.get(w) for w in sent if w in self.word2idx]
            if len(idxs) > 1:
                idx_sentences.append(idxs)
        return idx_sentences

    def generate_skipgram_pairs(self, idx_sentences):
        """
        Generate (center, context) index pairs for skip-gram.
        window_size = 2 means up to 2 words on each side of the center word.
        """
        pairs = []
        w = self.window_size
        for sent in idx_sentences:
            n = len(sent)
            for i, center in enumerate(sent):
                start = max(0, i - w)
                end = min(n, i + w + 1)
                for j in range(start, end):
                    if j == i:
                        continue
                    context = sent[j]
                    pairs.append((center, context))
        return pairs

    # ---------- MODEL INIT ----------

    def init_weights(self):
        """
        Initialise input and output embeddings.
        """
        self.W_in = 0.01 * self.rng.standard_normal(
            (self.vocab_size, self.embedding_dim)
        )
        self.W_out = 0.01 * self.rng.standard_normal(
            (self.vocab_size, self.embedding_dim)
        )

    # ---------- NEGATIVE SAMPLING + TRAINING ----------

    def sample_negatives(self, batch_size):
        """
        Sample negative word indices according to the unigram^0.75 distribution.
        Returns shape (batch_size, negative_samples).
        """
        return self.rng.choice(
            self.vocab_size,
            size=batch_size * self.negative_samples,
            replace=True,
            p=self.neg_sampling_probs,
        ).reshape(batch_size, self.negative_samples)

    @staticmethod
    def _sigmoid(x):
        return 1 / (1 + np.exp(-x))

    def train(self, sentences):
        """
        High-level training:
        - build vocab
        - convert sentences to indices
        - generate skip-gram pairs
        - train using negative sampling with mini-batches
        """
        print("=" * 60)
        print("Starting Word2Vec Training")
        print("=" * 60)
        
        print("\n[Step 1/4] Building vocabulary...")
        self.build_vocab(sentences)
        
        print("\n[Step 2/4] Converting sentences to indices...")
        idx_sentences = self.sentences_to_indices(sentences)
        
        print("\n[Step 3/4] Generating skip-gram pairs...")
        pairs = self.generate_skipgram_pairs(idx_sentences)
        print(f"Generated {len(pairs):,} training pairs")

        print("\n[Step 4/4] Initializing weights...")
        self.init_weights()

        pairs = np.array(pairs, dtype=np.int64)
        n_pairs = len(pairs)
        n_batches = (n_pairs + self.batch_size - 1) // self.batch_size

        print(f"\n{'=' * 60}")
        print(f"Training Configuration:")
        print(f"  - Epochs: {self.epochs}")
        print(f"  - Batch size: {self.batch_size}")
        print(f"  - Total batches per epoch: {n_batches:,}")
        print(f"  - Learning rate: {self.lr}")
        print(f"{'=' * 60}\n")

        for epoch in range(1, self.epochs + 1):
            print(f"\nEpoch {epoch}/{self.epochs}")
            self.rng.shuffle(pairs)
            total_loss = 0.0

            # Progress bar for batches
            batch_range = range(0, n_pairs, self.batch_size)
            with tqdm(batch_range, desc=f"  Epoch {epoch}", unit="batch", 
                     bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
                for start in pbar:
                    batch = pairs[start:start + self.batch_size]
                    centers = batch[:, 0]
                    contexts = batch[:, 1]
                    batch_loss = self.train_batch(centers, contexts)
                    total_loss += batch_loss * len(batch)
                    
                    # Update progress bar with current loss
                    current_avg_loss = total_loss / (start + len(batch))
                    pbar.set_postfix({'loss': f'{current_avg_loss:.4f}'})

            avg_loss = total_loss / n_pairs
            print(f"  Completed - Average loss: {avg_loss:.4f}")

        print(f"\n{'=' * 60}")
        print("Training completed!")
        print(f"{'=' * 60}")

    def train_batch(self, center_idxs, context_idxs):
        """
        Train on a batch of (center, context) pairs using negative sampling.
        """
        B = center_idxs.shape[0]

        v_t = self.W_in[center_idxs]       # (B, D)
        u_c = self.W_out[context_idxs]     # (B, D)

        # positive
        score_pos = np.sum(u_c * v_t, axis=1)      # (B,)
        sig_pos = self._sigmoid(score_pos)         # (B,)
        loss_pos = -np.log(sig_pos + 1e-10)        # (B,)

        # negatives
        neg_idxs = self.sample_negatives(B)        # (B, K)
        u_negs = self.W_out[neg_idxs]              # (B, K, D)
        scores_neg = np.einsum("bkd,bd->bk", u_negs, v_t)  # (B, K)
        sig_negs = self._sigmoid(-scores_neg)
        loss_neg = -np.sum(np.log(sig_negs + 1e-10), axis=1)  # (B,)

        loss = np.mean(loss_pos + loss_neg)

        # gradients
        grad_pos = (1 - sig_pos)                   # (B,)
        grad_u_c = grad_pos[:, None] * v_t         # (B, D)
        grad_v_pos = grad_pos[:, None] * u_c       # (B, D)

        sig_scores_neg = self._sigmoid(scores_neg) # σ(x)
        grad_negs = -sig_scores_neg                # (B, K)
        grad_u_negs = grad_negs[..., None] * v_t[:, None, :]  # (B, K, D)
        grad_v_neg = np.sum(grad_negs[..., None] * u_negs, axis=1)  # (B, D)

        grad_v = grad_v_pos + grad_v_neg           # (B, D)

        # scatter-add updates
        np.add.at(self.W_out, context_idxs, self.lr * grad_u_c)

        neg_flat = neg_idxs.reshape(-1)
        grad_u_negs_flat = grad_u_negs.reshape(-1, self.embedding_dim)
        np.add.at(self.W_out, neg_flat, self.lr * grad_u_negs_flat)

        np.add.at(self.W_in, center_idxs, self.lr * grad_v)

        return loss

    # ---------- UTILITIES ----------

    def get_vector(self, word):
        idx = self.word2idx.get(word)
        if idx is None:
            raise KeyError(f"Word '{word}' not in vocabulary.")
        return self.W_in[idx]

    def most_similar(self, word, topn=5):
        if word not in self.word2idx:
            raise KeyError(f"Word '{word}' not in vocabulary.")
        idx = self.word2idx[word]
        v = self.W_in[idx]

        norms = np.linalg.norm(self.W_in, axis=1) + 1e-10
        sim = (self.W_in @ v) / norms / (np.linalg.norm(v) + 1e-10)

        best = np.argsort(-sim)
        result = []
        for i in best:
            if i == idx:
                continue
            result.append((self.idx2word[i], float(sim[i])))
            if len(result) >= topn:
                break
        return result


Cell 4 – Parameters & load mini wiki corpus

In [20]:
# Parameters for the vanilla wiki model
corpus_path = Path("data/AllCombined.txt")

embedding_dim = 100
window_size = 2      # up to 2 words on each side
min_count = 5        # ignore very rare words
negative_samples = 5
learning_rate = 0.025
epochs = 3
batch_size = 4000

# Load corpus
sentences = load_corpus(corpus_path)


Loaded 965518 sentences.


Cell 5 – Train the vanilla Word2Vec model

In [21]:
vanilla_model = Word2VecScratch(
    embedding_dim=embedding_dim,
    window_size=window_size,
    min_count=min_count,
    negative_samples=negative_samples,
    lr=learning_rate,
    epochs=epochs,
    batch_size=batch_size,
)

vanilla_model.train(sentences)

Starting Word2Vec Training

[Step 1/4] Building vocabulary...
Vocab size: 112970

[Step 2/4] Converting sentences to indices...

[Step 3/4] Generating skip-gram pairs...
Generated 108,994,394 training pairs

[Step 4/4] Initializing weights...

Training Configuration:
  - Epochs: 3
  - Batch size: 4000
  - Total batches per epoch: 27,249
  - Learning rate: 0.025


Epoch 1/3


  return 1 / (1 + np.exp(-x))
  Epoch 1: 100%|██████████| 27249/27249 [17:16<00:00, 26.30batch/s]


  Completed - Average loss: 4.7843

Epoch 2/3


  Epoch 2: 100%|██████████| 27249/27249 [17:28<00:00, 25.98batch/s]


  Completed - Average loss: 4.8686

Epoch 3/3


  Epoch 3: 100%|██████████| 27249/27249 [17:17<00:00, 26.25batch/s]

  Completed - Average loss: 4.8779

Training completed!





Cell 6 – Quick sanity check: similar words

In [22]:
test_word = "dog"

if test_word in vanilla_model.word2idx:
    print(f"Most similar to '{test_word}':")
    for w, score in vanilla_model.most_similar(test_word, topn=10):
        print(f"{w:<20} {score:.4f}")
else:
    print(f"'{test_word}' not in vocabulary.")


Most similar to 'dog':
fruit                0.6044
cat                  0.5902
bird                 0.5856
bear                 0.5587
fish                 0.5574
seeds                0.5343
tiger                0.5296
trees                0.5275
leaves               0.5254
goat                 0.5100


Cell 7 – Export the vanilla Word2Vec model

In [23]:
# Create models/ directory if it doesn't exist
export_dir = Path("models")
export_dir.mkdir(exist_ok=True)

base_name = "vanilla_w2v_model"

# 1) Save embeddings matrix
embeddings_path = export_dir / f"{base_name}.npy"
np.save(embeddings_path, vanilla_model.W_in)
print(f"Saved embeddings to {embeddings_path}")

# 2) Save vocabulary list (idx → word)
vocab_txt_path = export_dir / f"{base_name}_vocab.txt"
with vocab_txt_path.open("w", encoding="utf-8") as f:
    for w in vanilla_model.idx2word:
        f.write(w + "\n")
print(f"Saved vocabulary to {vocab_txt_path}")

# 3) Save word2idx mapping (word → index)
vocab_json_path = export_dir / f"{base_name}_word2idx.json"
with vocab_json_path.open("w", encoding="utf-8") as f:
    json.dump(vanilla_model.word2idx, f, ensure_ascii=False)
print(f"Saved word2idx to {vocab_json_path}")

# 4) Save full model (pickle)
model_pkl_path = export_dir / f"{base_name}.pkl"
with model_pkl_path.open("wb") as f:
    pickle.dump(vanilla_model, f)
print(f"Saved full model to {model_pkl_path}")



Saved embeddings to models\vanilla_w2v_model.npy
Saved vocabulary to models\vanilla_w2v_model_vocab.txt
Saved word2idx to models\vanilla_w2v_model_word2idx.json
Saved full model to models\vanilla_w2v_model.pkl
