# Project 2: The Geometry of Meaning
## Training Word2Vec Skip-gram from Scratch

**Goal:** Build a Skip-gram model and visualize the learned embedding space.

## Part 1: Setup and Data Loading

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import seaborn as sns
from collections import Counter
from typing import List, Tuple, Dict

# Download TinyShakespeare
import urllib.request

url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
try:
    urllib.request.urlretrieve(url, "tinyshakespeare.txt")
except OSError as exc:
    # Network failures (URLError/HTTPError) and file-write failures are all
    # OSError subclasses. Catching only those keeps the best-effort fallback
    # without also swallowing KeyboardInterrupt or programming errors.
    print(f"Using local file or sample data (download failed: {exc})")
else:
    print("Downloaded TinyShakespeare corpus")

# Load and preprocess: read the whole corpus and lowercase it.
# The encoding is explicit so the result does not depend on the platform default.
with open("tinyshakespeare.txt", "r", encoding="utf-8") as f:
    text = f.read().lower()

# Simple tokenization (word-level): split on any whitespace.
words = text.split()
print(f"Total words: {len(words)}")
print(f"Unique words: {len(set(words))}")

### Build Vocabulary

In [None]:
def build_vocabulary(words: List[str], min_count: int = 5) -> Tuple[Dict[str, int], Dict[int, str]]:
    """
    Create word-to-index and index-to-word mappings.

    Indices are assigned in descending order of corpus frequency, so index 0
    is the most frequent retained word. This makes "take the first n rows"
    equivalent to "take the n most frequent words". Words with equal counts
    keep the order in which they first appear in ``words``.

    Args:
        words: List of words
        min_count: Minimum frequency to include word

    Returns:
        word2idx: Word to index mapping
        idx2word: Index to word mapping
    """
    word_counts = Counter(words)

    # most_common() yields (word, count) pairs sorted by count, highest first;
    # words rarer than min_count are dropped.
    vocab_words = [w for w, c in word_counts.most_common() if c >= min_count]

    # Both mappings are built from the same ordered list so they stay inverse
    # to each other.
    word2idx = {w: i for i, w in enumerate(vocab_words)}
    idx2word = dict(enumerate(vocab_words))

    return word2idx, idx2word

# Build the two lookup tables, keeping only words that occur at least 5 times.
word2idx, idx2word = build_vocabulary(words, min_count=5)
# Number of retained words; used below to size the model's layers.
vocab_size = len(word2idx)

# Report the vocabulary size and the first ten entries of the word-to-index table.
print(f"Vocabulary size: {vocab_size}")
print(f"Sample words: {list(word2idx.keys())[:10]}")

## Part 2: Create Skip-gram Training Data

In [None]:
def create_skipgram_pairs(words: List[str], word2idx: Dict[str, int], window_size: int = 2) -> List[Tuple[int, int]]:
    """
    Build the (center, context) index pairs used to train Skip-gram.

    Every in-vocabulary token is paired with each in-vocabulary token that
    lies at most ``window_size`` positions to its left or right. Positions are
    counted in the raw token list, so out-of-vocabulary tokens still occupy a
    slot in the window; they are simply never emitted.

    Args:
        words: Tokenized corpus
        word2idx: Word to index mapping
        window_size: Context window size (words on each side)

    Returns:
        List of (center_idx, context_idx) pairs, in corpus order
    """
    n_tokens = len(words)
    pairs: List[Tuple[int, int]] = []

    for pos, token in enumerate(words):
        if token not in word2idx:
            # Out-of-vocabulary tokens never act as a center word.
            continue

        center_idx = word2idx[token]

        # Clamp the window to the corpus boundaries.
        first = max(0, pos - window_size)
        stop = min(n_tokens, pos + window_size + 1)

        pairs.extend(
            (center_idx, word2idx[words[k]])
            for k in range(first, stop)
            if k != pos and words[k] in word2idx
        )

    return pairs

# Create training pairs: each in-vocabulary word is paired with the
# in-vocabulary words up to 2 positions away on either side.
skipgram_pairs = create_skipgram_pairs(words, word2idx, window_size=2)
print(f"Total training pairs: {len(skipgram_pairs)}")
print(f"Sample pairs: {skipgram_pairs[:5]}")

### Create PyTorch Dataset

