In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import Counter
import re

# Word2Vec learns to represent words as dense vectors (embeddings) by
# predicting which words appear near each other in text.
# Skip-gram: for each word pair (center word, context word) that appears
# within a window in the corpus, treat the pair as a positive example (words
# that actually appear together, like "cat" and "meow") and maximize the
# probability that the center word predicts the context word.
# Negative sampling: instead of computing over all words, sample a few negative
# examples (words that don't appear in the context). Make dot products large
# for actual context words and small for random words.
class Word2Vec(nn.Module):
    """Skip-gram Word2Vec model trained with negative sampling.

    Two embedding tables are kept: ``in_embed`` is looked up for center
    words and ``out_embed`` for context and negative-sample words.

    ``model_type`` is stored on the instance, but ``forward`` always computes
    the skip-gram negative-sampling objective; no CBOW path is implemented
    in this class.
    """

    def __init__(self, vocab_size, embedding_dim, model_type='skipgram'):
        """
        Args:
            vocab_size: number of rows in each embedding table
            embedding_dim: length of every embedding vector
            model_type: label stored as ``self.model_type`` (not used by
                ``forward``)
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.model_type = model_type

        # Input embeddings (center word): a [vocab_size, embedding_dim]
        # matrix; looking up index i simply retrieves row i.
        self.in_embed = nn.Embedding(vocab_size, embedding_dim)
        # Output embeddings (context and negative words).
        self.out_embed = nn.Embedding(vocab_size, embedding_dim)

        # Start both tables from small uniform values scaled by dimension.
        bound = 0.5 / embedding_dim
        self.in_embed.weight.data.uniform_(-bound, bound)
        self.out_embed.weight.data.uniform_(-bound, bound)

    def forward(self, center, context, neg_samples):
        """Return the mean negative-sampling loss for a batch.

        Args:
            center: center word indices [batch_size]
            context: context word indices [batch_size]
            neg_samples: negative sample indices [batch_size, num_neg_samples]

        Returns:
            Scalar tensor: the batch mean of
            ``-(log sigmoid(u_ctx . v_c) + sum_k log sigmoid(-u_neg_k . v_c))``.
        """
        center_embed = self.in_embed(center)        # [batch_size, embed_dim]
        context_embed = self.out_embed(context)     # [batch_size, embed_dim]
        neg_embed = self.out_embed(neg_samples)     # [batch_size, num_neg, embed_dim]

        # Positive term: dot product of each center/context pair.
        pos_score = torch.sum(center_embed * context_embed, dim=1)  # [batch_size]
        # logsigmoid stays accurate for large-magnitude scores, where
        # log(sigmoid(x) + eps) would flatten out at log(eps) and hide both
        # the true loss and its gradient.
        pos_term = nn.functional.logsigmoid(pos_score)

        # Negative term: dot product of the center with every negative sample.
        neg_score = torch.bmm(neg_embed, center_embed.unsqueeze(2)).squeeze(2)  # [batch_size, num_neg]
        neg_term = nn.functional.logsigmoid(-neg_score).sum(dim=1)  # [batch_size]

        return -(pos_term + neg_term).mean()


class Word2VecTrainer:
    """Build a vocabulary from raw sentences and train a ``Word2Vec`` model.

    Sentences are lower-cased and split on whitespace. Words occurring fewer
    than ``min_count`` times are dropped from the vocabulary and from the
    training pairs.
    """

    # Number of pre-drawn entries in the negative-sampling table.
    NEG_TABLE_SIZE = 1000000

    def __init__(self, sentences, embedding_dim=100, window_size=5,
                 min_count=5, num_neg_samples=5, model_type='skipgram'):
        """
        Args:
            sentences: iterable of sentence strings
            embedding_dim: size of each word vector
            window_size: max distance (in kept tokens) between center and context
            min_count: minimum corpus frequency for a word to be kept
            num_neg_samples: negatives drawn per (center, context) pair
            model_type: forwarded to ``Word2Vec``

        Raises:
            ValueError: if no word reaches ``min_count``.
        """
        self.sentences = sentences
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.min_count = min_count
        self.num_neg_samples = num_neg_samples
        self.model_type = model_type

        # Build vocabulary
        self.build_vocab()

        # Initialize model
        self.model = Word2Vec(len(self.word2idx), embedding_dim, model_type)

        # Prepare negative sampling table
        self.prepare_neg_sampling()

    def build_vocab(self):
        """Count words and create ``vocab``, ``word2idx``, ``idx2word``, ``word_freq``.

        Raises:
            ValueError: if filtering by ``min_count`` leaves no words.
        """
        word_counts = Counter()
        for sentence in self.sentences:
            word_counts.update(sentence.lower().split())

        # Filter by min_count
        self.vocab = [word for word, count in word_counts.items()
                      if count >= self.min_count]
        if not self.vocab:
            # Fail here with a clear message rather than later inside the
            # sampling-table construction.
            raise ValueError(
                f"Vocabulary is empty: no word occurs at least "
                f"{self.min_count} time(s) in the given sentences"
            )

        # Create mappings
        self.word2idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}

        # Store word frequencies for negative sampling
        self.word_freq = np.array([word_counts[word] for word in self.vocab])

        print(f"Vocabulary size: {len(self.vocab)}")

    def prepare_neg_sampling(self):
        """Pre-draw a table of word indices from the frequency**0.75 distribution."""
        freq = self.word_freq ** 0.75
        self.neg_sampling_probs = freq / freq.sum()
        self.neg_sampling_table = np.random.choice(
            len(self.vocab),
            size=self.NEG_TABLE_SIZE,
            p=self.neg_sampling_probs
        )
        # Read cursor into the table; advanced by get_negative_samples.
        self.neg_sample_idx = 0

    def get_negative_samples(self, batch_size):
        """Return the next ``batch_size * num_neg_samples`` table entries.

        Entries are read sequentially from the pre-drawn table, wrapping
        around at the end, and the read cursor is advanced accordingly.

        Returns:
            int64 tensor of shape [batch_size, num_neg_samples]
        """
        table = self.neg_sampling_table
        total = batch_size * self.num_neg_samples
        # One vectorized read instead of a Python loop per sample.
        positions = (self.neg_sample_idx + np.arange(total)) % len(table)
        self.neg_sample_idx = (self.neg_sample_idx + total) % len(table)
        samples = table[positions].reshape(batch_size, self.num_neg_samples)
        return torch.from_numpy(samples).long()

    def generate_training_data(self):
        """Generate (center, context) index pairs for skip-gram.

        Out-of-vocabulary words are removed before the window is applied, so
        the window spans ``window_size`` kept tokens on each side.
        """
        pairs = []

        for sentence in self.sentences:
            words = sentence.lower().split()
            indices = [self.word2idx[w] for w in words if w in self.word2idx]

            for i, center in enumerate(indices):
                # Context positions within the window, clipped to the sentence
                start = max(0, i - self.window_size)
                end = min(len(indices), i + self.window_size + 1)

                pairs.extend(
                    (center, indices[j]) for j in range(start, end) if j != i
                )

        return pairs

    def train(self, epochs=5, batch_size=128, lr=0.025):
        """Train the model with Adam and print the mean batch loss per epoch.

        Args:
            epochs: number of passes over the training pairs
            batch_size: pairs per optimizer step
            lr: Adam learning rate
        """
        optimizer = optim.Adam(self.model.parameters(), lr=lr)

        # Generate training data
        print("Generating training data...")
        training_pairs = self.generate_training_data()
        print(f"Training pairs: {len(training_pairs)}")

        if not training_pairs:
            # Nothing to optimize; also avoids dividing by zero batches below.
            print("No training pairs were generated; skipping training.")
            return

        self.model.train()

        for epoch in range(epochs):
            total_loss = 0.0
            num_batches = 0
            np.random.shuffle(training_pairs)

            for i in range(0, len(training_pairs), batch_size):
                batch = training_pairs[i:i+batch_size]

                centers = torch.LongTensor([p[0] for p in batch])
                contexts = torch.LongTensor([p[1] for p in batch])
                neg_samples = self.get_negative_samples(len(batch))

                optimizer.zero_grad()
                loss = self.model(centers, contexts, neg_samples)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                num_batches += 1

            # Average over the batches actually run (the last one may be
            # partial), not over the fractional len(pairs) / batch_size.
            avg_loss = total_loss / num_batches
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    def get_word_vector(self, word):
        """Return the input embedding of ``word`` as a NumPy array, or None if unknown."""
        if word not in self.word2idx:
            return None
        idx = self.word2idx[word]
        return self.model.in_embed.weight[idx].detach().cpu().numpy()

    def most_similar(self, word, top_k=5):
        """Return up to ``top_k`` (word, cosine similarity) pairs, best first.

        The query word is excluded by index, so it is never returned even when
        another word has an identical (or equally scored) vector. Unknown
        words yield an empty list.
        """
        if word not in self.word2idx:
            return []
        query_idx = self.word2idx[word]

        # Row-normalize all input embeddings; guard against all-zero rows.
        all_vecs = self.model.in_embed.weight.detach().cpu().numpy()
        norms = np.linalg.norm(all_vecs, axis=1, keepdims=True)
        unit_vecs = all_vecs / np.maximum(norms, 1e-12)

        similarities = np.dot(unit_vecs, unit_vecs[query_idx])

        # Rank by descending similarity, dropping the query word itself.
        ranked = [idx for idx in np.argsort(-similarities) if idx != query_idx]

        return [(self.idx2word[idx], similarities[idx]) for idx in ranked[:top_k]]


# Demo: train on a tiny repeated corpus and print nearest neighbours
if __name__ == "__main__":
    # Ten short sentences, repeated 100 times to provide more training pairs
    corpus = 100 * [
        "the quick brown fox jumps over the lazy dog",
        "the dog is lazy and sleeps all day",
        "the fox is quick and clever",
        "a quick brown dog jumps high",
        "the lazy cat sleeps on the mat",
        "quick movements scare the lazy animals",
        "brown and white dogs play together",
        "the clever fox outsmarts the dog",
        "lazy afternoon with sleeping animals",
        "quick thinking saves the day",
    ]

    # Build the vocabulary, the model, and the negative-sampling table
    print("Initializing Word2Vec trainer...")
    trainer = Word2VecTrainer(
        sentences=corpus,
        embedding_dim=50,
        window_size=3,
        min_count=1,
        num_neg_samples=5,
    )

    # Fit the embeddings
    print("\nTraining Word2Vec model...")
    trainer.train(epochs=10, batch_size=32, lr=0.01)

    # Report the three closest words for a handful of probes
    print(f"\n{'=' * 50}")
    print("Testing word similarities:")
    print("=" * 50)

    test_words = ['quick', 'lazy', 'dog', 'fox']
    for word in test_words:
        print(f"\nMost similar to '{word}':")
        for sim_word, score in trainer.most_similar(word, top_k=3):
            print(f"  {sim_word}: {score:.4f}")

Initializing Word2Vec trainer...
Vocabulary size: 32

Training Word2Vec model...
Generating training data...
Training pairs: 26400
Epoch 1/10, Loss: 2.0676
Epoch 2/10, Loss: 1.8739
Epoch 3/10, Loss: 1.8655
Epoch 4/10, Loss: 1.8576
Epoch 5/10, Loss: 1.8606
Epoch 6/10, Loss: 1.8584
Epoch 7/10, Loss: 1.8592
Epoch 8/10, Loss: 1.8588
Epoch 9/10, Loss: 1.8536
Epoch 10/10, Loss: 1.8574

Testing word similarities:

Most similar to 'quick':
  outsmarts: 0.6447
  high: 0.5565
  fox: 0.4588

Most similar to 'lazy':
  mat: 0.5642
  day: 0.4861
  quick: 0.4406

Most similar to 'dog':
  clever: 0.6392
  jumps: 0.5277
  over: 0.4887

Most similar to 'fox':
  a: 0.5424
  clever: 0.5423
  high: 0.4678


In [2]:
import torch
import torch.nn as nn
from collections import Counter
import math




class TFIDF:
    """From-scratch TF-IDF vectorizer that produces PyTorch tensors."""

    def __init__(self, norm='l2', smooth_idf=True, sublinear_tf=False):
        """
        Args:
            norm: 'l2' or 'l1' row normalization; any other value (e.g. None)
                  leaves rows unnormalized
            smooth_idf: use log((N + 1) / (df + 1)) + 1 instead of log(N / df)
            sublinear_tf: use log(1 + tf) instead of the raw term count
        """
        self.vocab = {}  # token -> column index
        self.idf = None  # per-column IDF weights, set by fit()
        self.norm = norm
        self.smooth_idf = smooth_idf
        self.sublinear_tf = sublinear_tf

    def fit(self, documents):
        """
        Learn the vocabulary and IDF weights.

        Args:
            documents: list of token lists,
                       e.g. [['hello', 'world'], ['hello', 'pytorch']]

        Returns:
            self, so calls can be chained
        """
        # Columns follow the sorted order of all distinct tokens
        terms = sorted({token for doc in documents for token in doc})
        self.vocab = dict(zip(terms, range(len(terms))))
        total_docs = len(documents)

        # Document frequency: in how many documents each token occurs
        containing = Counter(token for doc in documents for token in set(doc))
        doc_freq = torch.tensor([float(containing[token]) for token in terms])

        if self.smooth_idf:
            # Smoothed: log((N + 1) / (df + 1)) + 1
            self.idf = torch.log((total_docs + 1) / (doc_freq + 1)) + 1
        else:
            # Plain: log(N / df)
            self.idf = torch.log(total_docs / doc_freq)
        return self

    def transform(self, documents):
        """
        Convert documents into a TF-IDF matrix.

        Args:
            documents: list of token lists; tokens not seen in fit() are ignored

        Returns:
            Tensor of shape [n_documents, vocab_size]

        Raises:
            ValueError: if fit() has not been called
        """
        if self.idf is None:
            raise ValueError("Must call fit() before transform()")

        # Term-frequency matrix, one row per document
        weights = torch.zeros(len(documents), len(self.vocab))
        for row, doc in enumerate(documents):
            for token, freq in Counter(doc).items():
                col = self.vocab.get(token)
                if col is None:
                    continue
                weights[row, col] = math.log(1 + freq) if self.sublinear_tf else freq

        # Scale every column by its IDF weight
        scored = weights * self.idf.unsqueeze(0)

        if self.norm == 'l2':
            # Divide each row by its Euclidean length
            scale = torch.sqrt((scored ** 2).sum(dim=1, keepdim=True))
        elif self.norm == 'l1':
            # Divide each row by the sum of its entries
            scale = scored.sum(dim=1, keepdim=True)
        else:
            return scored

        # Clamp so an all-zero row does not divide by zero
        return scored / torch.clamp(scale, min=1e-10)

    def fit_transform(self, documents):
        """Run fit() then transform() on the same documents."""
        self.fit(documents)
        return self.transform(documents)


# ============================================================================
# EXAMPLE USAGE
# ============================================================================

if __name__ == "__main__":
    # Demo: fit TF-IDF on four tiny pre-tokenized documents, then print the
    # IDF weights, the first document's scores, and pairwise similarities.
    documents = [
        ['the', 'cat', 'sat', 'on', 'the', 'mat'],
        ['the', 'dog', 'sat', 'on', 'the', 'log'],
        ['cats', 'and', 'dogs', 'are', 'pets'],
        ['the', 'cat', 'and', 'the', 'dog']
    ]

    print("=" * 60)
    print("TF-IDF EXAMPLE")
    print("=" * 60)

    # Create and fit TF-IDF
    tfidf = TFIDF(norm='l2', smooth_idf=True, sublinear_tf=False)
    tfidf_matrix = tfidf.fit_transform(documents)

    print(f"\nVocabulary size: {len(tfidf.vocab)}")
    print(f"Number of documents: {len(documents)}")
    print(f"\nTF-IDF matrix shape: {tfidf_matrix.shape}")

    # Show the ten largest IDF weights
    print("\n" + "=" * 60)
    print("IDF WEIGHTS (higher = rarer words)")
    print("=" * 60)
    # Sort on plain floats (descending IDF) rather than on 0-d tensors
    sorted_vocab = sorted(tfidf.vocab.items(),
                          key=lambda x: -tfidf.idf[x[1]].item())
    for word, idx in sorted_vocab[:10]:
        print(f"{word:15s} -> IDF: {tfidf.idf[idx]:.4f}")

    # Show TF-IDF for first document
    print("\n" + "=" * 60)
    print(f"DOCUMENT 1: {' '.join(documents[0])}")
    print("=" * 60)
    doc1_tfidf = tfidf_matrix[0]

    # Column indices of the non-zero entries. flatten() always yields a 1-D
    # tensor; squeeze() would collapse a single hit to a 0-d tensor, which
    # cannot be iterated.
    nonzero_indices = torch.nonzero(doc1_tfidf).flatten()
    vocab_reverse = {v: k for k, v in tfidf.vocab.items()}

    print("\nTF-IDF scores:")
    for idx in nonzero_indices.tolist():
        word = vocab_reverse[idx]
        score = doc1_tfidf[idx].item()
        print(f"{word:15s} -> {score:.4f}")

    # Compute similarity between documents
    print("\n" + "=" * 60)
    print("COSINE SIMILARITIES")
    print("=" * 60)

    # Rows are L2-normalized, so cosine similarity is just the dot product
    similarity_matrix = torch.mm(tfidf_matrix, tfidf_matrix.t())

    print("\nDoc 0 vs Doc 1 similarity:", similarity_matrix[0, 1].item())
    print("Doc 0 vs Doc 2 similarity:", similarity_matrix[0, 2].item())
    print("Doc 0 vs Doc 3 similarity:", similarity_matrix[0, 3].item())

    print("\nFull similarity matrix:")
    print(similarity_matrix.numpy().round(3))

TF-IDF EXAMPLE

Vocabulary size: 12
Number of documents: 4

TF-IDF matrix shape: torch.Size([4, 12])

IDF WEIGHTS (higher = rarer words)
are             -> IDF: 1.9163
cats            -> IDF: 1.9163
dogs            -> IDF: 1.9163
log             -> IDF: 1.9163
mat             -> IDF: 1.9163
pets            -> IDF: 1.9163
and             -> IDF: 1.5108
cat             -> IDF: 1.5108
dog             -> IDF: 1.5108
on              -> IDF: 1.5108

DOCUMENT 1: the cat sat on the mat

TF-IDF scores:
cat             -> 0.3719
mat             -> 0.4717
on              -> 0.3719
sat             -> 0.3719
the             -> 0.6022

COSINE SIMILARITIES

Doc 0 vs Doc 1 similarity: 0.6391986012458801
Doc 0 vs Doc 2 similarity: 0.0
Doc 0 vs Doc 3 similarity: 0.5680627226829529

Full similarity matrix:
[[1.    0.639 0.    0.568]
 [0.639 1.    0.    0.568]
 [0.    0.    1.    0.155]
 [0.568 0.568 0.155 1.   ]]
