In [None]:
import numpy as np
import string
import nltk
import random
import requests
import re
from nltk.corpus import stopwords
from typing import Union, List, Optional

# Text Download and Preparation

In [None]:
url = "https://www.gutenberg.org/files/84/84-0.txt"
text_from_url = requests.get(url).text

def clean_gutenberg(text: str) -> str:
    start_marker = "*** START OF THE PROJECT GUTENBERG EBOOK 84 ***"
    end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK 84 ***"
    start = text.find(start_marker)
    end = text.find(end_marker)
    text = re.sub(r'[^\w\s]', '', text)

    if start != -1 and end != -1 and start < end:
        return text[start + len(start_marker) : end].lower().split()
    else:
        print("Warning: Gutenberg markers not found or in wrong order. Using full text.")
        return text.lower().split()


text = clean_gutenberg(text_from_url)
print(text)

# Data preparation:

*   Tokenizer class
*   Test data prep
*   Sigmoid


In [None]:
class Tokenizer:
    """
    Simple tokenizer that maps words to integer tokens and vice versa,
    and computes global word frequencies for subsampling and negative sampling.
    """

    def __init__(self, text: list[str]) -> None:
        """
        Args:
            text: Corpus as a single string.
        """
        self.token_to_id = {"<PAD>": 0, "<UNK>": 1}
        self.id_to_token = {0: "<PAD>", 1: "<UNK>"}

        # Lowercase and split text
        n = 2  # start from 2 because 0 and 1 are reserved

        for word in text:
            if word not in self.token_to_id:
                self.token_to_id[word] = n
                self.id_to_token[n] = word
                n += 1

        self.vocab_size = n
        self.vocab = self.token_to_id.keys()
        self.tokens = self.tokenize(text)

        # Compute global frequencies
        self.frequencies = np.zeros(self.vocab_size, dtype=np.float32)
        np.add.at(self.frequencies, self.tokens, 1)
        self.frequencies /= np.sum(self.frequencies)

    def tokenize(self, words: Union[str, List[str]]) -> np.ndarray:
        """
        Convert word(s) to integer token(s). Unknown words -> <UNK>.
        """
        single = isinstance(words, str)

        if single:
            words = [words]

        tokens = np.array(
            [self.token_to_id.get(word, 1) for word in words],
            dtype=np.int32
        )

        return tokens[0] if single else tokens

    def detokenize(self, tokens: Union[int, List[int]]) -> Union[str, List[Optional[str]]]:
        """
        Convert token(s) back to word(s).
        """
        # Check if the input is a single integer or numpy integer
        single = isinstance(tokens, int) or isinstance(tokens, np.integer)

        if single:
            tokens = [tokens]

        # Explicitly cast each token to a Python int before dictionary lookup
        words = [self.id_to_token.get(int(token), "<UNK>") for token in tokens]

        return words[0] if single else words


# Skip-gram Dataset: Core Optimizations

The efficiency of Word2Vec relies on three primary data-processing techniques: **Subsampling**, **Dynamic Windows**, and **Negative Sampling Tables**.

---

## 1. Subsampling of Frequent Words
To prevent frequent words (stop words) from over-representing in the training data, we drop tokens based on their frequency $f(w_i)$ and a threshold $t$.

**Drop Probability Formula:**
$$P(\text{drop } w_i) = 1 - \sqrt{\frac{t}{f(w_i)}}$$

* **Threshold $t$:** Usually set around $10^{-5}$.
* **Result:** Rare words are kept, while words like "the" are discarded frequently, allowing the model to focus on meaningful semantic relationships.

---

## 2. Dynamic Context Window
Instead of a fixed window size, we sample a random window for every center word encounter.

**Logic:**
For each center word $w_c$, pick a random integer $R$:
$$1 \le R \le \text{max_window_size}$$

* **Impact:** This ensures that words immediately adjacent to the center word are sampled more often than words at the periphery, naturally weighting local context higher.

---

## 3. Negative Sampling Table (Unigram Distribution)
To avoid the $O(|V|)$ cost of sampling from the entire vocabulary during the training loop, we pre-calculate a large lookup table.

