# Goals of this project
The aim of this project was to act as a simple exploratory project to practice building a fairly fundamental tool in NLP.

I learned the concepts behind the word2vec model, and while it was fairly understandable I wanted to see how it would translate to code.

I also got to practice working more with the pytorch library as a result, which was a big win.

The biggest challenge for me in building this was getting the vector dimensions right for matrix multiplication. Learning to respect that process and approach it slowly was valuable.



In [48]:
import re
import random
import torch.nn as nn
import torch
import numpy as np
from fastcore import *
from nbdev.showdoc import *
from tqdm import tqdm
import pickle as pkl

In [2]:
def read_file(file_name):
    with open(file_name, 'r') as f:
        data = f.read()
    return data

In [3]:
raw_data = read_file('shakespeare.txt')


In [4]:
def remove_non_alpha_characters(data):
    data = data.lower()
    # use regex to remove all non-alphanumeric characters
    data = re.sub(r'[^a-zA-Z\s]', '', data)
    # use regex to remove all whitespace characters
    data = re.sub(r'\s+', ' ', data)
    return data


def remove_stopwords(data):
    stopwords = ['a', 'an', 'the', 'and', 'or', 'but', 'if', 'then', 'else', 'when', 'at', 'from', 'by', 'on', 'off', 'for', 'in', 'out', 'over', 'to', 'into', 'with', ""]
    data = [word for word in data if word not in stopwords]
    return data




def one_hot_encode(words):
    length = len(words.keys())
    encoded_words = {}
    for key, value in words.items():
        one_hot = np.zeros(length)
        one_hot[value] = 1
        tensor = torch.from_numpy(one_hot).to(torch.int64)
        encoded_words[key] = tensor

    return encoded_words



def get_scalar_loss(pos_score, neg_score, criterion, concatenated_data):
    """function to get the scalar loss. Unused because the results are generally bad from my current experiments."""
    score = torch.cat([pos_score, neg_score.flatten()], dim=0)
    combined_len = len(pos_score) + len(neg_score)
    pos_u_data = torch.ones(len(pos_score), 1)
    neg_v_data = torch.zeros(len(neg_score.flatten()), 1)
    loss = criterion(score, concatenated_data)
    return loss
    loss = get_scalar_loss( pos_score, neg_score, criterion, concat_data)


In [5]:
data = remove_non_alpha_characters(raw_data)
data = data.split(" ")
data = remove_stopwords(data)
unique_words = set(list(data))

In [6]:
unique_dict = {word: i for i, word in enumerate(unique_words)}


encoded_data = one_hot_encode(unique_dict)

### Create a train loader and dataset
Here we create a train_loader that will randomly generate examples forever

We choose a large batch size as this task does not demand a great deal of RAM and larger batches help with training speed.

In [7]:
def return_list_without_a_value(data, value):
    return [x for x in data if x != value]

def create_dataset(window_size, data):
    dataset = []

    for index, val in enumerate(data):
        sub = data[max(0,index-window_size):index]
        sub.extend(data[index+1:min(index+window_size, len(data))])
        for target in sub:
            dataset.append((unique_dict[val],unique_dict[target]))
    return dataset  

window_size = 8
dataset = create_dataset(window_size , data) 

batch_size = 100
train_loader = torch.utils.data.DataLoader(dataset=dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True)

