In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import Counter
from torch.utils.data import Dataset, DataLoader

# Hyperparameters
embedding_dim = 100  # Size of each word vector
context_size = 2  # Number of context words taken on EACH side of the target word
num_negative_samples = 5  # Number of negative samples per positive sample
learning_rate = 0.001  # Step size passed to the Adam optimizer
num_epochs = 5  # Full passes over the training pairs

# Example corpus: one whitespace-tokenisable, lower-case sentence per entry
corpus = [
    "we are what we repeatedly do excellence then is not an act but a habit",
    "the only way to do great work is to love what you do",
    "if you can dream it you can do it",
    "do not wait to strike till the iron is hot but make it hot by striking",
    "whether you think you can or you think you cannot you are right",
]

# Preprocess the corpus
def preprocess_corpus(corpus):
    """Tokenise *corpus* and build vocabulary lookup tables.

    Args:
        corpus: Iterable of sentences; each sentence is split on whitespace.

    Returns:
        A ``(words, word_to_idx, idx_to_word)`` tuple where ``words`` is the
        flat list of tokens in corpus order, ``word_to_idx`` maps each distinct
        token to an integer index, and ``idx_to_word`` is the inverse mapping.
    """
    words = [word for sentence in corpus for word in sentence.split()]
    # Enumerate the vocabulary in sorted order: iterating a bare set of
    # strings yields a different order from one interpreter run to the next
    # (string hash randomisation), which would make the indices unreproducible.
    vocab = sorted(set(words))
    word_to_idx = {word: idx for idx, word in enumerate(vocab)}
    idx_to_word = dict(enumerate(vocab))
    return words, word_to_idx, idx_to_word

# Flatten the corpus into one token stream and build the word <-> index tables.
words, word_to_idx, idx_to_word = preprocess_corpus(corpus)

# Generate training data
def generate_training_data(words, word_to_idx, context_size):
    """Build skip-gram ``(target, context)`` index pairs from a token stream.

    Every position in *words* is used as a target. Its context is the up to
    ``context_size`` words on each side; near the start or end of the stream
    the window is clipped rather than the target being dropped, so words that
    occur only at the edges still produce training pairs.

    Args:
        words: Sequence of tokens.
        word_to_idx: Mapping from token to integer index.
        context_size: Maximum number of context words taken on each side.

    Returns:
        A list of ``(target_index, context_index)`` tuples. For each target,
        left neighbours come first (nearest first), then right neighbours
        (nearest first).

    Raises:
        KeyError: If a token in *words* is missing from *word_to_idx*.
    """
    indices = [word_to_idx[word] for word in words]
    last = len(indices) - 1
    data = []
    for i, target_word in enumerate(indices):
        # Left side walks outward from i-1, stopping at the stream start.
        left = range(i - 1, max(i - context_size, 0) - 1, -1)
        # Right side walks outward from i+1, stopping at the stream end.
        right = range(i + 1, min(i + context_size, last) + 1)
        data.extend((target_word, indices[j]) for j in (*left, *right))
    return data

# (target, context) index pairs built over the flattened word stream; the
# stream is not split per sentence, so windows can span sentence boundaries.
training_data = generate_training_data(words, word_to_idx, context_size)

# Custom Dataset class
class Word2VecDataset(Dataset):
    """Expose an indexable collection of training pairs as a torch ``Dataset``."""

    def __init__(self, data):
        # Keep a reference to the caller's collection; nothing is copied.
        self.data = data

    def __len__(self):
        """Return how many items the wrapped collection holds."""
        pairs = self.data
        return len(pairs)

    def __getitem__(self, idx):
        """Return the item stored at position *idx*, unchanged."""
        pairs = self.data
        return pairs[idx]

# Wrap the pairs so the DataLoader can serve them as shuffled mini-batches of 32.
dataset = Word2VecDataset(training_data)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Negative Sampling
def get_negative_samples(target, num_negative_samples, vocab_size):
    """Draw uniformly random vocabulary indices that differ from *target*.

    Samples are drawn with replacement from ``[0, vocab_size)`` and any draw
    equal to *target* is rejected and redrawn.

    Args:
        target: Index that must not appear among the samples.
        num_negative_samples: Number of indices to return.
        vocab_size: Exclusive upper bound of the index range.

    Returns:
        A list of ``num_negative_samples`` integer indices.

    Raises:
        ValueError: If samples are requested but no index other than *target*
            can be drawn (empty vocabulary, or a one-word vocabulary whose
            only index is *target*).
    """
    if num_negative_samples <= 0:
        return []
    # Without this guard the rejection loop below would spin forever when the
    # single available index is the target itself.
    if vocab_size <= 0 or (vocab_size == 1 and target == 0):
        raise ValueError(
            "cannot draw negative samples: no vocabulary index other than the target"
        )

    neg_samples = []
    while len(neg_samples) < num_negative_samples:
        neg_sample = np.random.randint(0, vocab_size)
        if neg_sample != target:
            neg_samples.append(neg_sample)
    return neg_samples

