# Task 1. Preparation and Training

# 1.1 Word2Vec

## 1) Import Libraries and Data

In [18]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [19]:
import nltk
from nltk.corpus import reuters

In [None]:
# Download NLTK dataset
nltk.download('reuters')
nltk.download('punkt')

In [21]:
# Build the training corpus from Reuters: one entry per document, each entry
# being the document's whitespace-separated tokens in lower case.
corpus = [
    [token.lower() for token in reuters.raw(fileid).split()]
    for fileid in reuters.fileids()
]

In [None]:
# Word sequences and unique words
def flatten(nested):
    """Concatenate a list of token lists into one flat list of tokens."""
    return [item for sublist in nested for item in sublist]

# Sort the unique tokens so that word indices are reproducible across kernel
# restarts (iteration order of a set of strings varies with hash randomization).
vocab = sorted(set(flatten(corpus)))
vocab[:10]  # preview only; the full vocabulary is too large to display

In [None]:
# Numericalization: map every word to its position in `vocab`
word2index = {w: i for i, w in enumerate(vocab)}
print(word2index)  # NOTE(review): prints the entire mapping; consider previewing a few items instead

In [None]:
# Vocab size: number of unique tokens, measured before '<UNK>' is appended to
# `vocab` in the following cell
voc_size = len(vocab)
print(voc_size)

In [None]:
# Add <UNK> for unknown words (guarded so that re-running this cell does not
# append the token a second time)
if '<UNK>' not in vocab:
    vocab.append('<UNK>')
vocab[-1]

In [26]:
# Give <UNK> its own index. Index 0 already belongs to vocab[0], so reusing it
# would make <UNK> share an embedding row with a real word.
word2index.setdefault('<UNK>', len(word2index))
# Keep the embedding-table size in sync with the mapping (it now includes <UNK>).
voc_size = len(word2index)

## 2) Prepare Train Data

In [None]:
# Preview a few documents only; printing the whole corpus floods the notebook output
for c in corpus[:3]:
    print(c)

In [28]:
def random_batch(batch_size, word_sequence, window_size=2):
    """Sample a random mini-batch of skip-gram (center, context) index pairs.

    Args:
        batch_size: Number of pairs to draw (sampled without replacement).
        word_sequence: Iterable of tokenized sentences (lists of words). Every
            word must be a key of the global ``word2index``.
        window_size: Number of context words taken on each side of the center.

    Returns:
        Tuple ``(inputs, labels)`` of integer NumPy arrays, each of shape
        ``[batch_size, 1]``: center-word indices and context-word indices.

    Raises:
        ValueError: Raised by ``np.random.choice`` when fewer than
            ``batch_size`` pairs can be generated.

    Note:
        All pairs are rebuilt on every call, which is expensive for a large
        corpus.
    """
    skip_grams = []  # List to store generated skip-grams
    # Build the pairs from the sentences that were passed in, not from the
    # global corpus, so the function honours its `word_sequence` argument.
    for sent in word_sequence:
        # Only positions with a full window on both sides are used as centers
        for i in range(window_size, len(sent) - window_size):
            target = word2index[sent[i]]  # Target word (current word)
            # Context order: left neighbours (nearest first), then right neighbours
            context = [word2index[sent[i - j]] for j in range(1, window_size + 1)] + \
                      [word2index[sent[i + j]] for j in range(1, window_size + 1)]
            # Create skip-gram pairs (target, context word)
            for w in context:
                skip_grams.append([target, w])

    # Randomly pick pair positions without replacement
    random_index = np.random.choice(len(skip_grams), batch_size, replace=False)

    random_inputs = [[skip_grams[i][0]] for i in random_index]  # center words
    random_labels = [[skip_grams[i][1]] for i in random_index]  # context words

    return np.array(random_inputs), np.array(random_labels)

### Testing the method

In [None]:
batch_size = 2 # mini-batch size
input_batch, target_batch = random_batch(batch_size, corpus)

print("Input: ", input_batch)
print("Target: ", target_batch)

## 3) Model

In [30]:
class Skipgram(nn.Module):
    """Skip-gram word2vec model trained with a full-softmax objective.

    Holds two embedding tables: ``embedding_v`` for center words and
    ``embedding_u`` for context (outside) words.
    """

    def __init__(self, vocab_size, emb_size):
        super(Skipgram, self).__init__()
        self.embedding_v = nn.Embedding(vocab_size, emb_size)  # center-word vectors
        self.embedding_u = nn.Embedding(vocab_size, emb_size)  # context-word vectors

    def forward(self, center_words, target_words, all_vocabs):
        """Return the mean negative log-likelihood of the batch (a scalar).

        Args:
            center_words: LongTensor of center-word indices, [batch_size, 1].
            target_words: LongTensor of context-word indices, [batch_size, 1].
            all_vocabs: LongTensor of every vocabulary index repeated per
                example, [batch_size, voc_size]; used for the softmax
                normalizer.
        """
        center_embeds = self.embedding_v(center_words) # [batch_size, 1, emb_size]
        target_embeds = self.embedding_u(target_words) # [batch_size, 1, emb_size]
        all_embeds = self.embedding_u(all_vocabs) # [batch_size, voc_size, emb_size]

        scores = target_embeds.bmm(center_embeds.transpose(1, 2)).squeeze(2)
        # [batch_size, 1, emb_size] @ [batch_size, emb_size, 1] = [batch_size, 1, 1] = [batch_size, 1]

        norm_scores = all_embeds.bmm(center_embeds.transpose(1, 2)).squeeze(2)
        # [batch_size, voc_size, emb_size] @ [batch_size, emb_size, 1] = [batch_size, voc_size, 1] = [batch_size, voc_size]

        # log-softmax computed as score - logsumexp(all scores). This equals
        # log(exp(score) / sum(exp(all scores))) but does not overflow to
        # inf/NaN when the scores are large.
        log_probs = scores - torch.logsumexp(norm_scores, dim=1, keepdim=True)

        return -torch.mean(log_probs)  # Negative log likelihood (scalar)

