# Skip-gram with negative sampling

## Оформление проекта в модульном виде

In [1]:
import os

# Directory that receives the modules written by the %%writefile cells below.
os.makedirs("skip-gram", exist_ok=True)
# Directory the corpus is downloaded into; `wget -O ./data/quora.txt` does not
# create missing parent directories, so it has to exist beforehand.
os.makedirs("data", exist_ok=True)

### 01. Загрузка данных 

Загрузка основного файла `quora.txt`

In [6]:
# Fetch the Quora questions corpus and store it as ./data/quora.txt.
# NOTE(review): -nc (no-clobber) presumably skips the download when the target
# file already exists — confirm against the wget manual.
!wget https://www.dropbox.com/s/obaitrix9jyu84r/quora.txt?dl=1 -O ./data/quora.txt -nc

--2025-05-31 13:23:54--  https://www.dropbox.com/s/obaitrix9jyu84r/quora.txt?dl=1
Resolving www.dropbox.com (www.dropbox.com)... 162.125.70.18
Connecting to www.dropbox.com (www.dropbox.com)|162.125.70.18|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://www.dropbox.com/scl/fi/p0t2dw6oqs6oxpd6zz534/quora.txt?rlkey=bjupppwua4zmd4elz8octecy9&dl=1 [following]
--2025-05-31 13:23:57--  https://www.dropbox.com/scl/fi/p0t2dw6oqs6oxpd6zz534/quora.txt?rlkey=bjupppwua4zmd4elz8octecy9&dl=1
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://ucf7487853ad7b3331d7974a6222.dl.dropboxusercontent.com/cd/0/inline/Cqt0c8p9awrwnNl-QJ3DSfg2uXtQaVS314z7mDlr3kbvD4V1PlUmc7GzxSxGmQzk4jbRxUsO_JbBT5OmxhMHsorCgugYDeuyWlC4lGWVqqXDNkEHNSO95BnpgM5V_DoksD8/file?dl=1# [following]
--2025-05-31 13:23:58--  https://ucf7487853ad7b3331d7974a6222.dl.dropboxusercontent.com/cd/0/inline/Cqt0c8p9awrwnNl-QJ3DSfg2uXtQaVS314z7m

In [2]:
# from pathlib import Path
# import requests

# # путь к папке с данными
# data_path = Path("data/")

# # если папки нет, то она будет создана
# if data_path.is_dir():
#     print(f"{data_path} directory exists.")
# else:
#     print(f"Did not find {data_path} directory, creating one...")
#     data_path.mkdir(parents=True, exist_ok=True)

# with open(data_path / "quora.txt", "wb") as f:
#     print("Downloading quora.txt...")
#     request = requests.get("https://yadi.sk/i/BPQrUu1NaTduEw")
#     f.write(request.content)

In [3]:
# Read the corpus as a list of lines (each keeps its trailing newline); the
# context manager closes the file handle once reading is done.
with open("./data/quora.txt", encoding="utf-8") as corpus_file:
    data = list(corpus_file)
data[50]

"What TV shows or books help you read people's body language?\n"

### 02. Предобработка данных

In [44]:
%%writefile skip-gram/data_preprocessing.py
"""
Contains functionality for data preprocessing.
"""
from nltk.tokenize import WordPunctTokenizer
from collections import Counter
from itertools import chain
import numpy as np
import string

def subsample_frequent_words(word_count_dict, threshold=1e-5):
    """Calculates the probability of keeping each word when subsampling frequent words.

    For a word with relative frequency ``f`` the keep probability is
    ``min(1, sqrt(threshold / f))``. Words whose relative frequency does not
    exceed ``threshold`` are therefore always kept, and every returned value
    is a valid probability in ``(0, 1]``.

    Parameters:
    - word_count_dict (dict): A dictionary where keys are words and values are the counts of those words.
    - threshold (float, optional): Relative frequency above which words start being dropped.
                                   Defaults to 1e-5.

    Returns:
    - dict: A dictionary where keys are words and values are the probabilities of keeping each word.
            An empty input yields an empty dictionary.
    """
    total_count = sum(word_count_dict.values())
    keep_prob = {}
    for word, count in word_count_dict.items():
        relative_freq = count / total_count
        # Without the clip, rare words would get values above 1, which are not
        # probabilities; comparing against a uniform draw gives the same outcome.
        keep_prob[word] = min(1.0, (threshold / relative_freq) ** 0.5)
    return keep_prob

