___
# <font color= #d4b1e6> **Laboratorio 2: Skip-gram vs CBOW – Word Embeddings from Scratch** </font>
- <Strong> Nombre de los integrantes: </Strong>  <font color="blue">`Sarah Lucía Beltrán, Priscila Cervantes Ramírez, Mónica Ibarra Herrera` </font>
- <Strong> Materia: </Strong>  <font color="blue">`Minería de Textos` </font>
___

In [4]:
import re
import os
import random
import collections
from typing import List, Tuple, Dict

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.manifold import TSNE
import umap
import matplotlib.pyplot as plt
import seaborn as sns

### <font color= #d4b1e6> **Preprocesamiento** </font> 

In [5]:
def normalize_text(text: str) -> str:
    """Lowercase *text* and reduce it to space-separated ASCII alphanumeric runs.

    After lowercasing, every character other than ``a-z``, ``0-9`` or
    whitespace is replaced by a space. Whitespace runs are then collapsed to
    a single space and leading/trailing whitespace is removed.
    """
    lowered = text.lower()
    # Anything that is not an ASCII letter, a digit or whitespace becomes a gap.
    only_alnum = re.sub(r'[^a-z0-9\s]', ' ', lowered)
    # split() with no argument discards leading/trailing whitespace and treats
    # each whitespace run as one separator, so re-joining collapses the gaps.
    return ' '.join(only_alnum.split())

In [6]:

def build_vocab(tokens: List[str], min_freq=5, max_vocab=50000) -> Tuple[Dict[str,int], Dict[int,str], collections.Counter]:
    """Build word<->index mappings from a token list.

    Words occurring fewer than ``min_freq`` times are dropped; the remaining
    words are ranked by descending count and truncated to ``max_vocab``.
    Index 0 is reserved for ``'<PAD>'`` and index 1 for ``'<UNK>'``, so real
    words start at index 2 (most frequent first).

    Returns:
        ``(vocab, inv_vocab, kept_counter)`` where ``vocab`` maps word -> index,
        ``inv_vocab`` maps index -> word, and ``kept_counter`` holds the counts
        of the kept words only (special tokens are not included).
    """
    counts = collections.Counter(tokens)
    frequent = [(word, n) for word, n in counts.items() if n >= min_freq]
    # The sort is stable, so words with equal counts keep first-seen order.
    frequent.sort(key=lambda item: item[1], reverse=True)
    kept = frequent[:max_vocab]

    # Real words occupy indices 2.. ; the two special tokens are added last.
    vocab = {word: index for index, (word, _) in enumerate(kept, start=2)}
    vocab.update({'<PAD>': 0, '<UNK>': 1})

    inv_vocab = {index: word for word, index in vocab.items()}
    kept_counter = collections.Counter(dict(kept))
    return vocab, inv_vocab, kept_counter

In [7]:
def tokens_to_indices(tokens: List[str], vocab: Dict[str,int]) -> List[int]:
    """Map each token to its vocabulary index.

    Tokens missing from ``vocab`` get the index stored under ``'<UNK>'``
    (``None`` if ``vocab`` has no such entry).
    """
    fallback = vocab.get('<UNK>')
    lookup = vocab.get
    return [lookup(token, fallback) for token in tokens]

In [8]:

def generate_training_pairs(tokens: List[str],
                            vocab: Dict[str,int],
                            window_min=2,
                            window_max=5,
                            model_type='cbow') -> List[Tuple[List[int], int]]:
    """Build word2vec training examples from a token sequence.

    For every position a window radius is drawn uniformly from
    ``[window_min, window_max]`` (one ``random.randint`` call per token) and
    the tokens within that radius, excluding the centre itself, form the
    context. Tokens missing from ``vocab`` are mapped to ``vocab['<UNK>']``,
    both as centres and as context words.

    Args:
        tokens: Corpus as a list of string tokens.
        vocab: Word -> index mapping; must contain ``'<UNK>'``.
        window_min: Smallest window radius (inclusive).
        window_max: Largest window radius (inclusive).
        model_type: ``'cbow'`` for CBOW examples; any other value produces
            skip-gram examples.

    Returns:
        For CBOW, one ``(context_indices, center_index)`` per position with a
        non-empty context. For skip-gram, one ``([center_index], context_index)``
        per (centre, context word) combination.

    Raises:
        KeyError: If ``vocab`` has no ``'<UNK>'`` entry.
    """
    unk_idx = vocab['<UNK>']
    # Resolve every token to its index once, instead of once per window it
    # falls into; contexts then become plain list slices.
    token_ids = [vocab.get(tok, unk_idx) for tok in tokens]
    n_tokens = len(token_ids)
    is_cbow = model_type == 'cbow'

    pairs = []
    for i, center_idx in enumerate(token_ids):
        window_size = random.randint(window_min, window_max)
        left = max(0, i - window_size)
        right = min(n_tokens, i + window_size + 1)
        # Neighbours on both sides of position i, in corpus order.
        context = token_ids[left:i] + token_ids[i + 1:right]
        if not context:
            continue
        if is_cbow:
            pairs.append((context, center_idx))
        else:
            # Skip-gram: one example per context word (centre -> context word).
            pairs.extend(([center_idx], ctx_idx) for ctx_idx in context)
    return pairs