In [50]:
class SkipGramModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(SkipGramModel, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.u_embeddings = nn.Embedding(vocab_size, embedding_dim, sparse=True)
        self.v_embeddings = nn.Embedding(vocab_size, embedding_dim, sparse=True)
        self.init_emb()

    def init_emb(self):
        init_mean = 0
        init_std = 0.01
        self.u_embeddings.weight.data.normal_(init_mean, init_std)
        self.v_embeddings.weight.data.normal_(init_mean, init_std)

    def forward(self, pos_u, pos_v, neg_v):
        # Precompute embeddings for pos_u and pos_v
        emb_u = self.u_embeddings(pos_u).view(-1, 1, self.embedding_dim).squeeze()
        emb_v = self.v_embeddings(pos_v).view(-1, self.embedding_dim).squeeze()

        # Compute score for pos_u and pos_v
        score = torch.bmm(emb_u.unsqueeze(1), emb_v.unsqueeze(2)).squeeze()
        score = torch.sigmoid(score)

        # Precompute embeddings for neg_v
        neg_emb_v = self.v_embeddings(neg_v).view(-1, self.embedding_dim, neg_v.shape[1])

        # Compute scores for neg_v
        neg_score = torch.bmm(emb_u.unsqueeze(1), neg_emb_v).squeeze()
        neg_score = torch.sigmoid(neg_score)

        return score, neg_score


    
    def forward_without_negatives(self, word1, word2):
        pos_u = torch.tensor([unique_dict[word1]])
        pos_v = torch.tensor([unique_dict[word2]])
        emb_u = self.u_embeddings(pos_u).view(-1, 1, self.embedding_dim).squeeze()
        emb_v = self.v_embeddings(pos_v).view(-1, self.embedding_dim).squeeze()
        score = torch.dot(emb_u, emb_v)
        score = torch.sigmoid(score)
        return score

    def get_dict_embeddings(self):
        return self.u_embeddings.weight.data.cpu().numpy()
    
    def get_embedding_from_word(self, word):
        index = unique_dict[word]
        return self.u_embeddings.weight.data[index]
    
    def get_embedding_from_index(self, index):
        return self.u_embeddings.weight.data[index]

    def save_embedding(self, file_name):
        # Save embedding lookup table as pkl file
        with open(file_name, 'wb') as f:
            pkl.dump(self.u_embeddings.weight.data.cpu().numpy(), f)
    
    def import_embeddings(self, file_name):
        with open(file_name, 'rb') as f:
            self.u_embeddings.weight.data = torch.from_numpy(pkl.load(f)).to(torch.float32)
            self.v_embeddings.weight.data = torch.from_numpy(pkl.load(f)).to(torch.float32)

  
embedding_dim = 100
dictionary_length = len(unique_words)
model = SkipGramModel(dictionary_length, embedding_dim)



### Loss function
Here we create a custom loss function which gives us a loss based on the model's error when predicting a 1 or 0 for the context word or randomly sampled words.

We use a custom loss function because it allows us to add weight decay to our training and capture the specific nature of what we want the model to improve at, which in this case is relatedness of words.


In [51]:

def loss_function(score, neg_score, lr, weight_decay, model):
    pos_loss = -torch.mean(torch.log(score))
    neg_loss = -torch.mean(torch.sum(torch.log(1 - neg_score), dim=1))
    loss = pos_loss + neg_loss
    # add L2 regularization term
    l2_loss = 0
    for param in model.parameters():
        l2_loss += torch.sum(param**2)
        loss += weight_decay * l2_loss
    return loss

## Training loop
This is the training loop for our model.

Our train loader iterator is declared every epoch and we then iterate over it according to our steps per epoch.

We generate our negative samples randomly at runtime as the cost of doing so is very low.

The parameters passed in for training have a massive impact on model performance. 
The length of negative samples should be somewhere between 5 and 20. Having a lower numbers means the model may stray into simply having all of its values tend to 1 which is not what we want. Thus a higher number is favoured.


In [52]:
def train(model, train_loader, batch_size, negative_sample_length, weight_decay, learning_rate, steps_per_epoch, epochs):
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    for epoch in range(epochs):
        loss_sum = 0
        train_loader_iter = iter(train_loader)
        for i in tqdm(range(steps_per_epoch)):
            x, y = next(train_loader_iter)
            pos_u = torch.tensor(x)
            pos_v = torch.tensor(y)
            neg_v = torch.randint(0, dictionary_length, (batch_size, negative_sample_length))
            optimizer.zero_grad()
            pos_score, neg_score = model(pos_u, pos_v, neg_v)
            loss = loss_function(pos_score, neg_score, learning_rate, weight_decay, model)
            loss.backward()
            optimizer.step()
            loss_sum += loss.item()
        print("Epoch: {}, Loss: {}".format(epoch, loss_sum / steps_per_epoch))
    return model

learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0001)
negative_sample_length = 15
weight_decay = 0.0006

steps_per_epoch = 300
epochs = 15

train(model, train_loader, batch_size, negative_sample_length, weight_decay, learning_rate, steps_per_epoch, epochs)


  pos_u = torch.tensor(x)
  pos_v = torch.tensor(y)
100%|██████████| 300/300 [00:05<00:00, 59.45it/s]


Epoch: 0, Loss: 11.1058136622111


100%|██████████| 300/300 [00:05<00:00, 58.77it/s]


Epoch: 1, Loss: 11.077984917958577


 24%|██▎       | 71/300 [00:01<00:03, 61.46it/s]