def get_negative_sampling_prob(word_count_dict):
    """Builds the negative-sampling distribution over the given words.

    Each word's relative frequency is raised to the power 0.75 and the
    resulting weights are divided by their sum, so the returned values add up
    to 1.

    Parameters:
    - word_count_dict (dict): A dictionary where keys are words and values are the counts of those words.

    Returns:
    - dict: A dictionary where keys are words and values are the probabilities of selecting each word
            for negative sampling. Keys appear in the same order as in the input.
    """
    total_count = sum(word_count_dict.values())

    # Smoothed (unnormalised) weight of every word.
    smoothed = {}
    for word, count in word_count_dict.items():
        smoothed[word] = (count / total_count) ** 0.75

    # Normalise the weights into a probability distribution.
    normalizer = sum(smoothed.values())
    distribution = {}
    for word, weight in smoothed.items():
        distribution[word] = weight / normalizer
    return distribution

def preprocessing(
    data_path: str,
    min_count: int = 5,
    window_radius: int = 5
):
    """Preprocess a text corpus into skip-gram training pairs and sampling arrays.

    Every line of the file is stripped of ASCII punctuation, lower-cased and
    tokenized. Lines with fewer than three tokens are dropped, words seen
    fewer than ``min_count`` times are excluded from the vocabulary, and
    (center, context) index pairs are collected from a symmetric window around
    each in-vocabulary token.

    Parameters:
    - data_path: Path to the text file, one sentence per line.
    - min_count: min number of word occurrences in data to add to vocabulary.
    - window_radius: number of words to add to the context before and after central word.

    Returns:
    - word_to_index: mapping of allowed words in data to indexes; indexes follow
        the order of first appearance in the corpus, so they are reproducible
        across runs
    - context_pairs: list of tuples (central_word_idx, context_word_idx)
    - keep_prob_array: array of probabilities for every allowed word to keep
    - negative_sampling_prob_array: array of probabilities for every allowed word
        to use as negative sample
    """
    # Context manager so the corpus file handle is closed after reading.
    with open(data_path, encoding="utf-8") as corpus_file:
        lines = list(corpus_file)

    tokenizer = WordPunctTokenizer()
    # The translation table is loop-invariant, so it is built once.
    strip_punctuation = str.maketrans("", "", string.punctuation)
    data_tok = [
        tokenizer.tokenize(line.translate(strip_punctuation).lower())
        for line in lines
    ]
    # Keep only sentences that contain at least three tokens.
    data_tok = [tokens for tokens in data_tok if len(tokens) >= 3]

    # Counter keeps first-seen order. Deriving the indexes from that order
    # (rather than from a set, whose iteration order changes between runs)
    # keeps word indexes stable, so saved embeddings can be mapped back to words.
    word_count_dict = {
        word: count
        for word, count in Counter(chain.from_iterable(data_tok)).items()
        if count >= min_count  # drop words that are too rare
    }
    index_to_word = list(word_count_dict)
    word_to_index = {word: index for index, word in enumerate(index_to_word)}

    context_pairs = []
    for text in data_tok:
        # Resolve every token once per sentence; None marks out-of-vocabulary tokens.
        token_ids = [word_to_index.get(token) for token in text]
        for i, center_id in enumerate(token_ids):
            if center_id is None:
                continue
            window_start = max(0, i - window_radius)
            # +1 because the upper bound of range() is exclusive: the window has
            # to reach window_radius tokens to the right as well as to the left.
            window_end = min(i + window_radius + 1, len(token_ids))
            for j in range(window_start, window_end):
                context_id = token_ids[j]
                if j != i and context_id is not None:
                    context_pairs.append((center_id, context_id))

    keep_prob_dict = subsample_frequent_words(word_count_dict)
    negative_sampling_prob_dict = get_negative_sampling_prob(word_count_dict)
    # Arrays are aligned with the word indexes: position i belongs to index_to_word[i].
    keep_prob_array = np.array([keep_prob_dict[word] for word in index_to_word])
    negative_sampling_prob_array = np.array(
        [negative_sampling_prob_dict[word] for word in index_to_word]
    )

    return word_to_index, context_pairs, keep_prob_array, negative_sampling_prob_array

