##**Skip-gram model from scratch using a sample corpus**

In [1]:
corpus = [
    'he is a king is',
    'she is a queen',
    'he is a man',
    'she is a woman',
    'warsaw is poland capital',
    'berlin is germany capital',
    'paris is france capital',
]


In [6]:
def tokenize_corpus(corpus):
    tokens = [x.split() for x in corpus]
    return tokens

tokenized_corpus = tokenize_corpus(corpus)

corpus_list = []
for sentence in tokenized_corpus:
    for token in sentence:
      corpus_list.append(token)

In [15]:
vocabulary = []
for sentence in tokenized_corpus:
    for token in sentence:
        if token not in vocabulary:
            vocabulary.append(token)

word2idx = {w: idx for (idx, w) in enumerate(vocabulary)}
idx2word = {idx: w for (idx, w) in enumerate(vocabulary)}

vocabulary_size = len(vocabulary)

##**Skip-gram Model**

In [20]:
import numpy as np
from collections import defaultdict
import random
from tqdm import tqdm

class SkipGram:
    def __init__(self, corpus, vocab_size, embedding_size, neg_sampling_rate, window_size, learning_rate=0.01):
        self.corpus = corpus
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.neg_sampling_rate = neg_sampling_rate
        self.window_size = window_size
        self.learning_rate = learning_rate

        # Initialize the word vectors randomly
        self.word_vectors = np.random.randn(vocab_size, embedding_size)

        # Initialize the context vectors to zeros
        self.context_vectors = np.random.randn(vocab_size, embedding_size)
        
        # Initialize the biases to zeros
        self.word_biases = np.zeros(shape=(vocab_size,))
        self.context_biases = np.zeros(shape=(vocab_size,))

    # Define sigmoid function
    def sigmoid(self, x):
      return 1 / (1 + np.exp(-x)) 

    # Build the word frequency table for negative sampling
    def get_negative_prob(self, corpus):
      word_freq = defaultdict(int)
      for word in self.corpus:
        word_freq[word] += 1
        
      total_word_freq = sum(word_freq.values())
      word_probs = {word: freq / total_word_freq for word, freq in word_freq.items()}

      # Negative sampling probabilities
      noise_dist = {key: val ** (3/4) for key, val in word_probs.items()}
      Z = sum(noise_dist.values())
      noise_dist_normalized = {key: val / Z for key, val in noise_dist.items()}
      return noise_dist_normalized

    def get_negative_samples(self, context_word):
      negative_samples = []
      noise_dist_normalized = self.get_negative_prob(corpus)

      while len(negative_samples) < self.neg_sampling_rate:
        sample_list = np.random.choice(list(noise_dist_normalized.keys()), size = self.neg_sampling_rate, p=list(noise_dist_normalized.values()))
        for sample in sample_list:
          if sample != context_word and sample not in negative_samples:
            negative_samples.append(sample)
            return negative_samples
        
        
    def train(self, num_epochs):
        for epoch in tqdm(range(num_epochs)):
            loss = 0
            
            for i, word in enumerate(self.corpus):
                # Get the context words for this center word
                context_words = self.corpus[max(0, i - self.window_size) : i] + self.corpus[i + 1 : i + self.window_size + 1]
                center_word_id = word2idx[word]
                
                # Loop over each context word and update the embeddings
                for context_word_i in context_words:
                    context_word_id = word2idx[context_word_i]
                    # Perform negative sampling to get negative samples
                    negative_samples = self.get_negative_samples(context_word_i)
                    
                    # Update the center word vector and bias
                    center_vector = self.word_vectors[center_word_id]
                    center_bias = self.word_biases[center_word_id]
                    context_vector = self.context_vectors[context_word_id]
                    context_bias = self.context_biases[context_word_id]
                    
                    pos_score = np.dot(center_vector, np.transpose(context_vector)) + center_bias + context_bias
                    pos_score_grad = self.sigmoid(pos_score) - 1
                    
                    center_vector_grad = pos_score_grad * context_vector
                    center_bias_grad = pos_score_grad
                    context_vector_grad = pos_score_grad * center_vector
                    context_bias_grad = pos_score_grad
                    
                    loss -= np.log(self.sigmoid(pos_score))
                    
                    # Loop over each negative sample and update the embeddings
                    for negative_sample in negative_samples:
                        negative_sample_id = word2idx[negative_sample]
                        negative_vector = self.context_vectors[negative_sample_id]
                        negative_bias = self.context_biases[negative_sample_id]
                        
                        neg_score = np.dot(center_vector, np.transpose(negative_vector)) + center_bias + negative_bias
                        neg_score_grad = self.sigmoid(neg_score)
                        
                        center_vector_grad += neg_score_grad * negative_vector
                        center_bias_grad += neg_score_grad
                        negative_vector_grad = neg_score_grad * center_vector
                        negative_bias_grad = neg_score_grad
                        
                        loss -= np.log(self.sigmoid(-neg_score))
                        
                        # Update the negative sample context vector and bias
                        self.context_vectors[negative_sample_id] -= self.learning_rate * negative_vector_grad
                        self.context_biases[negative_sample_id] -= self.learning_rate * negative_bias_grad
                        
                    # Update the center word vector and bias
                    self.word_vectors[center_word_id] -= self.learning_rate * center_vector_grad
                    self.word_biases[center_word_id] -= self.learning_rate * center_bias_grad

            print(f"Loss after epoch {epoch}: {loss / len(corpus)}")

        return self, self.word_vectors



In [21]:
SkipGram(corpus_list, vocab_size = vocabulary_size, embedding_size = 3, neg_sampling_rate=2, window_size=2, learning_rate=0.01).train(5)

100%|██████████| 5/5 [00:00<00:00, 96.40it/s]

Loss after epoch 0: 25.311524390080763
Loss after epoch 1: 24.868888011461735
Loss after epoch 2: 23.53942109438085
Loss after epoch 3: 23.190190106772867
Loss after epoch 4: 25.45003104618511





(<__main__.SkipGram at 0x7f9698e90a90>,
 array([[-0.59385423,  0.51233554,  1.06558124],
        [-0.79228443, -0.44279977, -0.62379612],
        [ 0.22536055,  1.00681264,  0.29579073],
        [ 1.45673005,  1.28858807, -0.9740576 ],
        [ 0.46564592, -1.10070552,  0.18261535],
        [ 0.62612754, -1.58964195, -0.28565742],
        [-1.0365158 , -1.65562567, -0.54116416],
        [-0.22185968,  0.96660497,  0.70797113],
        [ 0.95787363,  0.47526893,  1.56466027],
        [-0.64947276,  0.48866358,  0.18961807],
        [ 0.27125167,  0.92446307,  0.27966786],
        [ 0.49710557,  0.22309339,  0.24039582],
        [-1.004651  , -1.21505755, -0.17143081],
        [-0.14698578,  1.96582328,  0.31332156],
        [-0.94319637, -1.29849078, -0.66970296]]))