**Sampling Probability:**
$$P(w) = \frac{f(w)^{0.75}}{\sum_{v \in V} f(v)^{0.75}}$$



### Why the $0.75$ Power?
* If $P(\text{"the"}) = 0.9$ and $P(\text{"zebra"}) = 0.0001$:
* After the $0.75$ power, the gap shrinks.
* **Result:** The model samples rare words slightly more often than their raw frequency would suggest, making the negative classification task more robust.

---

## 4. Implementation Summary Table

| Technique | Goal | Key Hyperparameter |
| :--- | :--- | :--- |
| **Subsampling** | Reduce redundancy of stop words | `subsample_threshold` ($t$) |
| **Dynamic Window** | Weight local context higher | `max_window_size` |
| **Negative Sampling** | Turn Softmax into Binary Class. | `num_negatives` ($K$) |
| **Lookup Table** | $O(1)$ sampling efficiency | `neg_table_size` |

In [None]:
class SkipGramDataset:
    """
    Skip-gram dataset with subsampling, dynamic context window, and negative sampling.
    """

    def __init__(
        self,
        tokens: np.ndarray,
        tokenizer: Tokenizer,
        subsample_threshold: float = 1e-5,
        max_window_size: int = 5,
        num_negatives: int = 5,
    ) -> None:
        """
        Args:
            tokens: Tokenized corpus as numpy array.
            tokenizer: Tokenizer object with frequencies.
            subsample_threshold: Threshold t for subsampling frequent words.
            max_window_size: Maximum size of dynamic context window.
            num_negatives: Number of negative samples per positive pair.
        """
        self.tokens = tokens
        self.tokenizer = tokenizer
        self.subsample_threshold = subsample_threshold
        self.max_window_size = max_window_size
        self.num_negatives = num_negatives
        self.vocab_size = tokenizer.vocab_size

        # Subsampling drop probabilities (use global frequencies)
        freqs = np.clip(tokenizer.frequencies, 1e-10, None)
        self.drop_probs = 1 - np.sqrt(subsample_threshold / freqs)

        # Precompute negative sampling table (unigram^0.75)
        prob = tokenizer.frequencies ** 0.75
        prob /= prob.sum()
        self.neg_table = prob

        # Step 1: subsample tokens
        self.filtered_tokens = [
            token for token in tokens if random.random() >= self.drop_probs[token]
        ]

        # Step 2: generate (center, context, negatives)
        (
            self.centers,
            self.contexts,
            self.negatives,
        ) = self._generate_examples()

    def _generate_examples(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Generate (center, context, negatives) for Skip-gram training.
        """
        centers = []
        contexts = []
        negatives = []
        n = len(self.filtered_tokens)

        for i, center in enumerate(self.filtered_tokens):
            window_size = np.random.randint(1, self.max_window_size + 1)
            start = max(0, i - window_size)
            end = min(n, i + window_size + 1)
            for j in range(start, end):
                if j != i:
                    centers.append(center)
                    contexts.append(self.filtered_tokens[j])

                    # Normal distribution
                    neg_sample = np.random.choice(
                        self.vocab_size, size=self.num_negatives, replace=False, p = self.neg_table
                    )
                    negatives.append(neg_sample)

        return (
            np.array(centers, dtype=np.int32),
            np.array(contexts, dtype=np.int32),
            np.array(negatives, dtype=np.int32),
        )

    def get_training_batches(
        self, batch_size: int
    ) -> list[tuple[np.ndarray, np.ndarray, np.ndarray]]:
        """
        Shuffle the dataset and split into batches.

        Returns:
            List of tuples: (batch_centers, batch_contexts, batch_negatives)
        """
        indices = np.arange(len(self.centers))
        np.random.shuffle(indices)

        centers_shuffled = self.centers[indices]
        contexts_shuffled = self.contexts[indices]
        negatives_shuffled = self.negatives[indices]

        batches = []
        N = len(centers_shuffled)

        for i in range(0, N, batch_size):
            batch_centers = centers_shuffled[i : i + batch_size]
            batch_contexts = contexts_shuffled[i : i + batch_size]
            batch_negatives = negatives_shuffled[i : i + batch_size]
            batches.append((batch_centers, batch_contexts, batch_negatives))

        return batches


# Word2Vec implementation

### Embedding layer implementation

In [None]:
import numpy as np
from typing import Optional, Union, Sequence


class EmbeddingLayer:
    """
    An embedding layer with support for sparse gradient accumulation.
    """

    def __init__(self, vocab_size: int, embedding_size: int) -> None:
        # Small random init (DO NOT normalize)
        self.weights = np.random.randn(vocab_size, embedding_size) * 0.01
        self.grad_accumulator = None
        self.indices = None
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size

    def merge_duplicate_indices(self) -> None:
        """Combine gradients for repeated indices."""
        if self.indices is None:
            return

        unique_indices, inverse = np.unique(self.indices, return_inverse=True)
        out = np.zeros((len(unique_indices), self.embedding_size), dtype=self.weights.dtype)
        np.add.at(out, inverse, self.grad_accumulator)

        self.indices = unique_indices
        self.grad_accumulator = out

    def accumulate_gradients(self, grad: np.ndarray, indices: np.ndarray) -> None:
        """Accumulate gradients for given indices."""
        grad = np.asarray(grad, dtype=self.weights.dtype).reshape(-1, self.embedding_size)
        indices = np.asarray(indices, dtype=np.int64).reshape(-1)

        if self.grad_accumulator is None:
            self.grad_accumulator = grad
            self.indices = indices
        else:
            self.grad_accumulator = np.concatenate([self.grad_accumulator, grad], axis=0)
            self.indices = np.concatenate([self.indices, indices], axis=0)

        self.merge_duplicate_indices()

    def apply_gradients(self, learning_rate: float = 1.0) -> None:
        """Apply accumulated gradients using gradient descent."""
        if self.grad_accumulator is None:
            return

        # 1. Gradient clipping (prevents explosion)
        np.clip(self.grad_accumulator, -5.0, 5.0, out=self.grad_accumulator)

        # 2. Gradient DESCENT (minus sign!)
        np.add.at(
            self.weights,
            self.indices,
            -learning_rate * self.grad_accumulator
        )

        self.zero_grad()

    def zero_grad(self) -> None:
        """Clear accumulated gradients."""
        self.grad_accumulator = None
        self.indices = None


# Skip-gram with Negative Sampling (SGNS)

The **Skip-gram model** learns word embeddings by using a **center word** to predict its **context words**.

### Key Notation
* $v_w$: **Input embedding** (when word $w$ is the center word).
* $u_w$: **Output embedding** (when word $w$ is the context word).

---

## 1. The Objective Function

For a positive center-context pair $(w_c, w_o)$ and $K$ negative samples $w_i^-$:

$$\mathcal{L} = \log \sigma(u_{w_o}^\top v_{w_c}) + \sum_{i=1}^K \log \sigma(-u_{w_i^-}^\top v_{w_c})$$

Where $\sigma(x) = \frac{1}{1 + e^{-x}}$ is the sigmoid function.

---

## 2. Gradient Derivations

To update the vectors using SGD, we use the following gradients:

### Gradient w.r.t. Center Vector ($v_{w_c}$)
$$\frac{\partial \mathcal{L}}{\partial v_{w_c}} = [1 - \sigma(u_{w_o}^\top v_{w_c})]u_{w_o} - \sum_{i=1}^K [1 - \sigma(-u_{w_i^-}^\top v_{w_c})]u_{w_i^-}$$

### Gradient w.r.t. Positive Context Vector ($u_{w_o}$)
$$\frac{\partial \mathcal{L}}{\partial u_{w_o}} = [1 - \sigma(u_{w_o}^\top v_{w_c})]v_{w_c}$$

### Gradient w.r.t. Negative Context Vector ($u_{w_i^-}$)
$$\frac{\partial \mathcal{L}}{\partial u_{w_i^-}} = -[1 - \sigma(-u_{w_i^-}^\top v_{w_c})]v_{w_c}$$

---

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [None]:
class SkipGram:
    """
    Skip-gram model for learning word embeddings using negative sampling.

    Attributes:
        vocab_size (int): Size of the vocabulary.
        embedding_size (int): Dimensionality of the embeddings.
        input_embedding (EmbeddingLayer): Embedding layer for center words.
        output_embedding (EmbeddingLayer): Embedding layer for context/negative words.
        calculate_gradient (bool): Whether to compute gradients during forward pass.
    """

    def __init__(self, vocab_size: int, embedding_size: int):
        """
        Initialize the SkipGram model with input and output embeddings.

        Args:
            vocab_size (int): Number of unique tokens in the vocabulary.
            embedding_size (int): Dimensionality of the embedding vectors.
        """
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.input_embedding = EmbeddingLayer(vocab_size=vocab_size, embedding_size=embedding_size)
        self.output_embedding = EmbeddingLayer(vocab_size=vocab_size, embedding_size=embedding_size)
        self.calculate_gradient = True

    def forward(self, centers, contexts, negatives):
        """
        Compute the forward pass of the Skip-gram model with negative sampling.

        Args:
            centers (np.ndarray): Array of center word indices, shape (batch_size,).
            contexts (np.ndarray): Array of positive context word indices, shape (batch_size,).
            negatives (np.ndarray): Array of negative sample indices, shape (batch_size, neg_size).

        Returns:
            float: The average negative log-likelihood loss for the batch.

        Notes:
            If `calculate_gradient` is True, this function also computes and accumulates
            gradients for both input and output embeddings.
        """
        batch_size = centers.shape[0]
        neg_size = negatives.shape[1]

        # Look up embeddings
        v_c = self.input_embedding.weights[centers]        # Center embeddings (batch_size, embedding_size)
        u_w = self.output_embedding.weights[contexts]     # Positive context embeddings (batch_size, embedding_size)
        u_n = self.output_embedding.weights[negatives]    # Negative sample embeddings (batch_size, neg_size, embedding_size)

        # Positive score: dot product of center and context embeddings
        pos_dot = np.sum(v_c * u_w, axis=1, keepdims=True)
        pos_prob = sigmoid(pos_dot)

        # Negative score: dot product of center and negative embeddings
        neg_dot = np.sum(u_n * v_c[:, np.newaxis, :], axis=2)
        neg_prob = sigmoid(-neg_dot)

        # Compute loss: Negative Log-Likelihood
        loss = -np.mean(np.log(pos_prob + 1e-9) + np.sum(np.log(neg_prob + 1e-9), axis=1, keepdims=True))

        if self.calculate_gradient:
            # Gradients w.r.t positive and negative dot products
            grad_pos = pos_prob - 1
            grad_neg = 1 - neg_prob

            # Gradients for embeddings
            v_c_grad = (grad_pos * u_w) + np.sum(grad_neg[:, :, np.newaxis] * u_n, axis=1)
            u_w_grad = grad_pos * v_c
            u_n_grad = grad_neg[:, :, np.newaxis] * v_c[:, np.newaxis, :]

            # Accumulate gradients in embedding layers
            self.input_embedding.accumulate_gradients(v_c_grad / batch_size, centers)
            self.output_embedding.accumulate_gradients(u_w_grad / batch_size, contexts)
            self.output_embedding.accumulate_gradients(u_n_grad / batch_size, negatives)

        return loss

    def backward(self, learning_rate: float = 1.0):
        """
        Apply accumulated gradients to update embedding weights.

        Args:
            learning_rate (float, optional): Learning rate for gradient updates. Defaults to 1.0.
        """
        self.input_embedding.apply_gradients(learning_rate=learning_rate)
        self.output_embedding.apply_gradients(learning_rate=learning_rate)

    def zero_grad(self):
        self.input_embedding.zero_grad()
        self.output_embedding.zero_grad()


# Training

In [None]:
def train_epoch(epoch, dataset, learning_rate, model, verbose):
    """
    Train the Skip-gram model for one epoch on the provided dataset.

    Args:
        epoch (int): Current epoch index (0-based).
        dataset (tuple): A tuple containing three lists/arrays:
            - batches_centres: List/array of center word batches.
            - batches_contexts: List/array of positive context word batches.
            - batches_negatives: List/array of negative sample batches.
        learning_rate (float): Learning rate for updating embeddings.
        model (SkipGram): The Skip-gram model instance to train.
        verbose (bool): Whether to print progress and loss information.

    Returns:
        float: The average loss over all batches for this epoch.

    """
    total_loss = 0
    num_batches = len(dataset)

    model.zero_grad()

    for batch in dataset:
        centre, context, negatives = batch
        loss = model.forward(centre, context, negatives)
        total_loss += loss
        model.backward(learning_rate=learning_rate)

    avg_loss = total_loss / num_batches

    if verbose:
        print(f"Epoch: {epoch + 1}, Loss: {avg_loss:.4f}")

    return avg_loss


def training_loop(num_epochs, dataset, learning_rate, model):
    """
    Train the Skip-gram model over multiple epochs.

    Args:
        num_epochs (int): Number of epochs to train.
        dataset (tuple): Dataset tuple containing batches of centers, contexts, and negatives.
        learning_rate (float): Learning rate for embedding updates.
        model (SkipGram): The Skip-gram model instance to train.

    Notes:
        - Enables gradient calculation in the model.
        - Calls `train_epoch` for each epoch.
        - Prints progress every 5 epochs and the first epoch.
        - Does not return a value; training updates are applied directly to the model.
    """
    model.calculate_gradient = True

    for epoch in range(num_epochs):
        verbose = (epoch + 1) % 5 == 0 or epoch == 0

        train_epoch(
            epoch=epoch,
            dataset=dataset,
            learning_rate=learning_rate,
            model=model,
            verbose=verbose
        )

In [None]:
sample_text = text[:10000]
tokenizer = Tokenizer(sample_text)
tokens = tokenizer.tokenize(sample_text)

In [None]:
skip_gram_dataset = SkipGramDataset(tokens = tokens, tokenizer=tokenizer, num_negatives=20, max_window_size=10)
dataset = skip_gram_dataset.get_training_batches(batch_size=20)

In [None]:
vocab_size = tokenizer.vocab_size
embedding_size = 50

skpGram = SkipGram(vocab_size=vocab_size, embedding_size= embedding_size)

training_loop(
    num_epochs=50,
    dataset=dataset,
    learning_rate=0.1,
    model=skpGram
)

In [None]:
def get_similar_words(word, embedding_matrix, tokenizer, k=5):
    """
    word_idx: The index of the word you want to test
    embedding_matrix: Your trained self.input_embedding.weight
    vocab_dict: A dictionary mapping {id: "word"} to print results
    """
    word_idx = tokenizer.tokenize(word)

    # 1. Normalize all vectors to unit length (L2 norm)
    # This turns dot product into cosine similarity
    norm = np.linalg.norm(embedding_matrix, axis=1, keepdims=True)
    normalized_embeds = embedding_matrix / (norm + 1e-9)

    # 2. Get the specific vector for our word
    query_vector = normalized_embeds[word_idx]

    # 3. Calculate dot product with ALL other words
    # (embedding_size,) @ (vocab_size, embedding_size).T -> (vocab_size,)
    scores = normalized_embeds @ query_vector

    # 4. Get indices of top K scores (excluding the word itself at the very top)
    # argsort sorts ascending, so we take the last k+1 and reverse them
    nearest_idx = np.argsort(scores)[-(k+1):-1][::-1]

    print(f"Words most similar to '{word}':")
    for idx in nearest_idx:
        print(f"  - {tokenizer.detokenize(idx)} (score: {scores[idx]:.4f})")

In [None]:
print(tokenizer.vocab)


In [None]:
print(tokenizer.vocab_size)

In [None]:
get_similar_words('summer', embedding_matrix = skpGram.input_embedding.weights, tokenizer=tokenizer, k = 10)