Overwriting skip-gram/data_preprocessing.py


### 03. Создание DataLoader

In [43]:
%%writefile skip-gram/data_setup.py
"""
Contains functionality for creating PyTorch DataLoader for 
text data.
"""
import data_preprocessing
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import random

import os  # this module is written to its own file and cannot rely on notebook globals

# CPU count reported by the OS (None when it cannot be determined); intended
# as a candidate value for the DataLoader's num_workers.
NUM_WORKERS = os.cpu_count()

# def get_one_random_sample_with_negatives(context_pairs,
#                                          keep_prob_array,
#                                          negative_sampling_prob_array,
#                                          num_negatives):
#     """Returns one sample of center, context and negative samples

#     Parameters:
#     - context_pairs: array of tuples (central_word_idx, context_word_idx)
#     - keep_prob_array: array of probabilities for every allowed word to keep
#     - negative_sampling_prob_array: array of probabilities for every allowed word
#         to use as negative sample

#     Returns:
#     A tuple of center and context words with negative samples.
#     In the form (center, context, neg_sample).
#     """
#     while True:
#         center, context = random.choice(context_pairs)
#         if random.random() < keep_prob_array[center]:
#             neg_sample = np.random.choice(
#                 range(len(negative_sampling_prob_array)),
#                 size=num_negatives,
#                 p=negative_sampling_prob_array,
#             )
#             return (center, context, neg_sample)

class Word2VecDataset(Dataset):
    """Map-style dataset that serves precomputed context pairs by position."""

    def __init__(self, context_pairs):
        # The sequence is stored by reference; nothing is copied.
        self.context_pairs = context_pairs

    def __getitem__(self, idx):
        """Return the pair stored at position ``idx``."""
        pair = self.context_pairs[idx]
        return pair

    def __len__(self):
        """Return the number of stored pairs."""
        pair_count = len(self.context_pairs)
        return pair_count

def get_dataloader(
    data_path: str,
    batch_size: int = 5000,
    num_workers: int = 1,
    min_count: int = 5,
    window_radius: int = 5,
    num_negatives: int = 15
):
    """Builds the training DataLoader and the negative-sample sampler for a corpus.

    Parameters:
    - data_path: Path to the text corpus, forwarded to the preprocessing step.
    - batch_size: Number of (center, context) pairs per batch.
    - num_workers: Number of worker processes used by the DataLoader.
    - min_count: Minimum word count for a word to enter the vocabulary.
    - window_radius: Context window radius used when collecting pairs.
    - num_negatives: Number of indexes one pass over the sampler yields.

    Returns:
    A tuple (dataloader, neg_sampler, word_to_index):
    - dataloader: shuffled DataLoader over the context pairs
    - neg_sampler: WeightedRandomSampler weighted by the negative sampling
        probabilities, drawing ``num_negatives`` indexes with replacement
    - word_to_index: mapping of vocabulary words to indexes
    """
    # The module is imported as `import data_preprocessing`, so the function has
    # to be reached through the module; a bare `preprocessing` is undefined here.
    # The keep-probability array is returned by preprocessing but not used here.
    (
        word_to_index,
        context_pairs,
        _keep_prob_array,
        negative_sampling_prob_array,
    ) = data_preprocessing.preprocessing(
        data_path,
        min_count=min_count,
        window_radius=window_radius,
    )

    dataset = Word2VecDataset(context_pairs)
    neg_sampler = WeightedRandomSampler(
        negative_sampling_prob_array, num_negatives, replacement=True
    )
    dataloader = DataLoader(
        dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )
    return dataloader, neg_sampler, word_to_index

Overwriting skip-gram/data_setup.py


### 04. Создание модели

In [42]:
%%writefile skip-gram/model_builder.py
"""
Contains PyTorch model code to instantiate a SkipGramModelWithNegSampling model.
"""
import torch
import torch.autograd as autograd
import torch.nn as nn

