# Word2Vec (Negative Sampling)

Let's work on negative-sampling based implementation of word2vec.

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F

In [2]:
np.__version__, torch.__version__

('1.21.5', '2.5.1+cu118')

## 1. Define some very simple data for understanding

In [3]:
import nltk
nltk.download('brown')

[nltk_data] Downloading package brown to
[nltk_data]     C:\Users\Ekkar\AppData\Roaming\nltk_data...
[nltk_data]   Package brown is already up-to-date!


True

In [4]:
from nltk.corpus import brown
corpus = brown.sents(categories='news')

In [5]:
corpus

[['The', 'Fulton', 'County', 'Grand', 'Jury', 'said', 'Friday', 'an', 'investigation', 'of', "Atlanta's", 'recent', 'primary', 'election', 'produced', '``', 'no', 'evidence', "''", 'that', 'any', 'irregularities', 'took', 'place', '.'], ['The', 'jury', 'further', 'said', 'in', 'term-end', 'presentments', 'that', 'the', 'City', 'Executive', 'Committee', ',', 'which', 'had', 'over-all', 'charge', 'of', 'the', 'election', ',', '``', 'deserves', 'the', 'praise', 'and', 'thanks', 'of', 'the', 'City', 'of', 'Atlanta', "''", 'for', 'the', 'manner', 'in', 'which', 'the', 'election', 'was', 'conducted', '.'], ...]

In [6]:
#get word sequences and unique words
flatten = lambda l: [item for sublist in l for item in sublist]
vocab = list(set(flatten(corpus)))
vocab[:10]

['Tulsa',
 'soldiers',
 'Steve',
 'adapting',
 'event',
 'oases',
 'conspicuously',
 'drill',
 'enlivened',
 'junta']

In [7]:
#numericalization
word2index = {w: i for i, w in enumerate(vocab)}
print(word2index)



In [8]:
#vocab size
voc_size = len(vocab)
print(voc_size)

14394


In [9]:
#append UNK
vocab.append('<UNK>')

In [10]:
word2index['<UNK>'] = 0

In [11]:
#just in case we need to use
index2word = {v:k for k, v in word2index.items()} 

## 2. Prepare train data

In [12]:
# for c in corpus:
#     print(c)

In [13]:
def random_batch(batch_size, word_sequence):
    
    window_size = 2
    # Make skip gram of one size window
    skip_grams = []
    # loop each word sequence
    # we starts from 1 because 0 has no context
    # we stop at second last for the same reason
    for sent in corpus:
        for i in range(window_size, len(sent) - window_size):
            target = word2index[sent[i]]

            context = []
            for j in range(1, window_size):
                context.append(word2index[sent[i - j]])
                context.append(word2index[sent[i + j]])

            # for each outside word, append to a skip_grams
            for w in context:
                skip_grams.append([target, w])
    
    random_inputs = []
    random_labels = []
    random_index = np.random.choice(range(len(skip_grams)), batch_size, replace=False) #randomly pick without replacement
        
    for i in random_index:
        random_inputs.append([skip_grams[i][0]])  # target, e.g., 2
        random_labels.append([skip_grams[i][1]])  # context word, e.g., 3
            
    return np.array(random_inputs), np.array(random_labels)

### Testing the method

In [14]:
#testing the method
batch_size = 2 # mini-batch size
input_batch, target_batch = random_batch(batch_size, corpus)

print("Input: ",  input_batch)
print("Target: ", target_batch)

#we will convert them to tensor during training, so don't worry...

Input:  [[ 1374]
 [12330]]
Target:  [[1022]
 [9720]]


In [15]:
input_batch.shape, target_batch.shape

((2, 1), (2, 1))

## 3. Negative Sampling

### Unigram distribution

$$P(w)=U(w)^{3/4}/Z$$

In [16]:
Z = 0.001

In [17]:
from collections import Counter

word_count = Counter(flatten(corpus))
num_total_words = sum([c for w, c in word_count.items()])

In [18]:
word_count['as']

481

In [19]:
num_total_words

100554

In [20]:
unigram_table = []

for vo in vocab:
    unigram_table.extend([vo] * int(((word_count[vo]/num_total_words)**0.75)/Z))

In [21]:
Counter(unigram_table)

