In [20]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

## 1. Defining simple data for understanding

In [21]:
corpus = ["apple banana fruit", "banana apple fruit", "banana fruit apple",
                 "dog cat animal", "cat animal dog", "cat dog animal"]

In [22]:
corpus = [sent.split(" ") for sent in corpus]
corpus

[['apple', 'banana', 'fruit'],
 ['banana', 'apple', 'fruit'],
 ['banana', 'fruit', 'apple'],
 ['dog', 'cat', 'animal'],
 ['cat', 'animal', 'dog'],
 ['cat', 'dog', 'animal']]

In [23]:
#get word sequences and unique words
flatten = lambda l: [item for sublist in l for item in sublist]
vocab = list(set(flatten(corpus)))
vocab

['apple', 'dog', 'banana', 'fruit', 'animal', 'cat']

In [24]:
#numerical transform
word2index = {w: i for i, w in enumerate(vocab)}
print(word2index)

{'apple': 0, 'dog': 1, 'banana': 2, 'fruit': 3, 'animal': 4, 'cat': 5}


In [25]:
#vocab size
voc_size = len(vocab)
print(voc_size)

6


In [26]:
#append UNK
vocab.append('<UNK>')

In [27]:
vocab

['apple', 'dog', 'banana', 'fruit', 'animal', 'cat', '<UNK>']

In [28]:
word2index['<UNK>'] = 0

In [29]:
#in case of requirement
index2word = {v:k for k, v in word2index.items()} 

## 2. Build Co-occurence Matrix X

Here, we need to count the co-occurence of two words given some window size. We gonna use window size of 1.

In [30]:
from collections import Counter

X_i = Counter(flatten(corpus)) # X_i
X_i

Counter({'apple': 3, 'banana': 3, 'fruit': 3, 'dog': 3, 'cat': 3, 'animal': 3})

In [31]:
# Make skip gram of one size window
skip_grams = []
# loop each word sequence
# we starts from 1 because 0 has no context
# we stop at second last for the same reason
for sent in corpus:
    for i in range(1, len(sent) - 1):
        target = sent[i]
        context = [sent[i - 1], sent[i + 1]]
        for w in context:
            skip_grams.append((target, w))

skip_grams

[('banana', 'apple'),
 ('banana', 'fruit'),
 ('apple', 'banana'),
 ('apple', 'fruit'),
 ('fruit', 'banana'),
 ('fruit', 'apple'),
 ('cat', 'dog'),
 ('cat', 'animal'),
 ('animal', 'cat'),
 ('animal', 'dog'),
 ('dog', 'cat'),
 ('dog', 'animal')]

In [32]:
X_ik_skipgram = Counter(skip_grams) # Co-occurece in window size 1
X_ik_skipgram

Counter({('banana', 'apple'): 1,
         ('banana', 'fruit'): 1,
         ('apple', 'banana'): 1,
         ('apple', 'fruit'): 1,
         ('fruit', 'banana'): 1,
         ('fruit', 'apple'): 1,
         ('cat', 'dog'): 1,
         ('cat', 'animal'): 1,
         ('animal', 'cat'): 1,
         ('animal', 'dog'): 1,
         ('dog', 'cat'): 1,
         ('dog', 'animal'): 1})

## Weighting function
GloVe includes a weighting function to scale down too frequent words.

![image.png](attachment:b0a0b21f-4311-4dba-8e17-dffb88c05e75.png)![image.png](attachment:13729d0a-c399-4d24-b902-b8ea6200f632.png)

In [33]:
#basic ormalization function
def weighting(w_i, w_j, X_ik):
        
    #check whether the co-occurrences exist between these two words
    try:
        x_ij = X_ik[(w_i, w_j)]
    except:
        x_ij = 1  #if does not exist, set it to 1
                
    x_max = 100 #100 # fixed in paper  #cannot exceed 100 counts
    alpha = 0.75
    
    #if co-occurrence does not exceed 100, scale it based on some alpha
    if x_ij < x_max:
        result = (x_ij/x_max)**alpha  #scale it
    else:
        result = 1  #if is greater than max, set it to 1 maximum
    
    return result