class SkipGramModelWithNegSampling(nn.Module):
    """Skip-gram model that scores center words against positive and negative contexts.

    Two embedding tables of shape (vocab_size, embedding_dim) are held: one is
    looked up for center words, the other for both positive and negative
    context words. The forward pass returns raw dot-product scores.
    """

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embeddings_in = nn.Embedding(vocab_size, embedding_dim)   # center words
        self.embeddings_out = nn.Embedding(vocab_size, embedding_dim)  # context words

        # No log-sigmoid is applied inside the model: the scores are returned
        # as-is and the non-linearity is left to the loss function.
        for table in (self.embeddings_in, self.embeddings_out):
            nn.init.xavier_uniform_(table.weight)

    def forward(self, center_words, pos_context_words, neg_context_words):
        """Return (pos_scores, neg_scores) for a batch of index tensors.

        - center_words: indexes of the input (center) words
        - pos_context_words: indexes of the true context words (the targets)
        - neg_context_words: indexes of the negative samples, i.e. words that
          should not appear in the context; one row of negatives per center word
        """
        center_vecs = self.embeddings_in(center_words)
        positive_vecs = self.embeddings_out(pos_context_words)
        negative_vecs = self.embeddings_out(neg_context_words)

        # Dot product of each center vector with its positive context vector.
        pos_scores = (center_vecs * positive_vecs).sum(dim=1)

        # Batched matrix product: every row of negatives times its center
        # vector (as a column) gives one score per negative sample.
        center_columns = center_vecs.unsqueeze(2)
        neg_scores = torch.bmm(negative_vecs, center_columns).squeeze(2)
        return pos_scores, neg_scores

Overwriting skip-gram/model_builder.py


### 05. Обучение модели

In [41]:
%%writefile skip-gram/engine.py
"""
"""
import torch
import numpy as np
from tqdm.auto import tqdm as tqdma

def train(
    model: torch.nn.Module,
    dataloader: torch.utils.data.DataLoader,
    neg_sampler,
    loss_fn: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    lr_scheduler: torch.optim.lr_scheduler.ReduceLROnPlateau,
    device: torch.device,
    steps: int,
    batch_size: int = 5000,
):
    """Trains the model on (center, context) batches with freshly drawn negatives.

    The dataloader is cycled until the step counter exceeds ``steps``; steps
    are numbered from 0, so ``steps + 1`` optimizer updates are performed. For
    every row of a batch the negative samples are obtained by iterating
    ``neg_sampler`` once. The loss is the sum of ``loss_fn`` on the positive
    scores (target 1) and on the negative scores (target 0). Progress is
    printed every 10 steps.

    Args:
    model: Model called as ``model(center, pos_context, neg_context)`` and
      returning ``(pos_scores, neg_scores)``.
    dataloader: Yields ``(target, context)`` index tensors.
    neg_sampler: Iterable producing the negative indexes for one row.
    loss_fn: Loss applied to scores and same-shaped 0/1 labels.
    optimizer: Optimizer updating the model parameters.
    lr_scheduler: Scheduler whose ``step`` receives the latest loss value.
    device: Device the batches are moved to.
    steps: Index of the last training step.
    batch_size: Unused; kept for backward compatibility. Labels are created
      from the shape of the scores, so any batch size works.

    Returns:
    The mean of the per-step loss values.

    Raises:
    ValueError: If the dataloader yields no batches.
    """
    model.train()
    step = 0
    loss_history = []
    while step <= steps:
        saw_batch = False
        for target, context in dataloader:
            saw_batch = True
            if step > steps:
                break
            center_words = target.long().to(device)
            pos_context_words = context.long().to(device)
            # One independent pass over the sampler per row of the batch.
            neg_indexes = np.array(
                [list(neg_sampler) for _ in range(center_words.shape[0])],
                dtype=np.int64,
            )
            neg_context_words = torch.from_numpy(neg_indexes).to(device)

            optimizer.zero_grad()
            pos_scores, neg_scores = model(
                center_words, pos_context_words, neg_context_words
            )
            # Labels take their shape from the scores: the last batch of an
            # epoch may be smaller than the others, and the number of negatives
            # is whatever the sampler produced.
            loss_pos = loss_fn(pos_scores, torch.ones_like(pos_scores))
            loss_neg = loss_fn(neg_scores, torch.zeros_like(neg_scores))

            loss = loss_pos + loss_neg
            loss.backward()
            optimizer.step()

            loss_history.append(loss.item())
            lr_scheduler.step(loss_history[-1])

            if step % 10 == 0:
                # Read the learning rates from the optimizer (public API).
                current_lr = [group["lr"] for group in optimizer.param_groups]
                print(f"Step {step}, Loss: {np.mean(loss_history[-10:])}, learning rate: {current_lr}")
            step += 1

        if not saw_batch:
            # Without this guard an empty dataloader would loop forever.
            raise ValueError("dataloader yielded no batches")

    return np.mean(loss_history)