## 4) Training

In [31]:
# Parameters
batch_size = 2
embedding_size = 2
window_size = 2
skipgram_model = Skipgram(voc_size, embedding_size)
optimizer = optim.Adam(skipgram_model.parameters(), lr=0.001)

In [None]:
# Training preparation
def prepare_sequence(seq, word2index):
    """Turn a sequence of words into a LongTensor of vocabulary indices.

    Words whose lookup in ``word2index`` yields nothing are mapped to the
    index stored under ``"<UNK>"``.
    """
    idxs = []
    for token in seq:
        found = word2index.get(token)
        # Fall back to <UNK> only when the lookup came back empty
        idxs.append(found if found is not None else word2index["<UNK>"])
    return torch.LongTensor(idxs)

# Use for the normalized term in the probability calculation: the index of every
# entry in `vocab`, repeated once per example in the batch
all_vocabs = prepare_sequence(list(vocab), word2index).expand(batch_size, len(vocab)) # [batch_size, len(vocab)]
all_vocabs.shape

In [33]:
# Placeholder for epoch time calculation
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into whole minutes and whole seconds."""
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    # Seconds left over once the whole minutes are removed, truncated to an int
    return whole_minutes, int(duration - (whole_minutes * 60))

In [None]:
import time

# Training
# NOTE: each "epoch" below is one optimizer step on a single freshly sampled
# mini-batch, not a full pass over the corpus.
num_epochs = 100
total_loss = 0.0
total_time = 0.0

for epoch in range(num_epochs):
    
    start = time.time()
    
    # Sample (center, context) index pairs and wrap them as index tensors
    input_batch, target_batch = random_batch(batch_size, corpus, window_size=window_size)
    input_batch = torch.LongTensor(input_batch) # [batch_size, 1]
    target_batch = torch.LongTensor(target_batch) # [batch_size, 1]
    
    optimizer.zero_grad()
    loss = skipgram_model(input_batch, target_batch, all_vocabs)
    
    loss.backward()
    optimizer.step()
    
    # The timed span covers sampling, forward, backward and the parameter update
    end = time.time()
    
    epoch_mins, epoch_secs = epoch_time(start, end)

    # Update total loss and total time
    total_loss += loss.item()
    total_time += (end - start)

    # Print progress every 10 epochs
    if (epoch + 1) % 10 == 0:
        print(f"Epoch: {epoch + 1} | Loss: {loss:.6f} | Time: {epoch_mins}m {epoch_secs}s")

# Loss and Time
avg_loss = total_loss / num_epochs

# total_time is a float, so both values below are floats and are printed unrounded
total_mins = total_time // 60
total_secs = total_time % 60

avg_time = total_time / num_epochs
avg_mins = int(avg_time // 60)
avg_secs = int(avg_time % 60)  # truncated to whole seconds

print(f"\nSkipgram Training completed in {total_mins}m {total_secs}s")
print(f"Skipgram Average Training Time per Epoch: {avg_mins}m {avg_secs}s")

print(f"\nSkipgram Total Loss: {total_loss:.6f}")
print(f"Skipgram Average Loss per Epoch: {avg_loss:.6f}")

# 1.2 Word2Vec (Negative Sampling)

## 1) Import Libraries and Data

same as 1.1

## 2) Prepare Train Data

same as 1.1

## 3) Negative Sampling

### Unigram distribution

In [35]:
Z = 0.001 # Scaling factor

In [36]:
from collections import Counter

# Frequency of every token in the corpus, plus the total number of tokens
word_count = Counter(flatten(corpus))
num_total_words = sum(word_count.values())

In [37]:
# Each word is repeated in proportion to its relative frequency raised to the
# power 0.75, divided by Z and truncated to an integer repeat count.
unigram_table = [
    vo
    for vo in vocab
    for _ in range(int(((word_count[vo] / num_total_words) ** 0.75) / Z))
]

In [None]:
Counter(unigram_table)

### Negative Sampling

In [39]:
import random

# Training preparation: mapping words to indices in a dictionary
def prepare_sequence(seq, word2index):
    """Map each word of ``seq`` to its index and return the indices as a LongTensor.

    A word whose lookup in ``word2index`` comes back empty is replaced by the
    index stored under ``"<UNK>"``.
    """
    indices = []
    for word in seq:
        position = word2index.get(word)
        if position is None:
            position = word2index["<UNK>"]
        indices.append(position)
    return torch.LongTensor(indices)

# Negative sampling function
def negative_sampling(targets, unigram_table, k):
    """Draw ``k`` negative words for every row of ``targets``.

    Words are drawn uniformly from ``unigram_table`` (so a word's chance is
    proportional to how often it appears in the table); a draw whose index
    equals the row's target index is rejected and redrawn.

    Args:
        targets: Tensor with one target index per row (first dimension = batch).
        unigram_table: List of words to sample from.
        k: Number of negative samples per target.

    Returns:
        LongTensor of negative-word indices, one row of ``k`` entries per target.
    """
    rows = []
    for row in range(targets.size(0)):
        forbidden = targets[row].item()
        drawn = []
        # Rejection sampling: keep drawing until k non-target words are collected
        while len(drawn) < k:
            candidate = random.choice(unigram_table)
            if word2index[candidate] != forbidden:
                drawn.append(candidate)
        rows.append(prepare_sequence(drawn, word2index).view(1, -1))
    return torch.cat(rows)

### Testing the negative sampling

In [40]:
# Word indices must be integer (long) tensors; torch.Tensor(...) would build a
# float32 tensor. as_tensor accepts both NumPy arrays and existing tensors, so
# this cell works whether or not the training loop has already converted them.
input_batch  = torch.as_tensor(input_batch, dtype=torch.long)
target_batch = torch.as_tensor(target_batch, dtype=torch.long)

In [None]:
target_batch.shape

In [None]:
input_batch

In [None]:
num_neg = 3 # Number of negative samples per target
negative_sampling(target_batch, unigram_table, num_neg)

In [None]:
target_batch[1]

## 4) Model

In [45]:
class SkipgramNegSampling(nn.Module):
    """Skip-gram model trained with the negative-sampling objective.

    ``embedding_v`` stores center-word vectors and ``embedding_u`` stores
    context-word vectors; both have ``vocab_size`` rows of ``emb_size`` values.
    """

    def __init__(self, vocab_size, emb_size):
        super().__init__()
        self.embedding_v = nn.Embedding(vocab_size, emb_size)  # center embedding
        self.embedding_u = nn.Embedding(vocab_size, emb_size)  # context embedding
        self.logsigmoid = nn.LogSigmoid()

    def forward(self, center_words, target_words, negative_words):
        """Return the scalar loss: minus the batch mean of
        log-sigmoid(u_pos . v) + sum over negatives of log-sigmoid(-u_neg . v).
        """
        # Center vectors as columns so they can be batch-multiplied: [batch_size, emb_size, 1]
        center_col = self.embedding_v(center_words).transpose(1, 2)

        # Positive pair score: [batch_size, 1]
        pos_logits = torch.bmm(self.embedding_u(target_words), center_col).squeeze(2)
        # Negated negative-sample scores: [batch_size, num_neg, 1]
        neg_logits = torch.bmm(-self.embedding_u(negative_words), center_col)

        per_example = self.logsigmoid(pos_logits) + self.logsigmoid(neg_logits).sum(1)
        return -per_example.mean()

    def prediction(self, inputs):
        """Look up the center-word embeddings for ``inputs``."""
        return self.embedding_v(inputs)


## 5) Training

In [46]:
# Parameters
batch_size = 2
embedding_size = 2
num_neg = 3
window_size = 2
skipgram_neg_model = SkipgramNegSampling(voc_size, embedding_size)
optimizer = optim.Adam(skipgram_neg_model.parameters(), lr=0.001)

In [None]:
# Training
# NOTE: each "epoch" below is one optimizer step on a single freshly sampled
# mini-batch, not a full pass over the corpus.
num_epochs = 100
total_loss = 0.0
total_time = 0.0

for epoch in range(num_epochs):
    
    start = time.time()
    
    input_batch, target_batch = random_batch(batch_size, corpus, window_size=window_size)
    input_batch = torch.LongTensor(input_batch) # [batch_size, 1]
    target_batch = torch.LongTensor(target_batch) # [batch_size, 1]
    negs_batch = negative_sampling(target_batch, unigram_table, num_neg) # [batch_size, num_neg]
    
    optimizer.zero_grad()
    loss = skipgram_neg_model(input_batch, target_batch, negs_batch)

    loss.backward()
    optimizer.step()

    # Stop the clock only after the backward pass and the parameter update, so
    # the reported time covers the same work as the other training loops.
    end = time.time()

    epoch_mins, epoch_secs = epoch_time(start, end)

    # Update total loss and total time
    total_loss += loss.item()
    total_time += (end - start)

    # Print progress every 10 epochs
    if (epoch + 1) % 10 == 0:
        print(f"Epoch: {epoch + 1} | Loss: {loss:.6f} | Time: {epoch_mins}m {epoch_secs}s")

# Loss and Time
avg_loss = total_loss / num_epochs

total_mins = total_time // 60
total_secs = total_time % 60

avg_time = total_time / num_epochs
avg_mins = int(avg_time // 60)
avg_secs = int(avg_time % 60)

print(f"\nSkipgram Neg Training completed in {total_mins}m {total_secs}s")
print(f"Skipgram Neg Average Training Time per Epoch: {avg_mins}m {avg_secs}s")

print(f"\nSkipgram Neg Total Loss: {total_loss:.6f}")
print(f"Skipgram Neg Average Loss per Epoch: {avg_loss:.6f}")

# 1.3 GloVe

## 1) Import Libraries and Data

same as 1.1

## 2) Build Co-occurrence Matrix X

In [48]:
def create_skipgrams(corpus, window_size=2):
    """Build (target, context) word pairs for every token of every sentence.

    The context window is clipped at sentence boundaries, so the first and
    last tokens are used as targets too (with fewer context words). This keeps
    the pair set symmetric: whenever (a, b) is produced for two nearby tokens,
    (b, a) is produced as well.

    Args:
        corpus: Iterable of tokenized sentences (lists of words).
        window_size: Maximum number of context words taken on each side.

    Returns:
        List of ``(target, context)`` tuples in sentence order; for each
        target, left-context words come first, then right-context words.
    """
    skip_grams = []
    for sent in corpus:
        for i, target in enumerate(sent):
            # Slicing clips the window automatically at both ends of the sentence
            context = sent[max(i - window_size, 0):i] + sent[i + 1:i + window_size + 1]
            skip_grams.extend((target, w) for w in context)
    return skip_grams

In [49]:
window_size = 2  # Default
skip_grams = create_skipgrams(corpus, window_size)

In [None]:
# Calculate co-occurrence counts: maps each (target, context) pair to the number
# of times it appears in `skip_grams`
X_ik_skipgram = Counter(skip_grams)
X_ik_skipgram  # NOTE(review): displays the whole Counter; consider X_ik_skipgram.most_common(10)

### Weighting function

In [51]:
#simply a normalized function...don't worry too much
def weighting(w_i, w_j, X_ik, x_max=100, alpha=0.75):
    """GloVe weighting function f(x) = (x / x_max) ** alpha, capped at 1.

    Args:
        w_i: First word of the pair.
        w_j: Second word of the pair.
        X_ik: Mapping from ``(w_i, w_j)`` tuples to co-occurrence counts.
        x_max: Count at or above which the weight saturates at 1.
        alpha: Exponent applied to the scaled count.

    Returns:
        The weight for the pair: ``(x_ij / x_max) ** alpha`` when the count is
        below ``x_max``, otherwise ``1``. A pair missing from ``X_ik`` is
        treated as having count 1.
    """
    # Only a missing key means "no co-occurrence recorded"; any other error
    # (for example a wrong argument type) should surface instead of being hidden.
    try:
        x_ij = X_ik[(w_i, w_j)]
    except KeyError:
        x_ij = 1  # if does not exist, set it to 1

    # Below the cap the count is scaled; at or above it the weight is 1
    if x_ij < x_max:
        return (x_ij / x_max) ** alpha
    return 1

In [None]:
from itertools import combinations_with_replacement
import numpy as np

X_ik = {}  # Keep the co-occurrences (sparse dictionary)
weighting_dic = {}  # Dictionary for scaling factors

# Walk only the pairs that were actually observed in the skip-grams. Enumerating
# every vocabulary combination is quadratic in the vocabulary size, and looking
# up just one ordering of each pair leaves out pairs that were seen only in the
# opposite order, which later fails when those pairs are sampled for a batch.
for pair, co_occur in X_ik_skipgram.items():
    # Observed count plus one, stored once per ordered pair
    X_ik[pair] = co_occur + 1

    # Apply weighting function and store result in weighting_dic
    weighting_dic[pair] = weighting(pair[0], pair[1], X_ik)

# Print out the resulting dictionaries (one entry per observed ordered pair)
print(f"{X_ik=}")
print(f"{weighting_dic=}")

In [None]:
print(f"First 10 skip-grams: {list(X_ik_skipgram.items())[:10]}")

## 3) Prepare Train Data

In [None]:
# Preview a few documents only; printing the whole corpus floods the notebook output
for c in corpus[:3]:
    print(c)

In [55]:
import math

def random_batch(batch_size, word_sequence, skip_grams, X_ik, weighting_dic):
    """Sample a random GloVe mini-batch from precomputed skip-gram pairs.

    Args:
        batch_size: Number of pairs to draw (sampled without replacement).
        word_sequence: Unused; kept so existing calls keep working.
        skip_grams: Sequence of ``(target_word, context_word)`` tuples.
        X_ik: Mapping from word pairs to co-occurrence counts; a missing pair
            is treated as count 1.
        weighting_dic: Mapping from word pairs to their weighting value.

    Returns:
        Four NumPy arrays of shape ``[batch_size, 1]``: target indices,
        context indices, log co-occurrence counts, and weightings.

    Raises:
        KeyError: If a sampled pair is missing from ``weighting_dic`` or one
            of its words is missing from the global ``word2index``.
        ValueError: Raised by ``np.random.choice`` when ``batch_size`` exceeds
            the number of skip-grams.
    """
    random_inputs = []
    random_labels = []
    random_coocs = []
    random_weightings = []
    # Randomly pick pair positions without replacement
    random_index = np.random.choice(len(skip_grams), batch_size, replace=False)

    for i in random_index:
        pair = skip_grams[i]

        # Convert only the sampled pair to ids instead of the whole list on every call
        random_inputs.append([word2index[pair[0]]])  # target, e.g., 2
        random_labels.append([word2index[pair[1]]])  # context word, e.g., 3

        # Co-occurrence count; only a missing key falls back to 1
        try:
            cooc = X_ik[pair]
        except KeyError:
            cooc = 1
        random_coocs.append([math.log(cooc)])

        # Weighting value for this pair
        random_weightings.append([weighting_dic[pair]])

    return np.array(random_inputs), np.array(random_labels), np.array(random_coocs), np.array(random_weightings)

### Testing the method

In [None]:
batch_size = 2 # mini-batch size
input_batch, target_batch, cooc_batch, weighting_batch = random_batch(batch_size, corpus, skip_grams, X_ik, weighting_dic)

print("Input: ", input_batch)
print("Target: ", target_batch)
print("Cooc: ", cooc_batch)
print("Weighting: ", weighting_batch)

## 4) Model

In [57]:
class GloVe(nn.Module):
    """GloVe model: center/context embeddings plus one scalar bias per word for each role."""

    def __init__(self, vocab_size,embed_size):
        super().__init__()
        self.embedding_v = nn.Embedding(vocab_size, embed_size)  # center embedding
        self.embedding_u = nn.Embedding(vocab_size, embed_size)  # out embedding

        self.v_bias = nn.Embedding(vocab_size, 1)
        self.u_bias = nn.Embedding(vocab_size, 1)

    def forward(self, center_words, target_words, coocs, weighting):
        """Return the summed weighted squared error of the batch.

        ``coocs`` is expected to hold the log co-occurrence values already
        (the log is not taken here); ``weighting`` holds the per-pair weights.
        """
        # Center vectors as columns for the batched product: [batch_size, emb_size, 1]
        center_col = self.embedding_v(center_words).transpose(1, 2)

        # [batch_size, 1, emb_size] @ [batch_size, emb_size, 1] -> [batch_size, 1]
        dot = torch.bmm(self.embedding_u(target_words), center_col).squeeze(2)

        # Bias lookups come back as [batch_size, 1, 1]; drop the middle axis
        residual = (
            dot
            + self.v_bias(center_words).squeeze(1)
            + self.u_bias(target_words).squeeze(1)
            - coocs
        )

        return (weighting * residual.pow(2)).sum()

## 5) Training

In [58]:
# Parameters
batch_size = 2
embedding_size = 2
glove_model = GloVe(voc_size, embedding_size)

# NOTE(review): `criterion` is not referenced by any visible cell; the GloVe
# model returns its own loss from forward().
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(glove_model.parameters(), lr=0.001)

In [None]:
# Training
# NOTE: each "epoch" below is one optimizer step on a single freshly sampled
# mini-batch, not a full pass over the skip-gram pairs.
num_epochs = 100
total_loss = 0.0
total_time = 0.0

for epoch in range(num_epochs):
    
    start = time.time()
    
    # Sample pairs with their log co-occurrence and weighting, then wrap as tensors
    input_batch, target_batch, cooc_batch, weighting_batch = random_batch(batch_size, corpus, skip_grams, X_ik, weighting_dic)
    input_batch  = torch.LongTensor(input_batch) #[batch_size, 1]
    target_batch = torch.LongTensor(target_batch) #[batch_size, 1]
    cooc_batch   = torch.FloatTensor(cooc_batch) #[batch_size, 1]
    weighting_batch = torch.FloatTensor(weighting_batch) #[batch_size, 1]
    
    optimizer.zero_grad()
    loss = glove_model(input_batch, target_batch, cooc_batch, weighting_batch)
    
    loss.backward()
    optimizer.step()
    
    # The timed span covers sampling, forward, backward and the parameter update
    end = time.time()
    
    epoch_mins, epoch_secs = epoch_time(start, end)

    # Update total loss and total time
    total_loss += loss.item()
    total_time += (end - start)

    # Print progress every 10 epochs
    if (epoch + 1) % 10 == 0:
        print(f"Epoch: {epoch + 1} | Loss: {loss:.6f} | Time: {epoch_mins}m {epoch_secs}s")

# Loss and Time
avg_loss = total_loss / num_epochs

# total_time is a float, so both values below are floats
total_mins = total_time // 60
total_secs = total_time % 60

avg_time = total_time / num_epochs
avg_mins = int(avg_time // 60)
avg_secs = int(avg_time % 60)  # truncated to whole seconds

# NOTE(review): the format spec "6f" below is a field width of 6, not ".6f"
print(f"\nGloVe Training completed in {total_mins}m {total_secs:6f}s")
print(f"GloVe Average Training Time per Epoch: {avg_mins}m {avg_secs:.6f}s")

print(f"\nGloVe Total Loss: {total_loss:.6f}")
print(f"GloVe Average Loss per Epoch: {avg_loss:.6f}")


# Task 2. Model Comparison and Analysis

In [60]:
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec
import re

glove_file = 'glove.6B.100d.txt'
glove_gensim_model = KeyedVectors.load_word2vec_format(glove_file, binary=False, no_header=True)

## Analogies

In [61]:
import torch.nn.functional as F

# Load the analogies dataset
def load_analogies(filepath):
    """Load the semantic and syntactic analogy questions from a word-test file.

    Only two sections are kept: lines under the ``capital-common-countries``
    header go to ``'semantic'`` and lines under ``gram7-past-tense`` go to
    ``'syntactic'``. Lines under any other section header are ignored.

    Args:
        filepath: Path to the analogy file. The first line is skipped
            (copyright notice); section headers start with ``:``.

    Returns:
        Dict with keys ``'semantic'`` and ``'syntactic'``, each a list of
        word lists (one list per non-empty question line).
    """
    with open(filepath, 'r') as f:
        lines = f.readlines()

    # Ignore the first line with copyright info
    lines = lines[1:]

    analogies = {'semantic': [], 'syntactic': []}
    current_category = None

    for line in lines:
        if line.startswith(':'):
            if 'capital-common-countries' in line:
                current_category = 'semantic'
            elif 'gram7-past-tense' in line:
                current_category = 'syntactic'
            else:
                # A different section starts here: stop collecting, otherwise
                # its questions would be filed under the previous category.
                current_category = None
        elif line.strip() and current_category is not None:
            analogies[current_category].append(line.strip().split())

    return analogies

analogies = load_analogies('word-test.v1.txt')

# Solving word analogies using cosine similarity
def cosine_similarity(a, b):
    """Cosine similarity between `a` and `b`, computed along dim 1 with torch.nn.functional."""
    # NOTE(review): a later cell redefines `cosine_similarity` with a
    # torch.dot-based version for single vectors; once that cell has run, this
    # name no longer refers to this function.
    return F.cosine_similarity(a, b, dim=1)

def solve_analogy(model, word2index, analogy):
    """Predict the fourth word of an analogy ``a : b :: c : ?``.

    Args:
        model: Object exposing an ``embedding_v`` embedding layer.
        word2index: Mapping from word to index; must contain ``"<UNK>"``,
            which is used for words not in the mapping.
        analogy: Sequence of the three words ``(a, b, c)``.

    Returns:
        The first word in ``word2index`` whose index is the row of
        ``embedding_v`` most cosine-similar to ``vec(b) - vec(a) + vec(c)``.
        The query words themselves are not excluded from the candidates.
    """
    # Extract words
    a, b, c = analogy
    unk_idx = word2index["<UNK>"]
    query = torch.LongTensor([word2index.get(w, unk_idx) for w in (a, b, c)])

    # Evaluation only: no gradients are needed
    with torch.no_grad():
        a_embed, b_embed, c_embed = model.embedding_v(query)  # each [emb_size]

        # Calculate d_embed (d is the missing word in the analogy: a : b :: c : d)
        d_embed = (b_embed - a_embed + c_embed).unsqueeze(0)  # [1, emb_size]

        # Compare against every row of the embedding table. F.cosine_similarity
        # is called directly because the notebook-level `cosine_similarity`
        # helper is redefined with a different contract in a later cell.
        vocab_embeds = model.embedding_v.weight  # [voc_size, emb_size]
        similarities = F.cosine_similarity(d_embed, vocab_embeds, dim=1)

    # Find the word with the maximum similarity
    best_word_idx = torch.argmax(similarities).item()
    predicted_word = [word for word, idx in word2index.items() if idx == best_word_idx][0]

    return predicted_word

# Calculate accuracy for semantic and syntactic analogies
def calculate_accuracy(analogies, model, word2index, category):
    """Percentage of analogies in ``analogies[category]`` that ``solve_analogy`` answers exactly.

    Each analogy is a four-word list ``a : b :: c : d``; the prediction for
    ``(a, b, c)`` counts as correct when it equals ``d``. Returns 0 when the
    category holds no analogies.
    """
    hits = 0
    seen = 0
    for a, b, c, d in analogies[category]:  # a : b :: c : d
        seen += 1
        if solve_analogy(model, word2index, [a, b, c]) == d:
            hits += 1

    if seen == 0:
        return 0
    return hits / seen * 100

### SkipGram

In [None]:
semantic_accuracy_skipgram = calculate_accuracy(analogies, skipgram_model, word2index, 'semantic')
print(f"Semantic Accuracy (SkipGram): {semantic_accuracy_skipgram:.4f}%")

syntactic_accuracy_skipgram = calculate_accuracy(analogies, skipgram_model, word2index, 'syntactic')
print(f"Syntactic Accuracy (SkipGram): {syntactic_accuracy_skipgram:.4f}%")

### SkipGram with Negative Sampling

In [None]:
semantic_accuracy_skipgram_neg = calculate_accuracy(analogies, skipgram_neg_model, word2index, 'semantic')
print(f"Semantic Accuracy (SkipGram with Negative Sampling): {semantic_accuracy_skipgram_neg:.4f}%")

syntactic_accuracy_skipgram_neg = calculate_accuracy(analogies, skipgram_neg_model, word2index, 'syntactic')
print(f"Syntactic Accuracy (SkipGram with Negative Sampling): {syntactic_accuracy_skipgram_neg:.4f}%")

### GloVe

In [None]:
semantic_accuracy_glove = calculate_accuracy(analogies, glove_model, word2index, 'semantic')
print(f"Semantic Accuracy (GloVe): {semantic_accuracy_glove:.4f}%")

syntactic_accuracy_glove = calculate_accuracy(analogies, glove_model, word2index, 'syntactic')
print(f"Syntactic Accuracy (GloVe): {syntactic_accuracy_glove:.4f}%")


### GloVe Gensim

In [None]:
# Load analogy dataset
def load_analogies(file_path):
    """Load semantic and syntactic analogy questions from a word-test file.

    Section headers start with ``:``. A header containing ``"capital"`` starts
    the ``'semantic'`` section and one containing ``"past-tense"`` starts the
    ``'syntactic'`` section; questions under any other header are ignored.
    Lines starting with ``//`` and blank lines are skipped, and only lines
    with exactly four whitespace-separated words are kept.

    Args:
        file_path: Path to the analogy file.

    Returns:
        Dict with keys ``'semantic'`` and ``'syntactic'``, each a list of
        four-word lists.
    """
    analogies = {'semantic': [], 'syntactic': []}
    with open(file_path, 'r') as f:
        lines = f.readlines()

    current_section = None
    for line in lines:
        line = line.strip()

        # Skip copyright line and blank lines
        if line.startswith("//") or not line:
            continue

        if line.startswith(":"):
            if "capital" in line:
                current_section = 'semantic'
            elif "past-tense" in line:
                current_section = 'syntactic'
            else:
                # A different section starts here: stop collecting, otherwise
                # its questions would be filed under the previous section.
                current_section = None
        elif current_section is not None:
            words = re.split(r'\s+', line)
            if len(words) == 4:
                analogies[current_section].append(words)
    return analogies

analogies = load_analogies('word-test.v1.txt')

# Calculate the analogy accuracy
def calculate_analogy_accuracy(analogies, model):
    """Fraction of analogies ``A : B :: C : D`` that a keyed-vector model answers correctly.

    Args:
        analogies: Dict mapping a section name to a list of four-word analogies.
        model: Object supporting ``word in model``, ``model[word]`` and
            ``model.similar_by_vector(vector, topn=...)``.

    Returns:
        ``correct / total`` over the analogies whose first three words could be
        looked up, or 0 when none could. An analogy counts as correct when the
        top result for ``vec(B) - vec(A) + vec(C)`` equals ``D`` ignoring case.
    """
    def _resolve(word):
        # Use the word as written when the model has it; otherwise try its
        # lower-case form, since the answer is compared case-insensitively and
        # capitalised test words would otherwise be skipped by a lower-case model.
        if word in model:
            return word
        lowered = word.lower()
        return lowered if lowered in model else None

    correct = 0
    total = 0

    for section, items in analogies.items():
        for A, B, C, D_true in items:
            a_key, b_key, c_key = _resolve(A), _resolve(B), _resolve(C)
            if a_key is None or b_key is None or c_key is None:
                continue

            # Perform analogy calculation: vec(B) - vec(A) + vec(C)
            vec_analogy = model[b_key] - model[a_key] + model[c_key]

            # Find the closest word to the resulting vector
            most_similar = model.similar_by_vector(vec_analogy, topn=10)

            # Check if the most similar word matches D_true
            predicted_word = most_similar[0][0]

            if predicted_word.lower() == D_true.lower():
                correct += 1
            total += 1

    accuracy = correct / total if total > 0 else 0
    return accuracy

# Calculate accuracy for both semantic and syntactic analogies
semantic_accuracy_glove_gensim = calculate_analogy_accuracy({'semantic': analogies['semantic']}, glove_gensim_model)
syntactic_accuracy_glove_gensim = calculate_analogy_accuracy({'syntactic': analogies['syntactic']}, glove_gensim_model)

print(f"Semantic Accuracy (GloVe Gensim): {semantic_accuracy_glove_gensim * 100:.4f}%")
print(f"Syntactic Accuracy (GloVe Gensim): {syntactic_accuracy_glove_gensim * 100:.4f}%")

## Similarity

### SkipGram

In [None]:
from scipy.stats import spearmanr
from sklearn.metrics import mean_squared_error

# Load similarity dataset
def load_similarity_data(file_path):
    """Read a tab-separated word-similarity file.

    Each non-blank line must hold ``word1<TAB>word2<TAB>score``. Blank lines
    (for example a trailing newline at the end of the file) are skipped
    instead of breaking the three-way unpacking.

    Args:
        file_path: Path to the similarity file.

    Returns:
        Tuple ``(word_pairs, human_scores)``: a list of ``(word1, word2)``
        tuples and a NumPy array with the matching float scores.

    Raises:
        ValueError: If a non-blank line does not have exactly three
            tab-separated fields or the score is not a number.
    """
    word_pairs = []
    human_scores = []
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            word1, word2, score = line.split('\t')
            word_pairs.append((word1, word2))
            human_scores.append(float(score))
    return word_pairs, np.array(human_scores)

# Get word embedding
def get_word_embedding(word, word2index, embedding_layer):
    """Look up the embedding of ``word``, using the ``"<UNK>"`` index for words not in ``word2index``."""
    fallback = word2index["<UNK>"]
    position = word2index.get(word, fallback)
    # Wrap the single index in a 1-element batch, so the result is [1, emb_size]
    return embedding_layer(torch.LongTensor([position]))

# Calculate cosine similarity
def cosine_similarity(embedding1, embedding2):
    """Cosine similarity of two single embeddings: dot product of the squeezed vectors over the product of their norms."""
    flat1, flat2 = embedding1.squeeze(), embedding2.squeeze()
    scale = torch.norm(embedding1) * torch.norm(embedding2)
    return torch.dot(flat1, flat2) / scale

# Main procedure
def run_similarity_evaluation(similarity_file, skipgram_model, word2index):
    """Compare skip-gram cosine similarities with human similarity scores.

    Loads the word pairs and human scores from ``similarity_file``, scores
    every pair with the cosine similarity of the two words' ``embedding_v``
    vectors, and prints the Spearman correlation and the mean squared error
    between the model scores and the human scores. Nothing is returned.
    """
    word_pairs, human_scores = load_similarity_data(similarity_file)

    # One cosine score per pair, as plain Python floats
    predicted_scores = [
        cosine_similarity(
            get_word_embedding(first, word2index, skipgram_model.embedding_v),
            get_word_embedding(second, word2index, skipgram_model.embedding_v),
        ).item()
        for first, second in word_pairs
    ]

    # Rank correlation and squared error against the human judgements
    correlation, _ = spearmanr(predicted_scores, human_scores)
    mse_skipgram = mean_squared_error(human_scores, predicted_scores)
    print(f"Skipgram Spearman Correlation: {correlation:.4f}")
    print(f"Skipgram MSE: {mse_skipgram:.4f}")
    # Comparing the human scores with themselves always gives 0
    print(f"Y_true MSE: {mean_squared_error(human_scores, human_scores):.4f}")

# Usage
similarity_file = 'wordsim_similarity_goldstandard.txt'
run_similarity_evaluation(similarity_file, skipgram_model, word2index)

### SkipGram with Negative Sampling

In [None]:
# Main procedure
def run_similarity_evaluation(similarity_file, skipgram_neg_model, word2index):
    """Compare negative-sampling skip-gram cosine similarities with human similarity scores.

    Loads the word pairs and human scores from ``similarity_file``, scores
    every pair with the cosine similarity of the two words' ``embedding_v``
    vectors, and prints the Spearman correlation and the mean squared error
    between the model scores and the human scores. Nothing is returned.
    """
    word_pairs, human_scores = load_similarity_data(similarity_file)

    # One cosine score per pair, as plain Python floats
    predicted_scores = [
        cosine_similarity(
            get_word_embedding(first, word2index, skipgram_neg_model.embedding_v),
            get_word_embedding(second, word2index, skipgram_neg_model.embedding_v),
        ).item()
        for first, second in word_pairs
    ]

    # Rank correlation and squared error against the human judgements
    correlation, _ = spearmanr(predicted_scores, human_scores)
    mse_skipgram_neg = mean_squared_error(human_scores, predicted_scores)

    print(f"Skipgram with Negative Sampling Spearman Correlation: {correlation:.4f}")
    print(f"Skipgram with Negative Sampling MSE: {mse_skipgram_neg:.4f}")

# Usage
similarity_file = 'wordsim_similarity_goldstandard.txt'
run_similarity_evaluation(similarity_file, skipgram_neg_model, word2index)

### GloVe

In [None]:
# Main procedure
def run_similarity_evaluation(similarity_file, glove_model, word2index):
    """Compare GloVe cosine similarities with human similarity scores.

    Loads the word pairs and human scores from ``similarity_file``, scores
    every pair with the cosine similarity of the two words' ``embedding_v``
    vectors, and prints the Spearman correlation and the mean squared error
    between the model scores and the human scores. Nothing is returned.
    """
    word_pairs, human_scores = load_similarity_data(similarity_file)

    # One cosine score per pair, as plain Python floats
    predicted_scores = [
        cosine_similarity(
            get_word_embedding(first, word2index, glove_model.embedding_v),
            get_word_embedding(second, word2index, glove_model.embedding_v),
        ).item()
        for first, second in word_pairs
    ]

    # Rank correlation and squared error against the human judgements
    correlation, _ = spearmanr(predicted_scores, human_scores)
    mse_glove = mean_squared_error(human_scores, predicted_scores)

    print(f"GloVe Spearman Correlation: {correlation:.4f}")
    print(f"GloVe MSE: {mse_glove:.4f}")

# Usage
similarity_file = 'wordsim_similarity_goldstandard.txt'
run_similarity_evaluation(similarity_file, glove_model, word2index)

### GloVe Gensim

In [None]:
# Load similarity dataset
def load_similarity_data(file_path):
    """Read a tab-separated word-similarity file.

    Each non-blank line must hold ``word1<TAB>word2<TAB>score``. Blank lines
    (for example a trailing newline at the end of the file) are skipped
    instead of breaking the three-way unpacking.

    Args:
        file_path: Path to the similarity file.

    Returns:
        Tuple ``(word_pairs, human_scores)``: a list of ``(word1, word2)``
        tuples and a NumPy array with the matching float scores.

    Raises:
        ValueError: If a non-blank line does not have exactly three
            tab-separated fields or the score is not a number.
    """
    word_pairs = []
    human_scores = []
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            word1, word2, score = line.split('\t')
            word_pairs.append((word1, word2))
            human_scores.append(float(score))
    return word_pairs, np.array(human_scores)

# Get word embedding
def get_word_embedding(word, embedding_layer):
    """Return the vector of ``word`` as a float32 tensor, or a zero vector of
    ``embedding_layer.vector_size`` entries when the word is not in ``embedding_layer``.
    """
    if word not in embedding_layer:
        # Unknown word: stand in with an all-zero vector
        return torch.zeros(embedding_layer.vector_size, dtype=torch.float32)
    # Known word: convert the stored vector to a torch tensor
    return torch.tensor(embedding_layer[word], dtype=torch.float32)

# Calculate cosine similarity
def cosine_similarity(embedding1, embedding2):
    """Cosine similarity of two single embeddings; the result is NaN when either norm is zero."""
    numerator = torch.dot(embedding1.squeeze(), embedding2.squeeze())
    denominator = torch.norm(embedding1) * torch.norm(embedding2)
    return numerator / denominator

# Main procedure
def run_similarity_evaluation(similarity_file, glove_gensim_model):
    """Compare cosine similarities from a keyed-vector model with human similarity scores.

    Loads the word pairs and human scores from ``similarity_file``, scores
    every pair with the cosine similarity of the two word vectors, drops pairs
    whose model score or human score is NaN, and prints the Spearman
    correlation and mean squared error over the remaining pairs. Nothing is
    returned.
    """
    word_pairs, human_scores = load_similarity_data(similarity_file)

    # One cosine score per pair, as plain Python floats
    predicted_scores = [
        cosine_similarity(
            get_word_embedding(first, glove_gensim_model),
            get_word_embedding(second, glove_gensim_model),
        ).item()
        for first, second in word_pairs
    ]

    # Keep only the positions where neither score is NaN
    valid_indices = ~(np.isnan(predicted_scores) | np.isnan(human_scores))
    human_scores_valid = human_scores[valid_indices]
    predicted_scores_valid = np.array(predicted_scores)[valid_indices]

    # Calculate Spearman Correlation and MSE
    correlation, _ = spearmanr(predicted_scores_valid, human_scores_valid)
    mse_glove_gensim = mean_squared_error(human_scores_valid, predicted_scores_valid)
    print(f"GloVe Gensim Spearman Correlation: {correlation:.4f}")
    print(f"GloVe Gensim MSE: {mse_glove_gensim:.4f}")

# Usage
similarity_file = 'wordsim_similarity_goldstandard.txt'
run_similarity_evaluation(similarity_file, glove_gensim_model)

In [70]:
# Save model
torch.save(skipgram_model.state_dict(), "skipgram_model.pth")

In [71]:
import pickle

with open('word2index.pkl', 'wb') as f:
    pickle.dump(word2index, f)