### <font color= #d4b1e6> **PyTorch Dataset** </font> 

In [9]:
class Word2VecDataset(Dataset):
    """Map-style dataset over pre-generated word2vec training pairs.

    Each item is returned exactly as stored: a ``(inputs, target)`` tuple where
    ``inputs`` is the first element of the pair and ``target`` the second.
    ``model_type`` is only recorded on the instance; it does not change how
    items are returned.
    """

    def __init__(self, pairs: List[Tuple[List[int], int]], model_type='cbow'):
        self.model_type = model_type
        self.pairs = pairs

    def __len__(self):
        """Number of stored training pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return the ``idx``-th pair as a 2-tuple."""
        inputs, target = self.pairs[idx]
        return inputs, target


In [10]:
def collate_cbow(batch):
    """Collate CBOW examples into padded tensors.

    Args:
        batch: Sequence of ``(context_list, target)`` items; context lists may
            have different lengths.

    Returns:
        ``(contexts, mask, targets)``: ``contexts`` is a ``(batch, max_len)``
        long tensor right-padded with 0, ``mask`` is a float32 tensor of the
        same shape holding 1.0 at real positions and 0.0 at padding, and
        ``targets`` is a ``(batch,)`` long tensor.
    """
    contexts, targets = zip(*batch)
    lengths = [len(ctx) for ctx in contexts]
    max_len = max(lengths)

    # Right-pad every context with index 0 up to the longest one in the batch.
    padded = [list(ctx) + [0] * (max_len - n) for ctx, n in zip(contexts, lengths)]
    contexts_tensor = torch.tensor(padded, dtype=torch.long)

    # Position j of row i is real iff j < len(contexts[i]).
    positions = torch.arange(max_len).unsqueeze(0)
    row_lengths = torch.tensor(lengths, dtype=torch.long).unsqueeze(1)
    mask = (positions < row_lengths).to(torch.float32)

    targets_tensor = torch.tensor(targets, dtype=torch.long)
    return contexts_tensor, mask, targets_tensor

In [11]:
def collate_skipgram(batch):
    """Collate skip-gram examples into index tensors.

    Args:
        batch: Sequence of ``([center_idx], target_idx)`` items, i.e. the
            centre word is stored as a one-element list.

    Returns:
        ``(centers, targets)``: two ``(batch,)`` long tensors.
    """
    # Each item's first element is the singleton list [center_idx], so a
    # single [0] unwraps it. (Indexing twice would subscript an int.)
    centers = [center[0] for center, _ in batch]
    targets = [target for _, target in batch]
    centers_tensor = torch.tensor(centers, dtype=torch.long)
    targets_tensor = torch.tensor(targets, dtype=torch.long)
    return centers_tensor, targets_tensor

### <font color= #d4b1e6> **Modelos** </font> 

In [12]:
class Word2VecBaseModel(nn.Module):
    """Shared base for the word2vec models: owns the input embedding table.

    The table is exposed as ``self.embed`` with row 0 reserved for padding
    (``padding_idx=0``). Subclasses must implement ``forward``.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
    """

    def __init__(self, vocab_size:int, embedding_dim:int=100):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # Small-variance Gaussian init for all word vectors.
        nn.init.normal_(self.embed.weight, mean=0.0, std=0.1)
        # The in-place init above also overwrites the padding row, which
        # nn.Embedding had set to zeros. That row receives no gradient, so it
        # would stay a random vector forever; restore it to zeros.
        with torch.no_grad():
            self.embed.weight[self.embed.padding_idx].zero_()

    def forward(self, *args, **kwargs):
        """Abstract; concrete models define their own forward pass."""
        raise NotImplementedError