Overwriting skip-gram/engine.py


### 06. Сохранение готовой модели

In [40]:
%%writefile skip-gram/utils.py
"""
Contains various utility functions for PyTorch model training and saving.
"""
from pathlib import Path
import torch

def save_model(model: torch.nn.Module,
               target_dir: str,
               model_name: str):
    """Saves a PyTorch model's state_dict to a target directory.

    Args:
    model: A target PyTorch model to save.
    target_dir: A directory for saving the model to. It is created
      (including parents) when it does not exist.
    model_name: A filename for the saved model. Should include
      either ".pth" or ".pt" as the file extension.

    Raises:
    AssertionError: If model_name does not end with ".pt" or ".pth".

    Example usage:
    save_model(model=model_0,
               target_dir="models",
               model_name="skip-gram with negative sampling.pth")
    """
    # Validate the file name first so that an invalid name leaves no directory
    # behind. The exception is raised explicitly (not via `assert`) so the check
    # also runs under `python -O`; the exception type is unchanged for callers.
    if not model_name.endswith((".pth", ".pt")):
        raise AssertionError("model_name should end with '.pt' or '.pth'")

    # Create target directory
    target_dir_path = Path(target_dir)
    target_dir_path.mkdir(parents=True, exist_ok=True)

    # Create model save path
    model_save_path = target_dir_path / model_name

    # Save the model state_dict()
    print(f"[INFO] Saving model to: {model_save_path}")
    torch.save(obj=model.state_dict(),
               f=model_save_path)

Writing skip-gram/utils.py


### 07. Обучение модели, проверка и сохранение

In [10]:
# Build the shuffled DataLoader, the negative-sample sampler and the vocabulary
# mapping from the downloaded corpus; every other setting keeps its default.
dataloader, neg_sampler, word_to_index = get_dataloader(data_path="data/quora.txt")

In [39]:
%%writefile skip-gram/train.py
"""
"""
# Training script: builds the data pipeline and the model, trains for
# NUM_STEPS steps and saves the resulting weights under models/.
from timeit import default_timer as timer

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR

import data_setup
import engine
import model_builder
import utils

torch.manual_seed(42)
torch.cuda.manual_seed(42)

NUM_STEPS = 750
device = "cuda" if torch.cuda.is_available() else "cpu"

embedding_dim = 32
num_negatives = 15

# This file runs as a standalone script, so the data objects are created here
# instead of being taken from notebook globals.
dataloader, neg_sampler, word_to_index = data_setup.get_dataloader(
    "data/quora.txt", num_negatives=num_negatives
)

vocab_size = len(word_to_index)
model_0 = model_builder.SkipGramModelWithNegSampling(vocab_size, embedding_dim).to(device)

# Setup loss function and optimizer
loss_fn = nn.BCEWithLogitsLoss()  # binary cross-entropy applied directly to the raw scores
optimizer = optim.Adam(model_0.parameters(), lr=0.05)
# Halves the learning rate (factor=0.5) once the loss has not improved for more
# than `patience` consecutive scheduler steps; the scheduler is stepped once
# per training step.
lr_scheduler = ReduceLROnPlateau(optimizer, factor=0.5, patience=30)

# Start the timer
start_time = timer()

# Train model_0
model_0_results = engine.train(model=model_0,
                               dataloader=dataloader,
                               neg_sampler=neg_sampler,
                               loss_fn=loss_fn,
                               optimizer=optimizer,
                               lr_scheduler=lr_scheduler,
                               steps=NUM_STEPS,
                               device=device)

# End the timer and print out how long it took
end_time = timer()
print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")

# Save the model
utils.save_model(model=model_0,
                 target_dir="models",
                 model_name="skip-gram with negative sampling.pth")

Overwriting skip-gram/train.py


In [32]:
# Run the training loop with the objects configured above; the value kept in
# model_0_results is whatever train() returns.
model_0_results = train(
    model=model_0,
    dataloader=dataloader,
    neg_sampler=neg_sampler,
    loss_fn=loss_fn,
    optimizer=optimizer,
    lr_scheduler=lr_scheduler,
    device=device,
    steps=NUM_STEPS,
)