# Testing
We now go to the testing phase to see how our model is performing.

### Testing functions
The following functions primarily exist to add, subtract and compare vectors. The goal is to produce intuitive results from the comparisons of our vectors.

Eg the following should have a high correlation:
flower and rose
man and king

man and woman

queen and woman
#### The following should have a low correlation
Flower and metal

concept and dog

power and table

In [11]:
def subtract_vector(vector1,vector2):
    return get_emb(vector1) - get_emb(vector2)

def add_vector(vector1,vector2):
    return get_emb(vector1) + get_emb(vector2)

def cos_sim(vector1, vector2):
    return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))

def cos_sim_word(word1, word2):
    vector1 = get_emb(word1)
    vector2 = get_emb(word2)
    return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))

def get_emb(word):
    return model.get_embedding_from_word(word)

def invert_dictionary(dictionary):
    return {v: k for k, v in dictionary.items()}

def get_closest_vector(vector):
    max = 0
    target = None
    for key,item in unique_dict.items():
        comparative = get_emb(key)
        comparison = cos_sim(vector, comparative)
        if comparison > max:
            max = comparison
            target = key

        
    return target


In [12]:
vector = subtract_vector("king", "man")
vector = vector +get_emb("woman") 


In [45]:
print(cos_sim_word("flower", "rose"),("flower", "rose"))
print(cos_sim_word("flower", "tree"), ("flower", "tree"))
print(cos_sim_word("flower", "dog"), ("flower", "dog"))
print(cos_sim_word("flower", "metal"), ("flower", "metal"))
print(cos_sim_word("flower", "cart"), ("flower", "cart"))
print(cos_sim_word("worm", "dog"), ("worm", "dog"))
print(cos_sim_word("king", "queen"), ("king", "queen"))
print(cos_sim_word("king", "royalty"), ("king", "royalty"))
print(cos_sim_word("queen", "royalty"), ("queen", "royalty"))
print(cos_sim_word("man", "king"), ("man", "king"))
print(cos_sim_word("woman", "king"), ("woman", "king"))
print(cos_sim_word("woman", "boot"), ("woman", "boot"))
print(cos_sim_word("child", "prince"), ("child", "prince"))
print(cos_sim_word("child", "thought"), ("child", "thought"))






0.8538131 ('flower', 'rose')
0.68368065 ('flower', 'tree')
0.9014465 ('flower', 'dog')
-0.16055314 ('flower', 'metal')
0.065594286 ('flower', 'cart')
0.8298176 ('worm', 'dog')
0.99682665 ('king', 'queen')
0.9448121 ('king', 'royalty')
0.93152064 ('queen', 'royalty')
0.9976486 ('man', 'king')
0.9650337 ('woman', 'king')
0.64117473 ('woman', 'boot')
0.9606477 ('child', 'prince')
0.9548535 ('child', 'thought')


In [14]:
model.forward_without_negatives("king", "man")

tensor(0.5974, grad_fn=<SigmoidBackward0>)

In [15]:
index1 = unique_dict["king"]
index2 = unique_dict["man"]
vector = subtract_vector("king", "man")
vector = vector+ get_emb("woman")
print(get_closest_vector(vector))

  return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))


fortnight


In [36]:
# save embeddings
path = "embeddings"
reversed_unique_dict = invert_dictionary(unique_dict)
model.save_embedding(reversed_unique_dict, "embeddings.emb")

In [25]:
# display embeddings visually using matplotlib. Each word is represented by a point in 2D space.
# The x and y coordinates of the point are the first and second dimensions of the word's embedding.
# The words are labeled by their actual word.
import matplotlib as plt

def display_embeddings(embeddings, word2id, filename):
    plt.figure(figsize=(20, 20))
    for i, label in enumerate(word2id):
        x, y = embeddings[i, :]
        plt.scatter(x, y)
        plt.annotate(label, xy=(x, y), xytext=(5, 2), textcoords='offset points', ha='right', va='bottom')
    plt.savefig(filename)


In [46]:
embeddings, word2id = model.import_embeddings("embeddings.emb")
display_embeddings(embeddings, word2id, "embeddings.png")

ValueError: could not convert string to float: 'tensor(0.0014)'

In [47]:
torch.convert_to_tensor('tensor(0.0014)', dtype=torch.float64)

AttributeError: module 'torch' has no attribute 'convert_to_tensor'

In [28]:
apple = torch.tensor(12)

In [30]:
apple.numpy()

array(12)