In [1]:
import os
import re
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity


In [None]:
DATA_DIR = "data"
MODEL_DIR = "models"
RAW_TEXT_PATH = os.path.join(DATA_DIR, "processed_text.txt")
MAX_VOCAB_SIZE = 30000 
MIN_COUNT = 5  
MAX_TOKENS = 200000  

def ensure_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

ensure_dir(DATA_DIR)
ensure_dir(MODEL_DIR)

In [None]:
def simple_tokenize(text):
    text = text.lower()
    text = re.sub(r'[,.;:\(\)\[\]{}"\'\-_!?@#$%^&*+=<>~/\\|]', ' ', text)
    
    tokens = [token for token in text.split() if len(token) >= 2 and not token.isdigit()]
    
    return tokens[:MAX_TOKENS]

In [None]:
def load_small_dataset():
    if os.path.exists(RAW_TEXT_PATH):
        print("Loading cached text data...")
        with open(RAW_TEXT_PATH, 'r', encoding='utf-8') as f:
            text = f.read()
    else:
        print("Creating sample text data...")
            
        sample_text = """
        Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence 
        concerned with the interactions between computers and human language, in particular how to program computers 
        to process and analyze large amounts of natural language data. The goal is a computer capable of understanding 
        the contents of documents, including the contextual nuances of the language within them. The technology can then 
        accurately extract information and insights contained in the documents as well as categorize and organize the 
        documents themselves.
        
        Challenges in natural language processing frequently involve speech recognition, natural language understanding, 
        and natural language generation. Modern NLP algorithms are based on machine learning, especially statistical 
        machine learning and deep learning methods. Many different classes of machine-learning algorithms have been 
        applied to natural-language-processing tasks. These algorithms take as input a large set of features that are 
        generated from the input data. Some of the earliest-used algorithms, such as decision trees, produced systems 
        of hard if-then rules similar to existing hand-written rules. Increasingly, however, research has focused on 
        statistical models, which make soft, probabilistic decisions based on attaching real-valued weights to each 
        input feature.
        
        Word embeddings are one of the most popular representations of document vocabulary. They are capable of capturing 
        context of a word in a document, semantic and syntactic similarity, relation with other words, etc. There are 
        several methods to train and extract word embeddings from text, including Word2Vec, GloVe, and FastText. These 
        methods are based on the distributional hypothesis, which states that words that occur in the same contexts tend 
        to have similar meanings. This allows these models to learn word representations that capture meaningful semantic 
        relationships between words, which can be used for various NLP tasks such as sentiment analysis, named entity 
        recognition, and machine translation.
        """
        
        # Add it multiple times to get more data
        text = sample_text * 20  
        
        with open(RAW_TEXT_PATH, 'w', encoding='utf-8') as f:
            f.write(text)
    
    return text

In [None]:
def build_vocab(tokens, min_count=MIN_COUNT, max_size=MAX_VOCAB_SIZE):

    word_freq = Counter(tokens)
    vocab_list = [word for word, freq in word_freq.most_common(max_size) 
                 if freq >= min_count]
    
    word_to_idx = {word: idx for idx, word in enumerate(vocab_list)}
    idx_to_word = {idx: word for word, idx in word_to_idx.items()}
    
    print(f"Vocabulary size: {len(word_to_idx)}")
    return word_to_idx, idx_to_word


In [None]:
class Word2VecDataset(Dataset):
    def __init__(self, tokens, word_to_idx, window_size=2, neg_samples=5):
        self.tokens = tokens
        self.word_to_idx = word_to_idx
        self.window_size = window_size
        self.neg_samples = neg_samples
        
        self.corpus = [word_to_idx[token] for token in tokens if token in word_to_idx]
        
        self._prepare_negative_sampling()
        
    def _prepare_negative_sampling(self):
        count = Counter(self.corpus)
        freq = np.zeros(len(self.word_to_idx))
        
        for idx, word_freq in count.items():
            freq[idx] = word_freq
        
        freq = freq ** 0.75
        self.neg_sampling_prob = freq / freq.sum()
    
    def __len__(self):
        return len(self.corpus) - 2 * self.window_size
    
    def __getitem__(self, idx):
        idx += self.window_size
        
        center_word_idx = self.corpus[idx]
        
        context_indices = list(range(idx - self.window_size, idx)) + \
                          list(range(idx + 1, idx + self.window_size + 1))
        
        context_pos = np.random.choice(context_indices)
        context_word_idx = self.corpus[context_pos]
        
        neg_samples = []
        while len(neg_samples) < self.neg_samples:
            neg_idx = np.random.choice(len(self.neg_sampling_prob), p=self.neg_sampling_prob)
            if neg_idx != center_word_idx and neg_idx != context_word_idx:
                neg_samples.append(neg_idx)
        
        return (
            torch.tensor(center_word_idx, dtype=torch.long),
            torch.tensor(context_word_idx, dtype=torch.long),
            torch.tensor(neg_samples, dtype=torch.long)
        )