In [13]:
class CBOWModel(Word2VecBaseModel):
    """CBOW: predict the centre word from the mean of its context embeddings.

    A linear layer maps the averaged context embedding to one logit per
    vocabulary entry.
    """

    def __init__(self, vocab_size:int, embedding_dim:int=100):
        super().__init__(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, contexts_idx: torch.LongTensor, mask: torch.FloatTensor):
        """Return vocabulary logits for a batch of padded contexts.

        Args:
            contexts_idx: ``(B, L)`` context word indices.
            mask: ``(B, L)`` float tensor, 1 at valid positions.

        Returns:
            ``(B, V)`` logits.
        """
        weights = mask.unsqueeze(-1)                               # (B, L, 1)
        # Multiplying by the mask removes padded positions from the sum.
        summed = (self.embed(contexts_idx) * weights).sum(dim=1)   # (B, D)
        # Clamp so a row with no valid positions does not divide by zero.
        counts = weights.sum(dim=1).clamp(min=1e-6)                # (B, 1)
        return self.linear(summed / counts)                        # (B, V)

In [14]:
class SkipGramModel(Word2VecBaseModel):
    """Skip-gram: predict a context word from the centre word's embedding.

    A linear layer maps the centre embedding to one logit per vocabulary entry.
    """

    def __init__(self, vocab_size:int, embedding_dim:int=100):
        super().__init__(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, centers_idx: torch.LongTensor):
        """Return ``(B, V)`` vocabulary logits for a batch of centre indices."""
        center_vectors = self.embed(centers_idx)  # (B, D)
        return self.linear(center_vectors)        # (B, V)

### <font color= #d4b1e6> **Training helper** </font> 

In [15]:
def train_epoch(model, dataloader, optimizer, criterion, device, model_type='cbow'):
    """Run one optimisation pass over ``dataloader`` and return the mean loss.

    Args:
        model: Network to train; called as ``model(contexts, mask)`` for CBOW
            and ``model(centers)`` otherwise.
        dataloader: Yields ``(contexts, mask, targets)`` batches for CBOW or
            ``(centers, targets)`` batches otherwise.
        optimizer: Optimiser stepped once per batch.
        criterion: Loss called as ``criterion(logits, targets)``.
            NOTE(review): the per-sample averaging below assumes the loss is a
            batch mean (e.g. default ``nn.CrossEntropyLoss``) — confirm if a
            different reduction is used.
        device: Device the batch tensors are moved to.
        model_type: ``'cbow'`` selects the CBOW batch layout; any other value
            selects the skip-gram layout.

    Returns:
        Loss averaged over the samples actually processed, or ``nan`` when the
        dataloader yielded no samples.
    """
    model.train()
    is_cbow = model_type == 'cbow'
    total_loss = 0.0
    total_samples = 0
    pbar = tqdm(dataloader, desc="train", leave=False)
    for batch in pbar:
        optimizer.zero_grad()
        if is_cbow:
            contexts_tensor, mask, targets = batch
            contexts_tensor = contexts_tensor.to(device)
            mask = mask.to(device)
            targets = targets.to(device)
            logits = model(contexts_tensor, mask)
        else:
            centers_tensor, targets = batch
            centers_tensor = centers_tensor.to(device)
            targets = targets.to(device)
            logits = model(centers_tensor)
        loss = criterion(logits, targets)
        loss.backward()
        optimizer.step()
        # Read the scalar once and reuse it for both the total and the bar.
        batch_loss = loss.item()
        batch_samples = targets.size(0)
        total_loss += batch_loss * batch_samples
        total_samples += batch_samples
        pbar.set_postfix(loss=batch_loss)
    # Divide by what was really seen rather than len(dataloader.dataset): the
    # two differ with drop_last or a sub-sampling sampler, and the dataset may
    # not define a length at all.
    if total_samples == 0:
        return float('nan')
    return total_loss / total_samples

In [16]:
def evaluate_similar_words(embedding_weights: np.ndarray,
                           word_to_idx: Dict[str,int],
                           idx_to_word: Dict[int,str],
                           anchor_words: List[str],
                           topk=10):
    """Find the nearest neighbours of each anchor word by cosine similarity.

    Args:
        embedding_weights: ``(V, D)`` embedding matrix, one row per index.
        word_to_idx: Word -> row index.
        idx_to_word: Row index -> word.
        anchor_words: Words to query.
        topk: Maximum number of neighbours returned per anchor.

    Returns:
        Dict mapping each anchor to a list of ``(word, similarity)`` tuples in
        descending similarity; the anchor itself is excluded. Anchors missing
        from ``word_to_idx`` map to an empty list.
    """
    # Unit-length rows turn the dot product into cosine similarity; the
    # epsilon guards against all-zero rows.
    row_norms = np.linalg.norm(embedding_weights, axis=1, keepdims=True)
    unit_rows = embedding_weights / (row_norms + 1e-9)

    def _neighbours(word):
        row = word_to_idx.get(word, None)
        if row is None:
            return []
        query = unit_rows[row:row+1]                  # (1, D)
        scores = (unit_rows @ query.T).squeeze()      # (V,)
        # Push the anchor's own score to the bottom so it is never returned.
        scores[row] = -np.inf
        best = np.argsort(-scores)[:topk]
        return [(idx_to_word[i], float(scores[i])) for i in best]

    return {word: _neighbours(word) for word in anchor_words}

