In [20]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import re
import nltk
from nltk.corpus import stopwords

# Make sure the NLTK stop-word corpus is available locally.
nltk.download("stopwords")
# NOTE: this rebinds `stopwords` from the imported corpus reader to the plain
# list of Russian stop words that `tokenize` filters against.
stopwords = stopwords.words("russian")

In [2]:
# Read the whole corpus into memory as a single string.
# The encoding is passed explicitly: without it open() falls back to the
# platform's locale encoding, so non-ASCII text may fail to decode or decode
# differently from one machine to another.
# NOTE(review): assumes idiot.txt is stored as UTF-8 — confirm.
with open("idiot.txt", "r", encoding="utf-8") as f:
    text = f.read()

In [3]:
def tokenize(text):
    """
    Split raw text into lowercase word tokens and drop stop words.

    The text is lowercased first. A token is a run of word characters, `^` or
    `'` that contains at least one character in the range А–я, so runs made
    only of Latin letters, digits or punctuation are skipped.

    Parameters
    ----------
    text : str
        Raw input text.

    Returns
    -------
    list of str
        Tokens in their original order, excluding every token found in the
        module-level `stopwords` collection.
    """
    pattern = re.compile(r'[А-яа-я]+[\w^\']*|[\w^\']*[А-яа-я]+[\w^\']*')
    # Membership tests against a list are linear; build a set once per call so
    # filtering stays proportional to the number of tokens.
    stop_set = set(stopwords)
    return [tok for tok in pattern.findall(text.lower()) if tok not in stop_set]

In [4]:
# Only a prefix of the corpus (its first 100000 characters) is used for training.
train_text = text[:100_000]
tokens = tokenize(train_text)