In [None]:
class SkipGram(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.center_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.context_embeddings = nn.Embedding(vocab_size, embedding_dim)
        
        # Initialize with small random values
        self.center_embeddings.weight.data.uniform_(-0.1, 0.1)
        self.context_embeddings.weight.data.uniform_(-0.1, 0.1)
    
    def forward(self, center_words, context_words, negative_words):
        # Get embeddings for center and positive context words
        center_emb = self.center_embeddings(center_words)  
        context_emb = self.context_embeddings(context_words)  
        
        pos_score = torch.sum(center_emb * context_emb, dim=1)  
        
        # Get embeddings for negative samples
        neg_emb = self.context_embeddings(negative_words) 
        
        # Compute negative scores (batch matrix multiplication)
        neg_score = torch.bmm(neg_emb, center_emb.unsqueeze(2)).squeeze()  
        
        # Apply loss function (negative sampling loss)
        pos_loss = F.logsigmoid(pos_score)
        neg_loss = F.logsigmoid(-neg_score).sum(dim=1)
        
        return -(pos_loss + neg_loss).mean()
    
    def get_embeddings(self):
        """Return the trained word embeddings"""
        return self.center_embeddings.weight.data.cpu().numpy()

In [None]:
def train_word2vec(tokens, word_to_idx, embedding_dim=100, epochs=5, batch_size=512, 
                   window_size=2, neg_samples=5, learning_rate=0.002):
    """Train Word2Vec model with checkpointing"""
    
    checkpoint_path = os.path.join(MODEL_DIR, "word2vec_checkpoint.pt")
    final_model_path = os.path.join(MODEL_DIR, "word2vec_final.pt")
    
    dataset = Word2VecDataset(
        tokens=tokens,
        word_to_idx=word_to_idx,
        window_size=window_size,
        neg_samples=neg_samples
    )
    
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,  
    )
    
    vocab_size = len(word_to_idx)
    model = SkipGram(vocab_size, embedding_dim)
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    print(f"Training on: {device}")
    
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    start_epoch = 0
    if os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resuming from epoch {start_epoch}")
    
    losses = []
    for epoch in range(start_epoch, epochs):
        total_loss = 0.0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}")
        
        for batch_idx, (centers, contexts, negatives) in enumerate(progress_bar):
            centers = centers.to(device)
            contexts = contexts.to(device)
            negatives = negatives.to(device)
            
            optimizer.zero_grad()
            loss = model(centers, contexts, negatives)
            
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            progress_bar.set_postfix(loss=f"{loss.item():.4f}")
            
            # Save checkpoint every 500 batches
            if (batch_idx + 1) % 500 == 0:
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': loss.item(),
                }, checkpoint_path)
                print(f"Checkpoint saved at batch {batch_idx+1}")
        
        # Calculate average loss for epoch
        avg_loss = total_loss / len(dataloader)
        losses.append(avg_loss)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
        
        # Save checkpoint after each epoch
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
        }, checkpoint_path)
    
    # Save final model
    torch.save(model.state_dict(), final_model_path)
    print(f"Final model saved to {final_model_path}")
    
    # Plot training loss
    plt.figure(figsize=(10, 5))
    plt.plot(range(1, len(losses) + 1), losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.savefig(os.path.join(MODEL_DIR, 'training_loss.png'))
    plt.close()
    
    return model

In [None]:
def evaluate_word_analogies(model, word_to_idx, idx_to_word):
    """Evaluate word analogies like king - man + woman = queen"""
    
    embeddings = model.get_embeddings()
    
    test_cases = [
        ('king', 'man', 'woman', 'queen'),
        ('paris', 'france', 'italy', 'rome'),
        ('good', 'better', 'bad', 'worse'),
        ('big', 'bigger', 'small', 'smaller')
    ]
    
    results = []
    
    for word_a, word_b, word_c, expected in test_cases:
        if not all(word in word_to_idx for word in [word_a, word_b, word_c, expected]):
            results.append(f"Skipping {word_a} - {word_b} + {word_c} = ? (missing word(s) in vocabulary)")
            continue
        
        idx_a = word_to_idx[word_a]
        idx_b = word_to_idx[word_b]
        idx_c = word_to_idx[word_c]
        
        target_vec = embeddings[idx_b] - embeddings[idx_a] + embeddings[idx_c]
        
        sims = cosine_similarity([target_vec], embeddings)[0]
        
        closest_idxs = np.argsort(sims)[::-1][:5]
        closest_words = [idx_to_word[idx] for idx in closest_idxs]
        
        closest_words = [w for w in closest_words if w not in [word_a, word_b, word_c]]
        
        expected_rank = closest_words.index(expected) + 1 if expected in closest_words else "not found"
        
        result = f"{word_a} - {word_b} + {word_c} = {closest_words[0]} (expected: {expected}, rank: {expected_rank})"
        results.append(result)
    
    return results

In [None]:
def visualize_embeddings(model, word_to_idx, idx_to_word, num_words=50):
    """Visualize word embeddings using PCA"""
    
    embeddings = model.get_embeddings()
    words = list(word_to_idx.keys())[:num_words]
    word_vectors = np.array([embeddings[word_to_idx[word]] for word in words])
    
    pca = PCA(n_components=2)
    result = pca.fit_transform(word_vectors)
    
    # Create plot
    plt.figure(figsize=(12, 10))
    plt.scatter(result[:, 0], result[:, 1], alpha=0.7)
    
    for i, word in enumerate(words):
        plt.annotate(word, xy=(result[i, 0], result[i, 1]), fontsize=9)
    
    plt.title('Word Embeddings Visualization')
    plt.savefig(os.path.join(MODEL_DIR, 'embeddings_visualization.png'))
    plt.close()
    
    return os.path.join(MODEL_DIR, 'embeddings_visualization.png')


In [None]:
def find_similar_words(model, word, word_to_idx, idx_to_word, top_n=10):
    """Find words similar to the input word"""
    if word not in word_to_idx:
        return [f"'{word}' not found in vocabulary"]
    
    embeddings = model.get_embeddings()
    
    word_idx = word_to_idx[word]
    word_vec = embeddings[word_idx]
    
    sims = cosine_similarity([word_vec], embeddings)[0]
    
    # Get top N similar words
    most_similar_idxs = np.argsort(sims)[::-1][:top_n+1]  # +1 because the word itself will be included
    similar_words = [(idx_to_word[idx], sims[idx]) for idx in most_similar_idxs if idx != word_idx]
    
    return similar_words[:top_n]


In [None]:
print("Loading and processing text data...")
text = load_small_dataset()
tokens = simple_tokenize(text)
print(f"Total tokens: {len(tokens)}")

word_to_idx, idx_to_word = build_vocab(tokens)

print("Training Word2Vec model...")
embedding_dim = 100
model = train_word2vec(
    tokens=tokens,
    word_to_idx=word_to_idx,
    embedding_dim=embedding_dim,
    epochs=5,
    batch_size=128,
    window_size=2,
    neg_samples=5,
    learning_rate=0.002
)

# Evaluate analogies
print("\nEvaluating word analogies:")
analogy_results = evaluate_word_analogies(model, word_to_idx, idx_to_word)
for result in analogy_results:
    print(result)

print("\nFinding similar words:")
for test_word in ['language', 'learning', 'computer', 'natural']:
    if test_word in word_to_idx:
        similar = find_similar_words(model, test_word, word_to_idx, idx_to_word, top_n=5)
        print(f"Words similar to '{test_word}':")
        for word, score in similar:
            print(f"  {word}: {score:.4f}")

print("\nVisualizing word embeddings...")
viz_path = visualize_embeddings(model, word_to_idx, idx_to_word)
print(f"Visualization saved to {viz_path}")

Loading and processing text data...
Loading cached text data...
Total tokens: 5860
Vocabulary size: 164
Training Word2Vec model...
Training on: cpu
Loading checkpoint...
Resuming from epoch 5
Final model saved to models\word2vec_final.pt

Evaluating word analogies:
Skipping king - man + woman = ? (missing word(s) in vocabulary)
Skipping paris - france + italy = ? (missing word(s) in vocabulary)
Skipping good - better + bad = ? (missing word(s) in vocabulary)
Skipping big - bigger + small = ? (missing word(s) in vocabulary)

Finding similar words:
Words similar to 'language':
  processing: 0.8102
  understanding: 0.8010
  natural: 0.7473
  generation: 0.7372
  translation: 0.7341
Words similar to 'learning':
  especially: 0.7623
  deep: 0.7515
  train: 0.7392
  machine: 0.7253
  statistical: 0.7065
Words similar to 'computer':
  capturing: 0.8090
  linguistics: 0.7688
  capable: 0.7400
  different: 0.7392
  subfield: 0.7262
Words similar to 'natural':
  frequently: 0.7840
  language: 0.