In [1]:
import time

import numpy as np
import matplotlib.pyplot as plt

from abc import abstractmethod, ABC
from typing import Optional, Literal

from numpy.typing import NDArray

np.random.seed(1234)

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.rcParams["figure.figsize"] = [16, 9]

### Handy utility functions

In [2]:
def zeros(*dims: int) -> NDArray[np.float32]:
    return np.zeros(shape=tuple(dims), dtype=np.float32)


def ones(*dims: int) -> NDArray[np.float32]:
    return np.ones(shape=tuple(dims), dtype=np.float32)


def rand(*dims: int) -> NDArray[np.float32]:
    return np.random.rand(*dims).astype(np.float32)


def randn(*dims: int) -> NDArray[np.float32]:
    return np.random.randn(*dims).astype(np.float32)


def sigmoid(x: NDArray) -> NDArray:
    return 1.0 / (1.0 + np.exp(-x))

# Word2Vec

In [3]:
import pickle
import gzip

with gzip.open("text8.dat.gz", "rb") as f:
    train_dict, train_set, train_tokens = pickle.load(f)

train_set = np.random.permutation(train_set)

In [4]:
from collections import namedtuple

Config = namedtuple(
    "Config",
    (
        "dict_size",
        "vect_size",
        "neg_samples",
        "updates",
        "learning_rate",
        "learning_rate_decay",
        "decay_period",
        "log_period",
    ),
)

config = Config(
    dict_size=len(train_dict),
    vect_size=100,
    neg_samples=10,
    updates=5_000_000,
    learning_rate=0.1,
    learning_rate_decay=0.995,
    decay_period=10_000,
    log_period=10_000,
)

## Negative Sampling

In order to train distributed word representations, first:
 - calculate gradient of the cost function with respect to the `word_vector` and store in `word_grad`.
 - calculate gradient of the cost function with respect to the `context_vect` and store in `context_grad`.
 - calculate gradient of the cost function with respect to the sampled `negative_vects` and store in neg_context_grad.

In [5]:
def neg_sample(
    config: Config,
    train_set: NDArray[np.int64],
    train_tokens: NDArray[np.int64],
) -> tuple[NDArray, NDArray, float]:
    lr = config.learning_rate
    loss = 0.0
    V_p = randn(config.dict_size, config.vect_size)
    V_o = randn(config.dict_size, config.vect_size)

    for i in range(config.updates):
        w_idx: int = train_set[i % len(train_set), 0]
        c_idx: int = train_set[i % len(train_set), 1]
        n_idx: NDArray[np.int64] = np.random.randint(0, len(train_tokens), config.neg_samples)
        n_idx: NDArray[np.int64] = train_tokens[n_idx]

        w = V_p[w_idx, :]  # word vector, shape `(dim,)`
        c = V_o[c_idx, :]  # context vector, shape `(dim,)`
        n = V_o[n_idx, :]  # sampled noise vectors, shape `(k, dim)`

        # Cost and gradient calculation
        # -----------------------------
        σ_p = sigmoid(+w @ c.T)  # shape `(1,)`
        σ_n = sigmoid(-w @ n.T)  # shape `(k,)`
        loss -= np.log(σ_p) + np.sum(np.log(σ_n))

        if (i + 1) % config.log_period == 0:
            print(f"Update {i+1}\tLoss: {loss / config.log_period:>2.2f}")
            final_loss = loss / config.log_period
            loss = 0.0

        grad_w = (σ_p - 1.0) * c + (1.0 - σ_n) @ n
        grad_c = (σ_p - 1.0) * w
        grad_n = (1.0 - σ_n).reshape(-1, 1) * w

        V_p[w_idx, :] -= lr * grad_w
        V_o[c_idx, :] -= lr * grad_c
        V_o[n_idx, :] -= lr * grad_n

        if i % config.decay_period == 0:
            lr = lr * config.learning_rate_decay

    return V_p, V_o, final_loss

In [None]:
# def neg_sample(conf, train_set, train_tokens):
#     Vp = randn(conf.dict_size, conf.vect_size)
#     Vo = randn(conf.dict_size, conf.vect_size)

#     J = 0.0
#     learning_rate = conf.learning_rate
#     for i in range(conf.updates):
#         idx = i % len(train_set)

#         word    = train_set[idx, 0]
#         context = train_set[idx, 1]
        
#         neg_context = np.random.randint(0, len(train_tokens), conf.neg_samples)
#         neg_context = train_tokens[neg_context]

#         word_vect = Vp[word, :]              # word vector
#         context_vect = Vo[context, :];       # context wector
#         negative_vects = Vo[neg_context, :]  # sampled negative vectors

#         # Cost and gradient calculation starts here
#         score_pos = word_vect @ context_vect.T
#         score_neg = word_vect @ negative_vects.T

#         J -= np.log(sigmoid(score_pos)) + np.sum(np.log(sigmoid(-score_neg)))
#         if (i + 1) % conf.log_period == 0:
#             print('Update {0}\tcost: {1:>2.2f}'.format(i + 1, J / conf.log_period))
#             final_cost = J / conf.log_period
#             J = 0.0

#         pos_g = 1.0 - sigmoid(score_pos)
#         neg_g = sigmoid(score_neg)

#         # Note: use pos_g and neg_g will simplify gradient calculation.
#         # raise Exception("Unimplemented")
        
#         word_grad = -pos_g * context_vect + neg_g @ negative_vects
#         context_grad = -pos_g * word_vect
#         neg_context_grad = np.outer(neg_g, word_vect)

#         Vp[word, :] -= learning_rate * word_grad
#         Vo[context, :] -= learning_rate * context_grad
#         Vo[neg_context, :] -= learning_rate * neg_context_grad

#         if i % conf.decay_period == 0:
#             learning_rate = learning_rate * conf.learning_rate_decay

#     return Vp, Vo, final_cost

In [7]:
V_p, V_o, loss = neg_sample(config, train_set, train_tokens)

Update 10000	cost: 36.18
Update 20000	cost: 28.55
Update 30000	cost: 23.31
Update 40000	cost: 19.56
Update 50000	cost: 17.02


KeyboardInterrupt: 

### Word similarity

In [None]:
def lookup_word_idx(word, word_dict):
    try:
        return np.argwhere(np.array(word_dict) == word)[0][0]
    except:
        raise Exception("No such word in dict: {}".format(word))


def similar_words(embeddings, word, word_dict, hits):
    word_idx = lookup_word_idx(word, word_dict)
    similarity_scores = embeddings @ embeddings[word_idx]
    similar_word_idxs = np.argsort(-similarity_scores)
    return [word_dict[i] for i in similar_word_idxs[:hits]]

In [None]:
print("\n\nTraining cost: {0:>2.2f}\n\n".format(loss))

Vp_norm = V_p / np.linalg.norm(V_p, axis=1).reshape(-1, 1)
for w in ["zero", "computer", "cars", "home", "album"]:
    similar = similar_words(Vp_norm, w, train_dict, 5)
    print("Words similar to {}: {}".format(w, ", ".join(similar)))



Training cost: 3.02


Words similar to zero: zero, acres, nautical, pmid, hardcover
Words similar to computer: computer, computers, hardware, macintosh, amiga
Words similar to cars: cars, manufactured, lighter, diesel, collect
Words similar to home: home, stadium, cleveland, gardens, houston
Words similar to album: album, albums, song, bowie, concert