In [None]:
class SkipGramDataset(Dataset):
    """Dataset over precomputed (center, context) word-index pairs."""

    def __init__(self, pairs: List[Tuple[int, int]]):
        # The list is stored by reference; no copy is made.
        self.pairs = pairs

    def __len__(self):
        """Return the number of training pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return pair ``idx`` as two scalar ``torch.long`` tensors: (center, context)."""
        center_idx, context_idx = self.pairs[idx]
        center_tensor = torch.tensor(center_idx, dtype=torch.long)
        context_tensor = torch.tensor(context_idx, dtype=torch.long)
        return center_tensor, context_tensor

# Wrap the pairs in a Dataset and serve them in shuffled mini-batches of 256.
dataset = SkipGramDataset(skipgram_pairs)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True)

# len(dataloader) is the number of batches per epoch.
print(f"Number of batches: {len(dataloader)}")

## Part 3: Implement Skip-gram Model

In [None]:
class SkipGramModel(nn.Module):
    """Skip-gram network: an embedding lookup followed by a linear layer that scores every vocabulary word."""

    def __init__(self, vocab_size: int, embedding_dim: int):
        """
        Simple Skip-gram model.

        Args:
            vocab_size: Number of unique words
            embedding_dim: Dimension of embedding vectors
        """
        super().__init__()
        # Input side: one trainable vector per vocabulary word.
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        # Output side: maps a center-word vector to one logit per vocabulary word.
        self.output = nn.Linear(embedding_dim, vocab_size)

    def forward(self, center_word: torch.Tensor) -> torch.Tensor:
        """
        Forward pass.

        Args:
            center_word: Tensor of center word indices [batch_size]

        Returns:
            Logits for context word prediction [batch_size, vocab_size]
        """
        # 1. Look up the embedding of each center word: [batch_size, embedding_dim]
        center_vectors = self.embeddings(center_word)
        # 2. Project to unnormalised scores over the vocabulary. No softmax is
        #    applied here because nn.CrossEntropyLoss expects raw logits.
        return self.output(center_vectors)

    def get_embeddings(self) -> np.ndarray:
        """Return the learned word embeddings as numpy array."""
        # detach() drops the autograd graph; cpu() makes the tensor convertible
        # to numpy regardless of where the model lives.
        return self.embeddings.weight.detach().cpu().numpy()

# Initialize model
# Each word is represented by a 128-dimensional vector.
embedding_dim = 128
model = SkipGramModel(vocab_size, embedding_dim)

# Total number of scalar values across all trainable tensors, then the layer summary.
print(f"Model parameters: {sum(p.numel() for p in model.parameters())}")
print(model)

## Part 4: Train the Model