In [None]:
class Word2Vec:
    """
    Minimal skip-gram style word2vec with a full softmax output, in NumPy.

    The model is two dense matrices stored in `weight_params`:
    "W1" of shape (vocab_size, embedding_dim), whose rows are used as the word
    embeddings, and "W2" of shape (embedding_dim, vocab_size), which maps a
    hidden vector to one score per vocabulary word.

    Parameters
    ----------
    tokens : sequence of hashable
        Token stream the vocabulary and the training pairs are built from.
    embedding_dim : int
        Width of the embedding (hidden) layer.
    """

    def __init__(self, tokens, embedding_dim):
        self.tokens = tokens
        self.make_vocab()
        self.weight_params = self.init_weights(vocab_size=len(self.word2index), embedding_dim=embedding_dim)

    def init_weights(self, vocab_size, embedding_dim):
        """
        Create randomly initialised weight matrices.

        Returns a dict with "W1" of shape (vocab_size, embedding_dim) and "W2"
        of shape (embedding_dim, vocab_size), both drawn uniformly from [-1, 1).
        """
        # The shape has to go through `size`: np.random.uniform(a, b) on its own
        # treats the two numbers as (low, high) and returns a single scalar.
        params = {
            "W1": np.random.uniform(-1.0, 1.0, size=(vocab_size, embedding_dim)),
            "W2": np.random.uniform(-1.0, 1.0, size=(embedding_dim, vocab_size)),
        }
        return params

    def make_vocab(self):
        """
        Build the `word2index` / `index2word` lookup tables from `self.tokens`.

        Indices follow the order of first occurrence in the token stream, so
        the mapping is the same on every run (iterating a set of strings is
        not stable across interpreter restarts).
        """
        vocab = list(dict.fromkeys(self.tokens))
        self.word2index = {tok: ind for ind, tok in enumerate(vocab)}
        self.index2word = dict(enumerate(vocab))

    def concat(self, *iterables):
        """
        Yield the items of each iterable in turn, one iterable after another.
        """
        for iterable in iterables:
            yield from iterable

    def one_hot_encode(self, id, vocab_size):
        """
        Return a list of `vocab_size` zeros with a single 1 at position `id`.
        """
        res = [0] * vocab_size
        res[id] = 1
        return res

    def generate_training_data(self, window_size: int = 2):
        """
        Build one-hot (center, context) training pairs.

        For every token position i, each position j within `window_size` of i
        (excluding i itself, clipped to the token stream) contributes one
        pair: the word at i as input and the word at j as target.

        Returns
        -------
        (X, y) : tuple of np.ndarray
            Two integer arrays of shape (n_pairs, vocab_size); row k of X is
            the one-hot center word and row k of y the one-hot context word.
        """
        n_tokens = len(self.tokens)
        vocab_size = len(self.word2index)
        token_ids = [self.word2index[tok] for tok in self.tokens]

        center_ids = []
        context_ids = []
        for i in range(n_tokens):
            lo = max(0, i - window_size)
            hi = min(n_tokens, i + window_size + 1)
            for j in range(lo, hi):
                if j == i:
                    continue
                center_ids.append(token_ids[i])
                context_ids.append(token_ids[j])

        # Fill the one-hot matrices by index instead of materialising
        # n_pairs * vocab_size Python ints in nested lists first.
        n_pairs = len(center_ids)
        rows = np.arange(n_pairs)
        X = np.zeros((n_pairs, vocab_size), dtype=int)
        y = np.zeros((n_pairs, vocab_size), dtype=int)
        # dtype=int keeps the index arrays valid even when there are no pairs.
        X[rows, np.asarray(center_ids, dtype=int)] = 1
        y[rows, np.asarray(context_ids, dtype=int)] = 1
        return X, y

    def softmax(self, x):
        """
        Softmax over the last axis.

        The per-row maximum is subtracted before exponentiating so large
        scores do not overflow.
        """
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward_pass(self, input_batch):
        """
        input_batch: (batch_size, vocab_size)

        Returns (scores, probabilities), both (batch_size, vocab_size).
        """
        hidden = input_batch @ self.weight_params["W1"]  # (batch_size, embedding_dim)
        scores = hidden @ self.weight_params["W2"]       # (batch_size, vocab_size)
        return scores, self.softmax(scores)

    def backward_pass(self, input_batch, target_batch, predicted_batch, learning_rate=0.01):
        """
        Apply one gradient-descent step to W1 and W2 in place.

        input_batch: (batch_size, vocab_size)
        target_batch: (batch_size, vocab_size)
        predicted_batch: (batch_size, vocab_size)
        """
        batch_size = input_batch.shape[0]
        # Gradient of softmax + cross-entropy with respect to the scores.
        error = predicted_batch - target_batch  # (batch_size, vocab_size)

        # Recomputed here because forward_pass does not return the hidden layer.
        hidden = input_batch @ self.weight_params["W1"]  # (batch_size, embedding_dim)

        dW2 = hidden.T @ error  # (embedding_dim, vocab_size)
        dh = error @ self.weight_params["W2"].T  # (batch_size, embedding_dim)
        dW1 = input_batch.T @ dh  # (vocab_size, embedding_dim)

        # Both gradients are computed before either matrix is updated.
        self.weight_params["W1"] -= learning_rate * dW1 / batch_size
        self.weight_params["W2"] -= learning_rate * dW2 / batch_size

    def most_similar(self, word, top_n=5):
        """
        Return the `top_n` vocabulary words closest to `word`.

        Closeness is cosine similarity between rows of W1. The query word
        itself is excluded. The result is a list of (word, similarity) tuples
        sorted from most to least similar. When `word` is not in the
        vocabulary a message is printed and None is returned.
        """
        if word not in self.word2index:
            print(f"'{word}' not in vocabulary.")
            return

        idx = self.word2index[word]
        W = self.weight_params["W1"]
        target_vec = W[idx]

        # Cosine similarity of every row against the target in one shot; the
        # small constant guards against division by zero for all-zero rows.
        norms = np.linalg.norm(W, axis=1) * np.linalg.norm(target_vec)
        sims = (W @ target_vec) / (norms + 1e-8)

        # Stable sort keeps ties in index order.
        order = [int(i) for i in np.argsort(-sims, kind="stable") if int(i) != idx]
        return [(self.index2word[i], sims[i]) for i in order[:top_n]]