In [34]:
from itertools import combinations_with_replacement

X_ik = {}  #for keeping the co-occurences
weighting_dic = {} #scaling the percentage of sampling

for bigram in combinations_with_replacement(vocab, 2):
    if X_ik_skipgram.get(bigram) is not None:  #matches 
        co_occer = X_ik_skipgram[bigram]  #get the count from what we already counted
        X_ik[bigram] = co_occer + 1 # + 1 for stability issue
        X_ik[(bigram[1],bigram[0])] = co_occer+1   #count also for the opposite
    else:
        pass
        
    weighting_dic[bigram] = weighting(bigram[0], bigram[1], X_ik)
    weighting_dic[(bigram[1], bigram[0])] = weighting(bigram[1], bigram[0], X_ik)

print(f"{X_ik=}")
print(f"{weighting_dic=}")

X_ik={('apple', 'banana'): 2, ('banana', 'apple'): 2, ('apple', 'fruit'): 2, ('fruit', 'apple'): 2, ('dog', 'animal'): 2, ('animal', 'dog'): 2, ('dog', 'cat'): 2, ('cat', 'dog'): 2, ('banana', 'fruit'): 2, ('fruit', 'banana'): 2, ('animal', 'cat'): 2, ('cat', 'animal'): 2}
weighting_dic={('apple', 'apple'): 0.03162277660168379, ('apple', 'dog'): 0.03162277660168379, ('dog', 'apple'): 0.03162277660168379, ('apple', 'banana'): 0.053182958969449884, ('banana', 'apple'): 0.053182958969449884, ('apple', 'fruit'): 0.053182958969449884, ('fruit', 'apple'): 0.053182958969449884, ('apple', 'animal'): 0.03162277660168379, ('animal', 'apple'): 0.03162277660168379, ('apple', 'cat'): 0.03162277660168379, ('cat', 'apple'): 0.03162277660168379, ('apple', '<UNK>'): 0.03162277660168379, ('<UNK>', 'apple'): 0.03162277660168379, ('dog', 'dog'): 0.03162277660168379, ('dog', 'banana'): 0.03162277660168379, ('banana', 'dog'): 0.03162277660168379, ('dog', 'fruit'): 0.03162277660168379, ('fruit', 'dog'): 0.03

## 3. Prepare train data

In [35]:
for c in corpus:
    print(c)

['apple', 'banana', 'fruit']
['banana', 'apple', 'fruit']
['banana', 'fruit', 'apple']
['dog', 'cat', 'animal']
['cat', 'animal', 'dog']
['cat', 'dog', 'animal']


In [36]:
import numpy as np
import math

def random_batch(batch_size, skip_grams, X_ik, weighting_dic):
    """
    Simplified version that generates a random batch of training data
    
    Args:
        batch_size: Number of samples to return
        skip_grams: List of (target_word, context_word) tuples
        X_ik: Co-occurrence dictionary
        weighting_dic: Weighting dictionary
    
    Returns:
        inputs: Target word indices (shape: [batch_size, 1])
        labels: Context word indices (shape: [batch_size, 1])
        coocs: Log co-occurrence values (shape: [batch_size, 1])
        weights: Weighting values (shape: [batch_size, 1])
    """
    # Convert words to indices
    skip_grams_id = [(word2index[t], word2index[c]) for t, c in skip_grams]
    
    # Randomly select samples
    indices = np.random.choice(len(skip_grams_id), batch_size, replace=False)
    
    # Prepare batch data
    inputs, labels, coocs, weights = [], [], [], []
    
    for idx in indices:
        target, context = skip_grams_id[idx]
        word_pair = skip_grams[idx]
        
        inputs.append([target])
        labels.append([context])
        
        # Get co-occurrence (default to 1 if missing)
        cooc = X_ik.get(word_pair, 1)
        coocs.append([math.log(cooc)])
        
        # Get weighting
        weights.append([weighting_dic[word_pair]])
    
    return (
        np.array(inputs),
        np.array(labels),
        np.array(coocs),
        np.array(weights)
    )

In [37]:
# Test setup
import numpy as np
import math

# Sample vocabulary and word2index
vocab = ["apple", "banana", "fruit", "dog", "cat", "animal"]
word2index = {word:i for i, word in enumerate(vocab)}

# Sample skip_grams (as word pairs)
skip_grams = [
    ("apple", "banana"),
    ("banana", "fruit"),
    ("dog", "cat"),
    ("cat", "animal"),
    ("apple", "fruit")
]

# Sample co-occurrence counts (X_ik)
X_ik = {
    ("apple", "banana"): 10,
    ("banana", "fruit"): 8,
    ("dog", "cat"): 5,
    ("cat", "animal"): 3,
    ("apple", "fruit"): 6
}

# Sample weighting dictionary
weighting_dic = {
    ("apple", "banana"): 0.9,
    ("banana", "fruit"): 0.8,
    ("dog", "cat"): 0.7,
    ("cat", "animal"): 0.6,
    ("apple", "fruit"): 0.5
}

# Testing the function
print("=== Testing random_batch ===")
batch_size = 2
input_batch, target_batch, cooc_batch, weighting_batch = random_batch(
    batch_size, skip_grams, X_ik, weighting_dic
)

print("\nInput words:")
print([vocab[idx[0]] for idx in input_batch])
print("Input indices:", input_batch)

print("\nTarget words:")
print([vocab[idx[0]] for idx in target_batch])
print("Target indices:", target_batch)

print("\nCo-occurrence values (log):", cooc_batch)
print("Weighting values:", weighting_batch)

# Additional test to show multiple runs give different batches
print("\n=== Testing randomness ===")
for i in range(3):
    ib, tb, cb, wb = random_batch(2, skip_grams, X_ik, weighting_dic)
    print(f"\nBatch {i+1}:")
    print("Input:", ib[:,0], "->", [vocab[idx] for idx in ib[:,0]])
    print("Target:", tb[:,0], "->", [vocab[idx] for idx in tb[:,0]])

=== Testing random_batch ===

Input words:
['apple', 'dog']
Input indices: [[0]
 [3]]

Target words:
['fruit', 'cat']
Target indices: [[2]
 [4]]

Co-occurrence values (log): [[1.79175947]
 [1.60943791]]
Weighting values: [[0.5]
 [0.7]]

=== Testing randomness ===

Batch 1:
Input: [4 0] -> ['cat', 'apple']
Target: [5 2] -> ['animal', 'fruit']

Batch 2:
Input: [3 4] -> ['dog', 'cat']
Target: [4 5] -> ['cat', 'animal']

Batch 3:
Input: [0 4] -> ['apple', 'cat']
Target: [2 5] -> ['fruit', 'animal']


In [38]:
class GloVe(nn.Module):
    
    def __init__(self, vocab_size,embed_size):
        super(GloVe,self).__init__()
        self.embedding_v = nn.Embedding(vocab_size, embed_size) # center embedding
        self.embedding_u = nn.Embedding(vocab_size, embed_size) # out embedding
        
        self.v_bias = nn.Embedding(vocab_size, 1)
        self.u_bias = nn.Embedding(vocab_size, 1)
        
    def forward(self, center_words, target_words, coocs, weighting):
        center_embeds = self.embedding_v(center_words) # [batch_size, 1, emb_size]
        target_embeds = self.embedding_u(target_words) # [batch_size, 1, emb_size]
        
        center_bias = self.v_bias(center_words).squeeze(1)
        target_bias = self.u_bias(target_words).squeeze(1)
        
        inner_product = target_embeds.bmm(center_embeds.transpose(1, 2)).squeeze(2)
        #[batch_size, 1, emb_size] @ [batch_size, emb_size, 1] = [batch_size, 1, 1] = [batch_size, 1]
        
        #note that coocs already got log
        loss = weighting*torch.pow(inner_product +center_bias + target_bias - coocs, 2)
        
        return torch.sum(loss)

## 5.Training

In [39]:
batch_size     = 10 # mini-batch size
embedding_size = 2 #so we can later plot
model          = GloVe(voc_size, embedding_size)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)