### <font color= #d4b1e6> **Visualizaciones** </font> 

In [17]:
def plot_2d_embeddings(embeddings_2d: np.ndarray, words: List[str], title=''):
    """Show a labelled scatter plot of 2-D word coordinates.

    Args:
        embeddings_2d: Array whose first two columns are the x and y
            coordinates; row ``i`` belongs to ``words[i]``.
        words: Label drawn next to each point.
        title: Figure title.
    """
    xs = embeddings_2d[:,0]
    ys = embeddings_2d[:,1]
    plt.figure(figsize=(12,10))
    sns.scatterplot(x=xs, y=ys, s=30)
    # Nudge each label slightly up and to the right so it does not sit on its marker.
    for row, label in enumerate(words):
        plt.text(xs[row]+0.005, ys[row]+0.005, label, fontsize=9)
    plt.title(title)
    plt.tight_layout()
    plt.show()

In [18]:
def reduce_and_plot(embeddings: np.ndarray, idx_to_word: Dict[int,str], top_n=500, method='tsne', random_state=42, title=''):
    """Project ``embeddings`` to two dimensions with t-SNE or UMAP.

    Despite its name this function does not draw anything and does not
    sub-select rows: ``idx_to_word``, ``top_n`` and ``title`` are accepted for
    interface compatibility but are not used. Callers should pass only the
    rows they want projected and plot the returned coordinates themselves.

    Args:
        embeddings: ``(n, D)`` array of vectors to project.
        idx_to_word: Unused.
        top_n: Unused.
        method: ``'tsne'`` for t-SNE; any other value selects UMAP.
        random_state: Seed forwarded to the reducer.
        title: Unused.

    Returns:
        The 2-D coordinates produced by the reducer's ``fit_transform``.
    """
    if method == 'tsne':
        # t-SNE requires perplexity < n_samples; cap the default of 30 so that
        # small inputs do not raise.
        perplexity = min(30, max(len(embeddings) - 1, 1))
        reducer = TSNE(n_components=2, init='pca', random_state=random_state, perplexity=perplexity)
    else:
        reducer = umap.UMAP(n_components=2, random_state=random_state)
    return reducer.fit_transform(embeddings)

### <font color= #d4b1e6> **Ejemplo de uso** </font> 