In [None]:
def train_skipgram(model, dataloader, epochs=5, lr=0.01):
    """
    Fit a Skip-gram model with Adam and cross-entropy loss.

    Args:
        model: Module mapping a batch of center-word indices to vocabulary logits
        dataloader: Iterable of (center, context) index batches; must support len()
        epochs: Number of full passes over ``dataloader``
        lr: Learning rate passed to Adam

    Returns:
        List with the mean batch loss of each epoch, in order
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=lr)

    history = []

    for epoch_number in range(1, epochs + 1):
        running_total = 0

        for center_batch, context_batch in dataloader:
            # Clear gradients left over from the previous step.
            adam.zero_grad()

            # Forward pass and loss against the true context indices.
            batch_loss = loss_fn(model(center_batch), context_batch)

            # Backward pass followed by the parameter update.
            batch_loss.backward()
            adam.step()

            running_total += batch_loss.item()

        mean_loss = running_total / len(dataloader)
        history.append(mean_loss)
        print(f"Epoch {epoch_number}/{epochs}, Loss: {mean_loss:.4f}")

    return history

# Train model for 10 epochs; `losses` receives one mean loss value per epoch.
losses = train_skipgram(model, dataloader, epochs=10, lr=0.01)

# Plot loss curve (x axis is the zero-based epoch index).
plt.figure(figsize=(10, 5))
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Skip-gram Training Loss')
plt.grid(True)
plt.show()

## Part 5: Visualize Embeddings

### Task 1: Cosine Similarity Matrix

In [None]:
def plot_cosine_matrix(embeddings: np.ndarray, words: List[str], top_n: int = 50):
    """
    Draw a heatmap of pairwise cosine similarities between word vectors.

    Only the first ``top_n`` rows of ``embeddings`` (and the matching first
    ``top_n`` labels) are shown. These are the most frequent words only if the
    vocabulary indices are ordered by frequency.

    Args:
        embeddings: Embedding matrix, one row per word
        words: Labels such that words[i] names row i of ``embeddings``
        top_n: Number of leading rows to include in the heatmap
    """
    from sklearn.metrics.pairwise import cosine_similarity

    # Restrict both the vectors and their labels to the leading rows.
    subset_vectors, subset_labels = embeddings[:top_n], words[:top_n]

    # Square matrix: entry (i, j) is the cosine similarity of rows i and j.
    similarity = cosine_similarity(subset_vectors)

    plt.figure(figsize=(12, 10))
    sns.heatmap(
        similarity,
        xticklabels=subset_labels,
        yticklabels=subset_labels,
        cmap='coolwarm',
    )
    plt.title('Cosine Similarity Matrix')
    plt.tight_layout()
    plt.show()

# Get embeddings and word list
embeddings = model.get_embeddings()
# word_list[i] is the word with index i, so it labels row i of `embeddings`.
word_list = [idx2word[i] for i in range(len(idx2word))]

# Heatmap for the first 30 vocabulary entries.
plot_cosine_matrix(embeddings, word_list, top_n=30)

### Task 2: 2D Projection with t-SNE

In [None]:
def plot_embeddings_2d(embeddings: np.ndarray, words: List[str], method='tsne', top_n: int = 200):
    """
    Project embeddings to 2D and visualize.

    Only the first ``top_n`` rows of ``embeddings`` and their labels are used.

    Args:
        embeddings: Embedding matrix, one row per word
        words: Labels such that words[i] names row i of ``embeddings``
        method: 'pca' or 'tsne' (any value other than 'pca' selects t-SNE)
        top_n: Number of leading rows to project and plot

    Raises:
        ValueError: If fewer than two embeddings are selected.
    """
    # Select subset
    selected_embeddings = embeddings[:top_n]
    selected_words = words[:top_n]

    n_points = len(selected_embeddings)
    if n_points < 2:
        raise ValueError(f"Need at least 2 embeddings to project to 2D, got {n_points}")

    # Dimensionality reduction
    if method == 'pca':
        reducer = PCA(n_components=2)
    else:
        # scikit-learn requires perplexity < n_samples. Its default of 30 would
        # raise for small subsets (e.g. top_n=30), so cap it at n_points - 1.
        # Larger subsets keep the default value.
        perplexity = float(min(30, n_points - 1))
        reducer = TSNE(n_components=2, random_state=42, perplexity=perplexity)

    coords = reducer.fit_transform(selected_embeddings)

    # Plot
    plt.figure(figsize=(15, 12))
    plt.scatter(coords[:, 0], coords[:, 1], alpha=0.5)

    # Annotate points. zip() stops at the shorter sequence, so surplus labels
    # cannot index past the projected coordinates.
    for word, (x, y) in zip(selected_words, coords):
        plt.annotate(word, (x, y), fontsize=8, alpha=0.7)

    plt.title(f'Word Embeddings ({method.upper()})')
    plt.xlabel('Component 1')
    plt.ylabel('Component 2')
    plt.tight_layout()
    plt.show()

# Project the first 100 vocabulary entries to 2D with t-SNE and plot them.
plot_embeddings_2d(embeddings, word_list, method='tsne', top_n=100)

### Task 3: Test Semantic Analogies

In [None]:
def test_analogy(embeddings, word2idx, idx2word, a: str, b: str, c: str, top_k: int = 5):
    """
    Test analogy: a is to b as c is to ?

    Example: man is to king as woman is to ?  (king - man + woman ≈ queen)

    The candidates are ranked by cosine similarity to the vector (b - a + c).
    The three query words themselves are never reported.

    Args:
        embeddings: Embedding matrix, one row per word index
        word2idx: Word to index mapping
        idx2word: Index to word mapping
        a, b, c: Words forming the analogy "a is to b as c is to ?"
        top_k: Maximum number of candidates to report

    Returns:
        List of (word, similarity) tuples, best match first, at most ``top_k``
        long. Empty if any of a, b, c is not in ``word2idx``. The same ranking
        is also printed.
    """
    query_words = (a, b, c)
    if any(w not in word2idx for w in query_words):
        print("One or more words not in vocabulary")
        return []

    matrix = np.asarray(embeddings)
    a_vec, b_vec, c_vec = (matrix[word2idx[w]] for w in query_words)

    # Compute target vector: b - a + c
    target_vec = b_vec - a_vec + c_vec

    # Cosine similarity of every row against the target. Zero-length vectors
    # are treated as having norm 1 so they score 0 instead of producing NaN.
    row_norms = np.linalg.norm(matrix, axis=1)
    row_norms[row_norms == 0] = 1.0
    target_norm = np.linalg.norm(target_vec) or 1.0
    similarities = (matrix @ target_vec) / (row_norms * target_norm)

    excluded = set(query_words)
    results = []

    print(f"\n'{a}' is to '{b}' as '{c}' is to:")
    # Walk candidates from most to least similar, skipping the query words.
    for idx in np.argsort(similarities)[::-1]:
        if len(results) >= top_k:
            break
        word = idx2word[int(idx)]
        if word in excluded:
            continue
        score = float(similarities[idx])
        results.append((word, score))
        print(f"  {len(results)}. {word} (similarity: {score:.4f})")

    return results


# Despite its name this is a notebook helper, not a unit test: opt out of
# pytest collection so test runners do not try to call it with fixtures.
test_analogy.__test__ = False

# Test some analogies (these may not work well on small corpus)
# Arguments are (a, b, c), read as "a is to b as c is to ?".
test_analogy(embeddings, word2idx, idx2word, 'king', 'queen', 'man')
test_analogy(embeddings, word2idx, idx2word, 'good', 'better', 'bad')

## Part 6: Compare One-Hot vs Learned Embeddings

In [None]:
class OneHotModel(nn.Module):
    """Baseline model using one-hot encoding."""

    def __init__(self, vocab_size: int):
        super().__init__()
        # No embedding layer - input is already one-hot, so a single
        # vocab_size x vocab_size linear map goes straight to the logits.
        self.output = nn.Linear(vocab_size, vocab_size)

    def forward(self, center_word_onehot: torch.Tensor) -> torch.Tensor:
        """
        Forward pass.

        Args:
            center_word_onehot: One-hot encoded center words [batch_size, vocab_size]

        Returns:
            Logits for context word prediction [batch_size, vocab_size]
        """
        # One-hot tensors are often integer-typed (e.g. from F.one_hot); cast
        # to the layer's dtype so the matrix multiply is valid.
        return self.output(center_word_onehot.to(self.output.weight.dtype))

# Compare model sizes
onehot_model = OneHotModel(vocab_size)

# Total number of scalar values across every parameter tensor of each model.
skipgram_params = sum(p.numel() for p in model.parameters())
onehot_params = sum(p.numel() for p in onehot_model.parameters())

print(f"\nModel Size Comparison:")
print(f"Skip-gram (embedding_dim={embedding_dim}): {skipgram_params:,} parameters")
print(f"One-hot (no compression): {onehot_params:,} parameters")
# "Memory savings" is the relative reduction in parameter count versus the one-hot baseline.
print(f"Memory savings: {(1 - skipgram_params/onehot_params)*100:.1f}%")

## Part 7: Analysis and Reflection

### Questions to Answer:

1. **Why are one-hot vectors orthogonal?**
   - YOUR ANSWER HERE

2. **What does cosine similarity measure in embedding space?**
   - YOUR ANSWER HERE

3. **Did the analogies work? Why or why not?**
   - YOUR ANSWER HERE (Hint: corpus size matters!)

4. **What clusters did you observe in the t-SNE plot?**
   - YOUR ANSWER HERE

## 🎯 Completion Checklist

- [ ] Built vocabulary from corpus
- [ ] Created Skip-gram training pairs
- [ ] Implemented `SkipGramModel`
- [ ] Trained model and plotted loss curve
- [ ] Visualized cosine similarity matrix
- [ ] Created 2D projection (t-SNE/PCA)
- [ ] Tested semantic analogies
- [ ] Compared with one-hot baseline
- [ ] Answered reflection questions

## 🚀 Next Project
Move to **03_rope_animator** to learn how to add positional information to sequences!