In [24]:
def batchify(X, y, batch_size: int):
    """
    Yield consecutive (X_batch, y_batch) slices of at most `batch_size` rows.

    Rows are taken in order along the first axis; the last batch is shorter
    when the number of rows is not a multiple of `batch_size`. Progress is
    reported through tqdm.

    Parameters
    ----------
    X, y : 2-D arrays sliced along their first axis
        The number of batches is derived from `X.shape[0]`.
    batch_size : int
        Maximum number of rows per batch; must be positive.

    Raises
    ------
    ValueError
        If `batch_size` is not positive (raised when iteration starts).
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    starts = range(0, X.shape[0], batch_size)
    # len(starts) counts the trailing partial batch too, which plain floor
    # division of the row count by batch_size misses.
    for ind in tqdm(starts, total=len(starts)):
        X_batch = X[ind: ind + batch_size, :]
        y_batch = y[ind: ind + batch_size, :]
        yield X_batch, y_batch

In [43]:
# Build the model over the tokenized text, then expand the token stream into
# (center, context) training pairs using a window of 3 tokens on each side.
wv = Word2Vec(tokens, embedding_dim=100)
X, y = wv.generate_training_data(3)

In [44]:
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

def visualize_embeddings(model, epoch=None, max_words: int = 50):
    """
    Plot a 2-D PCA projection of the model's word embeddings.

    PCA is fitted on the full "W1" matrix; only the first `max_words` entries
    of `model.word2index` are drawn and labelled.

    Parameters
    ----------
    model : object
        Must expose `weight_params["W1"]` (one embedding row per word) and a
        `word2index` mapping from word to row index.
    epoch : int, optional
        When given, it is shown in the figure title.
    max_words : int, default 50
        Maximum number of words to draw.
    """
    W = model.weight_params["W1"]
    words = list(model.word2index.keys())[:max_words]

    pca = PCA(n_components=2)
    W_pca = pca.fit_transform(W)

    plt.figure(figsize=(8, 6))
    for word in words:
        # Fetch the row through the word's own index, not its position in the
        # dict, so a label can never be attached to another word's point.
        px, py = W_pca[model.word2index[word]]
        plt.scatter(px, py)
        plt.text(px + 0.01, py + 0.01, word, fontsize=9)

    title = f"Word Embeddings at Epoch {epoch}" if epoch is not None else "Word Embeddings"
    plt.title(title)
    plt.grid(True)
    plt.show()

In [45]:
def training(n_epochs, model, X, y, batch_size, learning_rate=0.01, visualize_every: int = 5):
    """
    Train `model` with mini-batch gradient descent and cross-entropy loss.

    Each epoch walks over the data once in order via `batchify`, runs a
    forward pass, accumulates the summed cross-entropy, and lets the model
    update its own weights through `backward_pass`.

    Parameters
    ----------
    n_epochs : int
        Number of passes over the data.
    model : object
        Must provide `forward_pass(batch) -> (scores, probs)` and
        `backward_pass(inputs, targets, probs, learning_rate)`.
    X, y : arrays
        One-hot inputs and targets, one row per training pair.
    batch_size : int
        Rows per mini-batch.
    learning_rate : float, default 0.01
        Step size passed to `backward_pass`.
    visualize_every : int, default 5
        Plot the embeddings every this many epochs and after the last epoch.
        Zero, a negative value or None turns plotting off entirely.

    Returns
    -------
    list of float
        Summed (not averaged) cross-entropy loss of each epoch.
    """
    loss_history = []
    plotting_enabled = bool(visualize_every) and visualize_every > 0

    for epoch in range(n_epochs):
        total_loss = 0.0
        for X_batch, y_batch in batchify(X, y, batch_size):
            _, preds = model.forward_pass(X_batch)

            # Small constant keeps log() finite when a probability underflows to 0.
            log_preds = np.log(preds + 1e-8)
            total_loss += float(-np.sum(y_batch * log_preds))

            model.backward_pass(X_batch, y_batch, preds, learning_rate)

        loss_history.append(total_loss)
        print(f"Epoch {epoch + 1}/{n_epochs} | Loss: {total_loss:.4f}")

        is_last_epoch = epoch == n_epochs - 1
        if plotting_enabled and ((epoch + 1) % visualize_every == 0 or is_last_epoch):
            visualize_embeddings(model, epoch + 1)

    return loss_history

In [48]:
# Hyperparameters for the training run.
n_epochs, batch_size, learning_rate = 100, 32, 0.1

# Train the model built above, plotting the embeddings every 10 epochs.
training(
    n_epochs=n_epochs,
    model=wv,
    X=X,
    y=y,
    batch_size=batch_size,
    learning_rate=learning_rate,
    visualize_every=10,
)