# Skip-gram Model with Negative Sampling
class SkipGramNegSampling(nn.Module):
    """Skip-gram model trained with a negative-sampling objective.

    Holds two embedding tables of identical shape: ``embeddings`` is looked up
    for target words, ``context_embeddings`` for both the true context words
    and the negative samples.
    """

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.vocab_size = vocab_size
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.context_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.log_sigmoid = nn.LogSigmoid()

    def forward(self, target, context, negative_samples):
        """Return the mean negative-sampling loss for a batch.

        The ``bmm`` call below requires ``negative_samples`` to be a 2-D
        index tensor (one row of negatives per pair) and ``target`` to be a
        1-D index tensor; ``context`` is reduced alongside ``target``.

        Returns:
            A scalar tensor: the negated batch mean of
            ``log_sigmoid(target . context) + sum(log_sigmoid(-target . negative))``.
        """
        centre_vecs = self.embeddings(target)
        positive_vecs = self.context_embeddings(context)
        negative_vecs = self.context_embeddings(negative_samples)

        # Dot product of each target with its true context word.
        positive_logits = (centre_vecs * positive_vecs).sum(dim=1)
        # Dot product of each target with every one of its negatives.
        negative_logits = torch.bmm(negative_vecs, centre_vecs.unsqueeze(2)).squeeze(2)

        positive_term = self.log_sigmoid(positive_logits)
        negative_term = self.log_sigmoid(-negative_logits).sum(1)

        per_pair = positive_term + negative_term
        return -per_pair.mean()

# Training the model
vocab_size = len(word_to_idx)
model = SkipGramNegSampling(vocab_size, embedding_dim)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    total_loss = 0
    for target, context in dataloader:
        target = target.long()
        context = context.long()
        # One row of negatives per pair in the batch. NOTE(review): negatives
        # are only kept distinct from the target word, so a row may still
        # contain the pair's own context word.
        negative_rows = [
            get_negative_samples(target_idx, num_negative_samples, vocab_size)
            for target_idx in target.tolist()
        ]
        negative_samples = torch.tensor(negative_rows, dtype=torch.long)

        # Standard optimisation step: clear old gradients, backprop, update.
        optimizer.zero_grad()
        loss = model(target, context, negative_samples)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    # Report the mean per-batch loss for this epoch.
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(dataloader)}")

# Getting the word embeddings: the target-word table of the trained model,
# detached from autograd and exposed as a NumPy array (one row per vocab index).
embeddings = model.embeddings.weight.detach().numpy()

# Function to get similar words
def get_similar_words(word, top_n=5):
    """Return the *top_n* vocabulary words scoring highest against *word*.

    Scores are raw dot products between the row of the module-level
    ``embeddings`` array for *word* and every other row. The query word is
    excluded by index rather than by assuming it ranks first: with an
    unnormalised dot product, a longer vector can outscore the query's
    product with itself.

    Args:
        word: Query word; must be a key of the module-level ``word_to_idx``.
        top_n: Maximum number of words to return; values below 1 give ``[]``.

    Returns:
        A list of words ordered from highest to lowest score.

    Raises:
        KeyError: If *word* is not in ``word_to_idx``.
    """
    query_idx = word_to_idx[word]
    similarities = np.dot(embeddings, embeddings[query_idx])
    ranked = (-similarities).argsort()
    neighbours = [int(i) for i in ranked if i != query_idx]
    return [idx_to_word[i] for i in neighbours[:max(top_n, 0)]]

# Example usage: print the five highest-scoring neighbours of "do"
print(get_similar_words("do"))


Epoch 1, Loss: 23.277075052261353
Epoch 2, Loss: 22.667892694473267
Epoch 3, Loss: 22.502378225326538
Epoch 4, Loss: 21.587493658065796
Epoch 5, Loss: 20.702043294906616
['wait', 'act', 'make', 'iron', 'not']
