# Word2vec - Skip-Gram with Negative Sampling

## Необходимые библиотеки

In [1]:
!pip install --upgrade nltk bokeh umap-learn

Collecting bokeh
  Downloading bokeh-3.7.3-py3-none-any.whl.metadata (12 kB)
Collecting umap-learn
  Downloading umap_learn-0.5.7-py3-none-any.whl.metadata (21 kB)
Collecting narwhals>=1.13 (from bokeh)
  Downloading narwhals-1.41.0-py3-none-any.whl.metadata (11 kB)
Downloading bokeh-3.7.3-py3-none-any.whl (7.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.0/7.0 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading umap_learn-0.5.7-py3-none-any.whl (88 kB)
Downloading narwhals-1.41.0-py3-none-any.whl (357 kB)
Installing collected packages: narwhals, bokeh, umap-learn
  Attempting uninstall: bokeh
    Found existing installation: bokeh 3.6.0
    Uninstalling bokeh-3.6.0:
      Successfully uninstalled bokeh-3.6.0
  Attempting uninstall: umap-learn
    Found existing installation: umap-learn 0.5.6
    Uninstalling umap-learn-0.5.6:
      Successfully uninstalled umap-learn-0.5.6
Successfully installed bokeh-3.7.3 narwhals-1.41.0 umap-lear

In [None]:
import itertools
import random
import string
from collections import Counter
from itertools import chain

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import umap
from IPython.display import clear_output
from matplotlib import pyplot as plt
from nltk.tokenize import WordPunctTokenizer
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR
from tqdm.auto import tqdm as tqdma

## Подготовка данных

In [3]:
# download the data:
!wget https://www.dropbox.com/s/obaitrix9jyu84r/quora.txt?dl=1 -O ./quora.txt -nc
# alternative download link: https://yadi.sk/i/BPQrUu1NaTduEw

File ‘./quora.txt’ already there; not retrieving.


In [4]:
data = list(open("./quora.txt", encoding="utf-8"))
data[50]

"What TV shows or books help you read people's body language?\n"

Токенизируем на отдельные слова

[`WordPunctTokenizer()`](https://www.nltk.org/api/nltk.tokenize.regexp.html) — токенизация на слова и токены пунктуации

In [6]:
tokenizer = WordPunctTokenizer()

print(tokenizer.tokenize(data[50]))

['What', 'TV', 'shows', 'or', 'books', 'help', 'you', 'read', 'people', "'", 's', 'body', 'language', '?']


[`str.maketrans(s1 : str, s2 : str, to_none : str)`](https://docs.python.org/3/library/stdtypes.html#str.maketrans) — задает таблицу отображения слов, последний аргумент задает символы, которые будут отображены в None.
[`str.translate(map)`](https://docs.python.org/3/library/stdtypes.html#str.translate) — по отображению создает обновленную копию строки.

In [7]:
data_tok = [
    tokenizer.tokenize(
        line.translate(str.maketrans("", "", string.punctuation)).lower()
    )
    for line in data
] # генератор в котором токенизируем каждое предложение
data_tok = [x for x in data_tok if len(x) >= 3] # оставляем только те, чьи длина больше 2 (т.е. минимум два слова в предложении)

Проверки корректности

In [10]:
assert all(
    isinstance(row, (list, tuple)) for row in data_tok
), "please convert each line into a list of tokens (strings)"
assert all(
    all(isinstance(tok, str) for tok in row) for row in data_tok
), "please convert each line into a list of tokens (strings)"
is_latin = lambda tok: all("a" <= x.lower() <= "z" for x in tok)
assert all(
    map(lambda l: not is_latin(l) or l.islower(), map(" ".join, data_tok))
), "please make sure to lowercase the data"

Задаем ширину окна, предобрабатываем и собираем контекстные пары

In [11]:
min_count = 5
window_radius = 5

`chain.from_iterable(iter)` — получает все отдельные элементы внутри объектов итератора: `chain.from_iterable(['ABC', 'DEF']) → A B C D E F`

т.е. в данной ситуации собираем список всех слов из всех текстов

In [12]:
vocabulary_with_counter = Counter(chain.from_iterable(data_tok))

word_count_dict = dict()
for word, counter in vocabulary_with_counter.items():
    if counter >= min_count: # отбрасываем слова встречаемые реже 5 раз
        word_count_dict[word] = counter

vocabulary = set(word_count_dict.keys())
del vocabulary_with_counter

In [14]:
word_to_index = {word: index for index, word in enumerate(vocabulary)} # (слово, индекс)
index_to_word = {index: word for word, index in word_to_index.items()} # (индекс, слово)

Генерируем пары вида `(слово, контекст)`

In [15]:
context_pairs = []

for text in data_tok:
    for i, central_word in enumerate(text): # выбираем центральное слово
        context_indices = range(
            max(0, i - window_radius), min(i + window_radius, len(text))
        ) # сбор контекста к центральному слову
        for j in context_indices:
            if j == i:
                continue
            context_word = text[j]
            if central_word in vocabulary and context_word in vocabulary:
                context_pairs.append(
                    (word_to_index[central_word], word_to_index[context_word]) # нашли пары разрешенных слов и добавили в массив
                )

print(f"Generated {len(context_pairs)} pairs of target and context words.")

Generated 40220313 pairs of target and context words.


### Подзадача №1: subsampling
Для того, чтобы сгладить разницу в частоте встречаемсости слов, необходимо реализовать механизм subsampling'а.
Для этого реализована функция ниже.

Вероятность **оставить** слово из обучения (на фиксированном шаге) вычисляется как
$$
P_\text{save}(w_i)=\sqrt{\frac{t}{f(w_i)}},
$$
где $f(w_i)$ – нормированная частота встречаемости слова, а $t$ – заданный порог (threshold).

In [17]:
def subsample_frequent_words(word_count_dict, threshold=1e-5):
    """
    Calculates the subsampling probabilities for words based on their frequencies.

    This function is used to determine the probability of keeping a word in the dataset
    when subsampling frequent words. The method used is inspired by the subsampling approach
    in Word2Vec, where each word's frequency affects its probability of being kept.

    Parameters:
    - word_count_dict (dict): A dictionary where keys are words and values are the counts of those words.
    - threshold (float, optional): A threshold parameter used to adjust the frequency of word subsampling.
                                   Defaults to 1e-5.

    Returns:
    - dict: A dictionary where keys are words and values are the probabilities of keeping each word.
    """
    all_w_count = sum(word_count_dict.values())
    freq = {word: word_count_dict[word] / all_w_count for word in word_count_dict}
    prob = {word: (threshold / freq[word]) ** 0.5 for word in freq}
    return prob


### Подзадача №2: negative sampling
Для более эффективного обучения необходимо не только предсказывать высокие вероятности для слов из контекста, но и предсказывать низкие для слов, не встреченных в контексте. Для этого необходимо вычислить вероятность использовать слово в качестве negative sample.

В оригинальной статье предлагается оценивать вероятность слов выступать в качестве negative sample согласно распределению $P_n(w)$
$$
P_n(w) = \frac{U(w)^{3/4}}{Z},
$$

где $U(w)$ распределение слов по частоте (или, как его еще называют, по униграммам) (т.е. в нашей ситуации $U(w)$ == $f(w)$), а $Z$ – нормировочная константа, чтобы общая мера была равна $1$.

In [18]:
def get_negative_sampling_prob(word_count_dict):
    """
    Calculates the negative sampling probabilities for words based on their frequencies.

    This function adjusts the frequency of each word raised to the power of 0.75, which is
    commonly used in algorithms like Word2Vec to moderate the influence of very frequent words.
    It then normalizes these adjusted frequencies to ensure they sum to 1, forming a probability
    distribution used for negative sampling.

    Parameters:
    - word_count_dict (dict): A dictionary where keys are words and values are the counts of those words.

    Returns:
    - dict: A dictionary where keys are words and values are the probabilities of selecting each word
            for negative sampling.
    """
    all_w_count = sum(word_count_dict.values())
    freq = {word: (word_count_dict[word] / all_w_count) ** 0.75 for word in word_count_dict}
    Z = sum(freq.values())
    return {word: freq[word] / Z for word in freq}
    

### Формирование словарей

Для удобства, преобразуем полученные словари в массивы (т.к. все слова все равно уже пронумерованы).

In [19]:
keep_prob_dict = subsample_frequent_words(word_count_dict)
assert keep_prob_dict.keys() == word_count_dict.keys()

In [20]:
negative_sampling_prob_dict = get_negative_sampling_prob(word_count_dict)
assert negative_sampling_prob_dict.keys() == negative_sampling_prob_dict.keys()
assert np.allclose(sum(negative_sampling_prob_dict.values()), 1)

In [21]:
# полученные массивы
keep_prob_array = np.array(
    [keep_prob_dict[index_to_word[idx]] for idx in range(len(word_to_index))]
)
negative_sampling_prob_array = np.array(
    [
        negative_sampling_prob_dict[index_to_word[idx]]
        for idx in range(len(word_to_index))
    ]
)

## Skip-Gram

Наконец, время реализовать модель.

Напомним, что в случае negative sampling решается задача максимизации следующего функционала:

$$
\mathcal{L} = \log \sigma({\mathbf{v}'_{w_O}}^\top \mathbf{v}_{w_I}) + \sum_{i=1}^{k} \mathbb{E}_{w_i \sim P_n(w)} \left[ \log \sigma({-\mathbf{v}'_{w_i}}^\top \mathbf{v}_{w_I}) \right],
$$

где:
- $\mathbf{v}_{w_I}$ – вектор центрального слова $w_I$,
- $\mathbf{v}'_{w_O}$ – вектор слова из контекста $w_O$,
- $k$ – число negative samples,
- $P_n(w)$ – распределение negative samples, заданное выше,
- $\sigma$ – сигмоида.

Далее по ходу работы будем использовать лосс: nn.BCEWithLogitsLoss(), в котором уже реализовано почти все, остается посчитать только произведения векторов

Модель использует два типа эмбеддингов:
* `self.embeddings_in = nn.Embedding(vocab_size, embedding_dim)` — центральное слово
* `self.embeddings_out = nn.Embedding(vocab_size, embedding_dim)` — контекст

[`torch.nn.init.xavier_uniform_`](https://docs.pytorch.org/docs/stable/nn.init.html#torch.nn.init.xavier_uniform_) — начальная инициализация весов эмбеддингов (порекомендовали в чатике), результирующий тензор будет иметь значения, семплированные из $U(a, -a)$, где: $$a = gain \sqrt{\frac{6}{fan\_in + fan\_out}}$$

In [23]:
class SkipGramModelWithNegSampling(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(SkipGramModelWithNegSampling, self).__init__()
        self.embeddings_in = nn.Embedding(vocab_size, embedding_dim) # center
        self.embeddings_out = nn.Embedding(vocab_size, embedding_dim) # context
        
        # никакая логсигмоида нам не нужна! это все заложено в лоссе
        torch.nn.init.xavier_uniform_(self.embeddings_in.weight)
        torch.nn.init.xavier_uniform_(self.embeddings_out.weight)
        
    def forward(self, center_words, pos_context_words, neg_context_words):
        # center_words — входные слова
        # pos_context_words — таргет, т.е. правильный контекст (реально существующий для входного слова)
        # neg_context_words — отрицательные примеры — то что не должно быть в контексте

        v_in = self.embeddings_in(center_words) 
        v_out = self.embeddings_out(pos_context_words)
        v_neg = self.embeddings_out(neg_context_words)
        
        pos_scores = (torch.sum(v_in * v_out, dim=1))
        neg_scores = (torch.bmm(v_neg, v_in.unsqueeze(2)).squeeze(2)) #.sum(1) # bmm - батчевое (по 2D-матричное) перемножение матриц
        return pos_scores, neg_scores

In [24]:
device = torch.device("cpu")

Воспользуемся преимуществами API Pytorch: используем [`DataLoader`](https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader), для многократного ускорения обучения

In [25]:
from torch.utils.data import Dataset, DataLoader

class Word2VecDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

dataset = Word2VecDataset(context_pairs)
dataloader = DataLoader(dataset, batch_size=5000, shuffle=True)

In [40]:
vocab_size = len(word_to_index)
embedding_dim = 32
num_negatives = 15

model = SkipGramModelWithNegSampling(vocab_size, embedding_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.05) # оптимизатор параметров методом Adam
lr_scheduler = ReduceLROnPlateau(optimizer, factor=0.5, patience=30) # штука, которая будет уполовинивать (* factor = 1/2) lr при отсутствии улучшений в течение 150 эпох
criterion = nn.BCEWithLogitsLoss() # тот самый лосс, похож на логлосс

In [41]:
# проверяем корректность параметров модели (nn.Linear не нужны)
params_counter = 0
for weights in model.parameters():
    params_counter += weights.shape.numel()
assert params_counter == len(word_to_index) * embedding_dim * 2

In [42]:
def get_negative_samples(
    n_smpl,
    negative_sampling_prob_array,
    num_negatives,
):
    return np.random.choice(
                range(n_smpl),
                size=num_negatives,
                p=negative_sampling_prob_array,
            )

In [43]:
def train_skipgram_with_neg_sampling(
    model,
    context_pairs,
    keep_prob_array,
    word_to_index,
    batch_size,
    num_negatives,
    negative_sampling_prob_array,
    epohs,
    steps,
    optimizer=optimizer,
    lr_scheduler=lr_scheduler,
    device=device,
    
):
    pos_labels = torch.ones(batch_size).to(device)
    neg_labels = torch.zeros(batch_size, num_negatives).to(device)
    loss_history = []
    step = 0
    n_smpl = len(negative_sampling_prob_array)
    for epoh in tqdma(range(epohs)):
        if step > steps:
            break
        for target, context in dataloader:
            if step > steps:
                break
            center_words = target.long() # 
            pos_context_words = context.long()
            neg_context_words = torch.LongTensor(np.array([get_negative_samples(n_smpl, negative_sampling_prob_array, num_negatives) for t in center_words]))
            optimizer.zero_grad()
            pos_scores, neg_scores = model(
                center_words, pos_context_words, neg_context_words
            )
            loss_pos = criterion(pos_scores, pos_labels)
            loss_neg = criterion(neg_scores, neg_labels)
    
            loss = loss_pos + loss_neg
            loss.backward()
            optimizer.step()
    
            loss_history.append(loss.item())
            lr_scheduler.step(loss_history[-1])

            if step % 10 == 0:
                print(
                    f"Step {step}, Loss: {np.mean(loss_history[-10:])}, learning rate: {lr_scheduler._last_lr}"
                )
            step += 1

In [49]:
steps = 300
batch_size = 5000
epohs = 5
train_skipgram_with_neg_sampling(
    model,
    context_pairs,
    keep_prob_array,
    word_to_index,
    batch_size,
    num_negatives,
    negative_sampling_prob_array,
    epohs,
    steps,
)

  0%|          | 0/5 [00:00<?, ?it/s]

Step 0, Loss: 1.2087421417236328, learning rate: [0.00625]
Step 10, Loss: 1.209468710422516, learning rate: [0.00625]
Step 20, Loss: 1.212858831882477, learning rate: [0.00625]
Step 30, Loss: 1.2110777854919434, learning rate: [0.00625]
Step 40, Loss: 1.2062019944190978, learning rate: [0.00625]
Step 50, Loss: 1.202225649356842, learning rate: [0.00625]
Step 60, Loss: 1.2049126505851746, learning rate: [0.00625]
Step 70, Loss: 1.2037643432617187, learning rate: [0.00625]
Step 80, Loss: 1.2017034649848939, learning rate: [0.003125]
Step 90, Loss: 1.2031516790390016, learning rate: [0.003125]
Step 100, Loss: 1.200639808177948, learning rate: [0.003125]
Step 110, Loss: 1.2014903545379638, learning rate: [0.003125]
Step 120, Loss: 1.2050774931907653, learning rate: [0.003125]
Step 130, Loss: 1.1974157094955444, learning rate: [0.0015625]
Step 140, Loss: 1.1950206637382508, learning rate: [0.0015625]
Step 150, Loss: 1.198791217803955, learning rate: [0.0015625]
Step 160, Loss: 1.19687471389

In [50]:
_model_parameters = model.parameters()
embedding_matrix_center = next(
    _model_parameters
).detach()  # Assuming that first matrix was for central word
embedding_matrix_context = next(
    _model_parameters
).detach()  # Assuming that second matrix was for context word

In [51]:
def get_word_vector(word, embedding_matrix, word_to_index=word_to_index):
    return embedding_matrix[word_to_index[word]]

Проверки

In [52]:
similarity_1 = F.cosine_similarity(
    get_word_vector("iphone", embedding_matrix_context)[None, :],
    get_word_vector("apple", embedding_matrix_context)[None, :],
)
similarity_2 = F.cosine_similarity(
    get_word_vector("iphone", embedding_matrix_context)[None, :],
    get_word_vector("dell", embedding_matrix_context)[None, :],
)
assert similarity_1 > similarity_2

In [None]:
similarity_1 = F.cosine_similarity(
    get_word_vector("windows", embedding_matrix_context)[None, :],
    get_word_vector("laptop", embedding_matrix_context)[None, :],
)
similarity_2 = F.cosine_similarity(
    get_word_vector("windows", embedding_matrix_context)[None, :],
    get_word_vector("macbook", embedding_matrix_context)[None, :],
)
assert similarity_1 > similarity_2

Посмотрим на ближайщие слова по косинусной мере

In [54]:
def find_nearest(word, embedding_matrix, word_to_index=word_to_index, k=10):
    word_vector = get_word_vector(word, embedding_matrix)[None, :]
    dists = F.cosine_similarity(embedding_matrix, word_vector)
    index_sorted = torch.argsort(dists)
    top_k = index_sorted[-k:]
    return [(index_to_word[x], dists[x].item()) for x in top_k.numpy()]

In [55]:
find_nearest("python", embedding_matrix_context, k=10)

[('aspnet', 0.7178930044174194),
 ('backend', 0.7190329432487488),
 ('learning', 0.7227652072906494),
 ('programming', 0.7242833971977234),
 ('tool', 0.7321114540100098),
 ('linux', 0.7323721647262573),
 ('java', 0.7537800073623657),
 ('c', 0.7770878076553345),
 ('javascript', 0.7932676672935486),
 ('python', 0.9999998807907104)]

Визуализация

In [56]:
top_k = 5000
_top_words = sorted([x for x in word_count_dict.items()], key=lambda x: x[1])[
    -top_k - 100 : -100
]  # ignoring 100 most frequent words
top_words = [x[0] for x in _top_words]
del _top_words

In [57]:
word_embeddings = torch.cat(
    [embedding_matrix_context[word_to_index[x]][None, :] for x in top_words], dim=0
).numpy()

In [58]:
import bokeh.models as bm
import bokeh.plotting as pl
from bokeh.io import output_notebook

output_notebook()


def draw_vectors(
    x,
    y,
    radius=10,
    alpha=0.25,
    color="blue",
    width=600,
    height=400,
    show=True,
    **kwargs,
):
    """draws an interactive plot for data points with auxilirary info on hover"""
    if isinstance(color, str):
        color = [color] * len(x)
    data_source = bm.ColumnDataSource({"x": x, "y": y, "color": color, **kwargs})

    fig = pl.figure(active_scroll="wheel_zoom", width=width, height=height)
    fig.scatter("x", "y", size=radius, color="color", alpha=alpha, source=data_source)

    fig.add_tools(bm.HoverTool(tooltips=[(key, "@" + key) for key in kwargs.keys()]))
    if show:
        pl.show(fig)
    return fig

In [59]:
embedding = umap.UMAP(n_neighbors=5).fit_transform(word_embeddings)
draw_vectors(embedding[:, 0], embedding[:, 1], token=top_words)



In [None]:
# do not change the code in the block below
# __________start of block__________
import os
import json

assert os.path.exists(
    "words_subset.txt"
), "Please, download `words_subset.txt` and place it in the working directory"

with open("words_subset.txt") as iofile:
    selected_words = iofile.read().split("\n")


def get_matrix_for_selected_words(selected_words, embedding_matrix, word_to_index):
    word_vectors = []
    for word in selected_words:
        index = word_to_index.get(word, None)
        vector = [0.0] * embedding_matrix.shape[1]
        if index is not None:
            vector = embedding_matrix[index].numpy().tolist()
        word_vectors.append(vector)
    return word_vectors


word_vectors = get_matrix_for_selected_words(
    selected_words, embedding_matrix_context, word_to_index
)

with open("submission_dict.json", "w") as iofile:
    json.dump(word_vectors, iofile)
print("File saved to `submission_dict.json`")
# __________end of block__________