In [19]:
def run_pipeline(text_path: str,
                 model_type='cbow',  # 'cbow' or 'skipgram'
                 min_freq=5,
                 max_vocab=50000,
                 embedding_dim=100,
                 epochs=5,
                 batch_size=1024,
                 window_min=2,
                 window_max=5,
                 device=None,
                 anchor_words=None,
                 num_workers=2):
    """Train word embeddings on a text file, then evaluate and visualise them.

    Steps: read and normalise the file, build the vocabulary, generate
    training pairs, train for ``epochs`` epochs with Adam (lr 1e-3) and
    cross-entropy, print nearest neighbours of the anchor words, and plot
    t-SNE and UMAP projections of the most frequent words.

    Args:
        text_path: Path of a UTF-8 text file.
        model_type: ``'cbow'`` trains CBOW; any other value trains skip-gram.
        min_freq: Minimum count for a word to enter the vocabulary.
        max_vocab: Maximum number of (non-special) vocabulary words.
        embedding_dim: Embedding vector size.
        epochs: Number of training epochs.
        batch_size: DataLoader batch size.
        window_min: Smallest context window radius.
        window_max: Largest context window radius.
        device: Torch device; defaults to CUDA when available, else CPU.
        anchor_words: Words whose nearest neighbours are printed; defaults to
            ``['king', 'queen', 'apple', 'run', 'doctor']``.
        num_workers: DataLoader worker processes. Use 0 to load batches in the
            main process.

    Returns:
        Dict with keys ``'model'``, ``'vocab'``, ``'inv_vocab'``,
        ``'embeddings'``, ``'similar'``, ``'most_common'``, ``'tsne_2d'`` and
        ``'umap_2d'``.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print("Device:", device)
    if anchor_words is None:
        anchor_words = ['king', 'queen', 'apple', 'run', 'doctor']
    is_cbow = model_type == 'cbow'

    # 1. Read file
    with open(text_path, 'r', encoding='utf-8') as f:
        raw = f.read()
    print("Raw size (chars):", len(raw))

    # 2. Normalize and tokenize by whitespace
    print("Normalizing...")
    norm = normalize_text(raw)
    tokens = norm.split()
    print("Total tokens:", len(tokens))

    # 3. Build vocabulary
    print("Building vocab...")
    vocab, inv_vocab, kept_counts = build_vocab(tokens, min_freq=min_freq, max_vocab=max_vocab)
    vocab_size = len(vocab)
    print(f"Vocab size (after filtering+cap): {vocab_size}")

    # 4. Generate pairs (with randomized window per center)
    print("Generating training pairs (this may take a while depending on corpus size)...")
    pairs = generate_training_pairs(tokens, vocab,
                                    window_min=window_min,
                                    window_max=window_max,
                                    model_type=model_type)
    print("Total training pairs:", len(pairs))

    # 5. Dataset / DataLoader
    dataset = Word2VecDataset(pairs, model_type=model_type)
    # Pass the collate functions themselves rather than lambdas: with
    # num_workers > 0 the collate_fn may have to be pickled for the worker
    # processes, and lambdas cannot be pickled.
    collate_fn = collate_cbow if is_cbow else collate_skipgram
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True,
                            collate_fn=collate_fn, num_workers=num_workers)

    # 6. Model
    model_cls = CBOWModel if is_cbow else SkipGramModel
    model = model_cls(vocab_size=vocab_size, embedding_dim=embedding_dim).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    # 7. Train
    for ep in range(1, epochs+1):
        print(f"Epoch {ep}/{epochs}")
        avg_loss = train_epoch(model, dataloader, optimizer, criterion, device, model_type=model_type)
        print(f"Epoch {ep} avg loss: {avg_loss:.4f}")

    # 8. Get embeddings as a (V, D) NumPy array
    emb_weights = model.embed.weight.detach().cpu().numpy()

    # 9. Evaluation: most similar words for the anchors. build_vocab already
    # returned the index -> word mapping, so it is reused here.
    similar = evaluate_similar_words(emb_weights, vocab, inv_vocab, anchor_words, topk=10)
    print("\nMost similar words (cosine) — anchors:", anchor_words)
    for anchor, lst in similar.items():
        print(f"\n{anchor}:")
        for w,score in lst:
            print(f"  {w} ({score:.4f})")

    # 10. Visualization of the (up to) 500 most frequent vocabulary words
    most_common = [w for w,_ in kept_counts.most_common(500) if w in vocab]
    indices = [vocab[w] for w in most_common]
    emb_subset = emb_weights[indices]  # (n, D)

    # t-SNE requires perplexity < n_samples; cap it so a small vocabulary does
    # not raise after training has already completed.
    print("Reducing with t-SNE...")
    perplexity = min(30, max(len(most_common) - 1, 1))
    tsne_2d = TSNE(n_components=2, init='pca', random_state=42, perplexity=perplexity).fit_transform(emb_subset)
    plot_2d_embeddings(tsne_2d, most_common, title=f"{model_type.upper()} embeddings t-SNE")

    # UMAP
    print("Reducing with UMAP...")
    umap_2d = umap.UMAP(n_components=2, random_state=42).fit_transform(emb_subset)
    plot_2d_embeddings(umap_2d, most_common, title=f"{model_type.upper()} embeddings UMAP")

    # Return useful artifacts
    return {
        'model': model,
        'vocab': vocab,
        'inv_vocab': inv_vocab,
        'embeddings': emb_weights,
        'similar': similar,
        'most_common': most_common,
        'tsne_2d': tsne_2d,
        'umap_2d': umap_2d
    }

### <font color= #d4b1e6> **Como Script** </font> 

In [None]:
# Train on the text8 corpus. Change the path if your file name contains a
# space ("text 8"), and pass model_type="skipgram" for the other architecture.
result = run_pipeline(
    "text8",
    model_type="cbow",
    min_freq=5, max_vocab=50000,
    embedding_dim=100,
    epochs=2,  # lower this for a quick trial run
    batch_size=512,
    window_min=2, window_max=5,
)

Device: cpu
Raw size (chars): 100000000
Normalizing...
Total tokens: 17005207
Building vocab...
Vocab size (after filtering+cap): 50002
Generating training pairs (this may take a while depending on corpus size)...
Total training pairs: 17005207
Epoch 1/2


train:   0%|          | 0/33214 [00:00<?, ?it/s]