Step 0, Loss: 1.178739309310913, learning rate: [0.003125]
Step 10, Loss: 1.1792627811431884, learning rate: [0.003125]
Step 20, Loss: 1.1801915049552918, learning rate: [0.003125]
Step 30, Loss: 1.1780844688415528, learning rate: [0.003125]
Step 40, Loss: 1.1769014477729798, learning rate: [0.0015625]
Step 50, Loss: 1.1806744456291198, learning rate: [0.0015625]
Step 60, Loss: 1.1832282185554504, learning rate: [0.0015625]
Step 70, Loss: 1.180414652824402, learning rate: [0.00078125]
Step 80, Loss: 1.1816535592079163, learning rate: [0.00078125]
Step 90, Loss: 1.1767923831939697, learning rate: [0.00078125]
Step 100, Loss: 1.1825859546661377, learning rate: [0.000390625]
Step 110, Loss: 1.177309775352478, learning rate: [0.000390625]
Step 120, Loss: 1.1782177329063415, learning rate: [0.000390625]
Step 130, Loss: 1.176673173904419, learning rate: [0.000390625]
Step 140, Loss: 1.1798057556152344, learning rate: [0.0001953125]
Step 150, Loss: 1.1756161451339722, learning rate: [0.000195

In [33]:
# Reverse lookup table: vocabulary index -> word.
index_to_word = dict(zip(word_to_index.values(), word_to_index.keys()))

In [34]:
# Take the two embedding tables by attribute name instead of by their position
# in model_0.parameters(), so the result does not depend on the order in which
# the model registers its parameters. detach() drops them from the autograd graph.
embedding_matrix_center = model_0.embeddings_in.weight.detach()    # center-word table
embedding_matrix_context = model_0.embeddings_out.weight.detach()  # context-word table

In [35]:
def get_word_vector(word, embedding_matrix, word_to_index=word_to_index):
    """Return the row of ``embedding_matrix`` that belongs to ``word``.

    The row number is looked up in ``word_to_index``; a word missing from the
    mapping raises ``KeyError``.
    """
    row = word_to_index[word]
    return embedding_matrix[row]

In [36]:
import torch.nn.functional as F

def find_nearest(word, embedding_matrix, word_to_index=word_to_index, k=10):
    """Return the ``k`` words whose embeddings are most cosine-similar to ``word``.

    Parameters:
    - word: query word; must be a key of ``word_to_index``.
    - embedding_matrix: tensor with one embedding row per vocabulary index.
    - word_to_index: mapping of words to row indexes of ``embedding_matrix``.
    - k: number of neighbours to return; a non-positive value gives an empty list.

    Returns:
    A list of ``(word, similarity)`` tuples ordered by increasing similarity,
    so the best match comes last (the query word itself is included).
    """
    if k <= 0:
        # A slice of [-0:] would otherwise return the entire vocabulary.
        return []

    # Forward the caller's mapping; otherwise a custom word_to_index would be
    # silently ignored in favour of the helper's default.
    query = get_word_vector(word, embedding_matrix, word_to_index)[None, :]
    similarities = F.cosine_similarity(embedding_matrix, query)

    # tolist() yields plain Python ints and, unlike .numpy(), also works when
    # the embeddings live on a GPU.
    nearest = similarities.argsort()[-k:].tolist()

    # Reverse mapping built from the same word_to_index that produced the row
    # indexes, so both directions are guaranteed to agree.
    index_to_word = {index: token for token, index in word_to_index.items()}
    return [(index_to_word[i], similarities[i].item()) for i in nearest]

In [37]:
# Show the 10 vocabulary words closest to "python" in the context embedding
# table; results are listed by increasing similarity, best match last.
find_nearest("python", embedding_matrix_context, k=10)

[('ios', 0.7597508430480957),
 ('pointers', 0.7638192772865295),
 ('matlab', 0.7749127745628357),
 ('linux', 0.7759209275245667),
 ('html', 0.7836021780967712),
 ('javascript', 0.7920598387718201),
 ('c', 0.8066774606704712),
 ('programming', 0.8400071263313293),
 ('java', 0.8756576180458069),
 ('python', 1.0000001192092896)]