Counter({'event': 1,
         'plus': 1,
         'seemed': 1,
         'What': 1,
         'done': 1,
         'led': 1,
         'while': 2,
         'passed': 1,
         'Austin': 1,
         'what': 4,
         'carry': 1,
         'spirit': 1,
         'civil': 1,
         'we': 4,
         "Kennedy's": 1,
         'battle': 1,
         'counties': 1,
         'worth': 1,
         'Other': 1,
         'front': 1,
         'private': 1,
         'junior': 1,
         'billion': 1,
         'Sen.': 1,
         'condition': 1,
         'School': 1,
         'vote': 2,
         'result': 2,
         'term': 1,
         'up': 8,
         'Texas': 3,
         'ball': 2,
         'All': 1,
         'level': 1,
         'farm': 1,
         'Congo': 2,
         'land': 1,
         'pay': 2,
         'leading': 1,
         'relations': 1,
         'bank': 1,
         'color': 1,
         'collection': 1,
         'period': 1,
         'call': 1,
         'Robert': 2,
         'E.': 2,
    

### Negative Sampling

In [22]:
import random

def prepare_sequence(seq, word2index):
    idxs = list(map(lambda w: word2index[w] if word2index.get(w) is not None else word2index["<UNK>"], seq))
    return torch.LongTensor(idxs)

def negative_sampling(targets, unigram_table, k):
    batch_size = targets.size(0)
    neg_samples = []
    for i in range(batch_size):
        nsample = []
        target_index = targets[i].item()
        while len(nsample) < k: # num of sampling
            neg = random.choice(unigram_table)
            if word2index[neg] == target_index:
                continue
            nsample.append(neg)
        neg_samples.append(prepare_sequence(nsample, word2index).view(1, -1))
    return torch.cat(neg_samples)

### Testing the negative sampling

In [23]:
input_batch  = torch.Tensor(input_batch)
target_batch = torch.LongTensor(target_batch)

In [24]:
target_batch.shape

torch.Size([2, 1])

In [25]:
input_batch

tensor([[ 1374.],
        [12330.]])

In [26]:
num_neg = 3
negative_sampling(target_batch, unigram_table, num_neg)

tensor([[11938, 12330, 11938],
        [12781,  7904, 11920]])

In [27]:
target_batch[1]

tensor([9720])

## 4. Model

$$\mathbf{J}_{\text{neg-sample}}(\mathbf{v}_c,o,\mathbf{U})=-\log(\sigma(\mathbf{u}_o^T\mathbf{v}_c))-\sum_{k=1}^K\log(\sigma(-\mathbf{u}_k^T\mathbf{v}_c))$$

In [28]:
class SkipgramNegSampling(nn.Module):
    
    def __init__(self, vocab_size, emb_size, word2index):
        super(SkipgramNegSampling, self).__init__()
        self.embedding_v = nn.Embedding(vocab_size, emb_size) # center embedding
        self.embedding_u = nn.Embedding(vocab_size, emb_size) # out embedding
        self.logsigmoid = nn.LogSigmoid()
        self.word2index  = word2index
                    
    def forward(self, center_words, target_words, negative_words):
        center_embeds = self.embedding_v(center_words) # [batch_size, 1, emb_size]
        target_embeds = self.embedding_u(target_words) # [batch_size, 1, emb_size]
        neg_embeds    = -self.embedding_u(negative_words) # [batch_size, num_neg, emb_size]
        
        positive_score = target_embeds.bmm(center_embeds.transpose(1, 2)).squeeze(2)
        #[batch_size, 1, emb_size] @ [batch_size, emb_size, 1] = [batch_size, 1, 1] = [batch_size, 1]
        
        negative_score = neg_embeds.bmm(center_embeds.transpose(1, 2))
        #[batch_size, k, emb_size] @ [batch_size, emb_size, 1] = [batch_size, k, 1]
        
        loss = self.logsigmoid(positive_score) + torch.sum(self.logsigmoid(negative_score), 1)
                
        return -torch.mean(loss)
    
    def get_embed(self, word):
        word2index = self.word2index
        
        try:
            index = word2index[word]
        except:
            index = word2index['<UNK>']
            
        word = torch.LongTensor([index])
        
        embed_c = self.embedding_v(word)
        embed_o = self.embedding_u(word)
        embed   = (embed_c + embed_o) / 2
        
        return embed[0][0].item(), embed[0][1].item()
    
    def prediction(self, inputs):
        embeds = self.embedding_v(inputs)
        
        return embeds
    

## 5. Training

In [29]:
batch_size     = 2 # mini-batch size
embedding_size = 2 #so we can later plot
model          = SkipgramNegSampling(voc_size, embedding_size, word2index)
num_neg        = 10 # num of negative sampling

optimizer = optim.Adam(model.parameters(), lr=0.001)

In [30]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [31]:
import time

# Training
num_epochs = 100
start = time.time()
for epoch in range(num_epochs):
    
    input_batch, target_batch = random_batch(batch_size, corpus)
    
    #input_batch: [batch_size, 1]
    input_batch = torch.LongTensor(input_batch)
    
    #target_batch: [batch_size, 1]
    target_batch = torch.LongTensor(target_batch)
    
    #negs_batch:   [batch_size, num_neg]
    negs_batch = negative_sampling(target_batch, unigram_table, num_neg)
    
    optimizer.zero_grad()
        
    loss = model(input_batch, target_batch, negs_batch)
    
    end = time.time()
    
    epoch_mins, epoch_secs = epoch_time(start, end)
    
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch: {epoch + 1} | cost: {loss:.6f} | time: {epoch_mins}m {epoch_secs}s")

print(f"\nComplete: \nTotal Loss: {loss:2.2f} | Time Taken: {epoch_mins} minutes and {epoch_secs} seconds")



Epoch: 10 | cost: 7.456579 | time: 0m 4s
Epoch: 20 | cost: 8.779901 | time: 0m 8s
Epoch: 30 | cost: 11.821223 | time: 0m 12s
Epoch: 40 | cost: 9.472773 | time: 0m 17s
Epoch: 50 | cost: 7.429150 | time: 0m 21s
Epoch: 60 | cost: 8.852638 | time: 0m 25s
Epoch: 70 | cost: 8.175594 | time: 0m 30s
Epoch: 80 | cost: 10.665074 | time: 0m 34s
Epoch: 90 | cost: 6.503552 | time: 0m 38s
Epoch: 100 | cost: 12.668316 | time: 0m 42s

Complete: 
Total Loss: 12.67 | Time Taken: 0 minutes and 42 seconds


In [32]:
def open_file(path_to_file):
    content = []  # Initialize content to an empty list to avoid returning None
    try:
        with open(path_to_file, 'r') as file:
            content = file.readlines()  # Read all lines of the file into a list
    except FileNotFoundError:
        print(f"The file {path_to_file} does not exist.")  # File not found error
    except Exception as e:
        print(f"An error occurred: {e}")  # Handle any other exceptions (e.g., permission issues)

    return content  # Return content even if it's empty, but not None


In [33]:
file_path = "file/word-test.v1.1.txt"

content = open_file(file_path)

semantic = []
syntatic = []

current_test = semantic
for sent in content:
    if sent[0] == ':':
        current_test = syntatic
        continue
    
    current_test.append(sent.strip())

In [34]:
vector_space = []

for word in vocab:
    vector_space.append(model.get_embed(word))

vector_space = np.array(vector_space)

In [35]:
#scipy version
from scipy import spatial

def cos_sim(a, b):
    norm_a = a / np.linalg.norm(a)  # Normalize vector a
    norm_b = b / np.linalg.norm(b)  # Normalize vector b
    return 1 - spatial.distance.cosine(norm_a, norm_b)  # Cosine similarity after normalization


def cos_sim_scores(vector_space, target_vector):
    scores = []
    for each_vect in vector_space:
        scores.append(cos_sim(target_vector, each_vect))

    return np.array(scores)

def similarity(model, test_data):
    words = test_data.split(" ")
    embeddings = [np.array(model.get_embed(word)) for word in words[:3]]  # Precompute embeddings for all words
    embed0, embed1, embed2 = embeddings  # Unpack embeddings
    similar_vector = embed1 - embed0 + embed2  # Perform vector arithmetic

    similarity_scores = cos_sim_scores(vector_space, similar_vector)
    max_score_idx = np.argmax(similarity_scores)
    similar_word = index2word[max_score_idx]

    return similar_word == words[3]  # Directly return the result

Semantic accuracy

In [36]:
sem_total = len(semantic)
sem_correct = 0
for sent in semantic:
    if similarity(model, sent):
        sem_correct += 1

sem_accuracy = sem_correct / sem_total
print(f"Semantic accuracy: {sem_accuracy:2.2f}")

Semantic accuracy: 0.00


Syntactic Accuracy

In [37]:
syn_total = len(syntatic)
syn_correct = 0
for sent in syntatic:
    if similarity(model, sent):
        syn_correct += 1

syn_accuracy = syn_correct / syn_total
print(f"Syntatic accuracy: {syn_accuracy:2.2f}")

Syntatic accuracy: 0.00


Similarity Accuracy

In [38]:
file_path = "file/wordsim_similarity_goldstandard.txt"

content = open_file(file_path)

sim_data = []

for sent in content:
    sim_data.append(sent.strip())

In [39]:
def compute_similarity(model, test_data):
    words = test_data.split("\t")

    embed0 = np.array(model.get_embed(words[0].strip()))
    embed1 = np.array(model.get_embed(words[1].strip()))

    similarity_model = embed1 @ embed0.T
    similarity_provided = float(words[2].strip())

    return similarity_provided, similarity_model

In [40]:
ds_scores = []
model_scores = []
for sent in sim_data:
    ds_score, model_score = compute_similarity(model, sent)

    ds_scores.append(ds_score)
    model_scores.append(model_score)

In [41]:
from scipy.stats import spearmanr

correlation = spearmanr(ds_scores, model_scores)[0]

print(f"Correlation between the dataset similarity metrics and models’ dot product is {correlation:2.2f}.")

Correlation between the dataset similarity metrics and models’ dot product is 0.12.


In [42]:
import torch
import pickle

# Define the folder where want to save the files
model_folder = 'model'  # Change this to your desired folder path

# Save the model's state_dict
torch.save(model.state_dict(), f'{model_folder}/neg.model')

# Save the arguments (such as voc_size, emb_size, word2index)
neg_args = {
    'vocab_size': voc_size,
    'emb_size': embedding_size,
    'word2index': word2index,
}
with open(f'{model_folder}/neg.args', 'wb') as f:
    pickle.dump(neg_args, f)

print(f"Model and arguments saved to {model_folder}")


Model and arguments saved to model


In [43]:
import torch
import pickle

# Define the folder where the files are saved
model_folder = 'model'  # Change this to the folder where you saved the files

# Load the arguments from the pickle file
with open(f'{model_folder}/neg.args', 'rb') as f:
    neg_args = pickle.load(f)

# Define the model class and initialize it with the loaded arguments
# Make sure the model class and arguments match the training code
model_neg = SkipgramNegSampling(**neg_args)  # Assuming you have a neg model class

# Now, load the model weights (this should be from neg.model, not neg.args)
model_neg.load_state_dict(torch.load(f'{model_folder}/neg.model'))

# Now the model is loaded with the arguments and weights, and you're ready to use it
model_neg.eval()  # Set the model to evaluation mode if you're not training

print("Model loaded successfully.")


Model loaded successfully.


  model_neg.load_state_dict(torch.load(f'{model_folder}/neg.model'))


In [44]:
model_neg.get_embed('sad')

(-0.6971598267555237, -1.3981492519378662)

## 6. Plotting the embeddings

In [None]:
#list of vocabs
vocab[:10]

In [32]:
word = vocab[0]

In [None]:
#numericalization
id = word2index[word]
id

In [None]:
id_tensor = torch.LongTensor([id])
id_tensor

In [None]:
#get the embedding by averaging
v_embed = model.embedding_v(id_tensor)
u_embed = model.embedding_u(id_tensor)

v_embed, u_embed

In [None]:
#average to get the word embedding
word_embed = (v_embed + u_embed) / 2
word_embed[0][1]

In [37]:
#let's write a function to get embedding given a word
def get_embed(word):
    id_tensor = torch.LongTensor([word2index[word]])
    v_embed = model.embedding_v(id_tensor)
    u_embed = model.embedding_u(id_tensor) 
    word_embed = (v_embed + u_embed) / 2 
    x, y = word_embed[0][0].item(), word_embed[0][1].item()

    return x, y

In [None]:
plt.figure(figsize=(6,3))
for i, word in enumerate(vocab[:20]): #loop each unique vocab
    x, y = get_embed(word)
    plt.scatter(x, y)
    plt.annotate(word, xy=(x, y), xytext=(5, 2), textcoords='offset points')
plt.show()

## 7. Cosine similarity

Formally the [Cosine Similarity](https://en.wikipedia.org/wiki/Cosine_similarity) $s$ between two vectors $p$ and $q$ is defined as:

$$s = \frac{p \cdot q}{||p|| ||q||}, \textrm{ where } s \in [-1, 1] $$ 

If $p$ and $q$ is super similar, the result is 1 otherwise 0.

In [None]:
vocab

In [40]:
#let's try similarity between first and second, and second and third
cat          = get_embed('cat')
fruit        = get_embed('fruit')
animal       = get_embed('animal')

In [None]:
#numpy version
from numpy import dot
from numpy.linalg import norm

def cos_sim(a, b):
    cos_sim = dot(a, b)/(norm(a)*norm(b))
    return cos_sim
    
print(f"cat vs. fruit: ",        cos_sim(cat, fruit))
print(f"cat vs. animal: ",       cos_sim(cat, animal))
print(f"cat vs. cat: ",          cos_sim(cat, cat))

In [None]:
#scipy version
from scipy import spatial

def cos_sim(a, b):
    cos_sim = 1 - spatial.distance.cosine(a, b)  #distance = 1 - similarlity, because scipy only gives distance
    return cos_sim

print(f"cat vs. fruit: ",        cos_sim(cat, fruit))
print(f"cat vs. animal: ",       cos_sim(cat, animal))
print(f"cat vs. cat: ",          cos_sim(cat, cat))