# Homework 2: Train A Skip-Gram model with negative sampling without using Word2Vec library.

## Author: Ou, Dongwen; Student ID: 15220212202866

### The HW2 is divided into 6 parts:

* 1. Data preprocessing(Tokenization)
  2. Skip-Gram pair data preparation
  3. Negative Sampling data generation
  4. Model definition using Pytorch with negative sampling MLE loss function
  5. Using SGD to train the model and some hyperparameter settings
  6. Check the results(output embeddings)

In [83]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import Counter
from typing import List

########################################
# 1. 数据准备 & 预处理
########################################

with open("/Users/dongwenou/HW2.txt", "r", encoding="utf-8") as f:
    mytext = f.read()


# 将文本切分成词序列
# 简单按空格拆分，并转小写
# text: str	说明参数 text 应该是一个 str（字符串）类型
# -> List[str]	表示函数的返回值是一个 List[str]，即“字符串列表”
# List[str]	表示列表中每个元素都是 str 类型（需要 from typing import List）
def tokenize_corpus(text: str) -> List[str]:    
    return text.lower().split()

tokens = tokenize_corpus(mytext)
print("Tokens:", tokens)

# 构建词表(去重) Vocabulary
# 使用 collections.Counter 类来统计 tokens 中每个单词的出现次数。
# word_counts 是一个类似字典的对象，记录了词频
# 从 word_counts 中提取所有词（即字典的 key），并排序，生成词汇表。
# vocab 是一个列表，里面是按字典序排序的词：
word_counts = Counter(tokens)
vocab = sorted(word_counts.keys())
word2idx = {w: i for i, w in enumerate(vocab)} #0开始
idx2word = {i: w for w, i in word2idx.items()} #items 返回key+value

vocab_size = len(vocab)
print("Vocab Size:", vocab_size, vocab)

Tokens: ['it', 'is', 'a', 'sunny', 'day.', 'anna', 'wakes', 'up', 'early.', 'she', 'eats', 'her', 'breakfast.', 'she', 'drinks', 'a', 'glass', 'of', 'milk.', 'then', 'she', 'puts', 'on', 'her', 'shoes.', 'anna', 'is', 'happy', 'today.', 'anna', 'goes', 'to', 'the', 'park', 'with', 'her', 'dog.', 'her', 'dog', 'is', 'small.', 'her', 'dog', 'is', 'brown.', 'his', 'name', 'is', 'max.', 'max', 'is', 'a', 'happy', 'dog.', 'max', 'likes', 'to', 'run.', 'in', 'the', 'park,', 'there', 'are', 'many', 'trees.', 'the', 'trees', 'are', 'tall.', 'the', 'leaves', 'are', 'green.', 'birds', 'sing', 'in', 'the', 'trees.', 'the', 'sun', 'shines.', 'the', 'sky', 'is', 'blue.', 'it', 'is', 'a', 'nice', 'day.', 'anna', 'sees', 'her', 'friend.', 'her', 'friend', 'is', 'tom.', 'tom', 'has', 'a', 'bike.', 'the', 'bike', 'is', 'red.', 'tom', 'rides', 'his', 'bike', 'fast.', 'anna', 'waves', 'to', 'tom.', 'tom', 'waves', 'back.', 'anna', 'and', 'tom', 'sit', 'on', 'a', 'bench.', 'they', 'eat', 'snacks.', 'anna'

In [58]:
########################################
# 2. 生成 Skip-Gram 训练样本
########################################

# 在 skip-gram 模型中，我们对每个目标词 w_c (center word)
# 从其上下文窗口内采样到 w_o (outside words) 来组成 (w_c, w_o)
# 这里设置一个小的窗口大小 window_size=2 仅作示例

def generate_skip_gram_pairs(tokens, window_size=5):
    pairs = []
    for i, center_word in enumerate(tokens):
        center_idx = word2idx[center_word]
        # 在中心词左右各 window_size 范围内找上下文词
        start = max(0, i - window_size)
        end = min(len(tokens), i + window_size + 1)
        for j in range(start, end):
            if j != i:
                outside_idx = word2idx[tokens[j]]
                pairs.append((center_idx, outside_idx))
    return pairs

skip_gram_pairs = generate_skip_gram_pairs(tokens, window_size=5)
print("Number of skip-gram pairs:", len(skip_gram_pairs))
print("Sample pairs (center_idx, outside_idx):", skip_gram_pairs[:10])

Number of skip-gram pairs: 10270
Sample pairs (center_idx, outside_idx): [(148, 147), (148, 0), (148, 285), (148, 67), (148, 8), (147, 148), (147, 0), (147, 285), (147, 67), (147, 8)]


In [60]:
########################################
# 3. 负采样函数
########################################

# 负采样: 对于每个 (center, outside) 正样本, 随机抽若干个负样本(词)
# 这里使用词频分布近似, 并且简单示范: 负样本数量 negative_k 可调
# 在实际应用中, 通常会构建一个更高效的采样表(如使用unigram^3/4等)

word_freqs = np.array([word_counts[w] for w in vocab], dtype=np.float32)
word_freqs = word_freqs / word_freqs.sum()  # 归一化为分布
# 这里从分布的概率来抽取，比uniform抽全部高效些

def negative_sampling(center_idx, outside_idx, negative_k=10):
    # 返回 size=negative_k 的负样本索引列表
    neg_samples = []
    for _ in range(negative_k):
        # 多次采样一个负样本, 与 outside_idx 不同即可
        neg = np.random.choice(range(vocab_size), p=word_freqs)
        while neg == outside_idx:
            neg = np.random.choice(range(vocab_size), p=word_freqs) #抽到window内的词就进入这个无限循环until抽到不在的
        neg_samples.append(neg)
    return neg_samples



所以负样本部分的 loss 是：
$$
\text{neg loss} = -\sum{j=1}^k \log \sigma\left( -v_{w_c}^\top v_{w_j^-} \right)
$$
其中：
* $v_{w_c}$：中心词的向量
* $v_{w_j^-}$：第 j 个负样本词的向量
* $\sigma(\cdot)$：sigmoid 函数

bmm 是 batch matrix multiplication → 每个 batch 执行：
$$
\text{neg\_score}_i = \text{neg\_embed}_i \cdot \text{center}_i^\top
$$

In [63]:
########################################
# 4. 模型定义: SkipGramNegativeSamplingModel
########################################

class SkipGramNegativeSamplingModel(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super(SkipGramNegativeSamplingModel, self).__init__()
        # 输入词向量表
        self.input_embeddings = nn.Embedding(vocab_size, embed_dim)
        # 输出词向量表 (可视为 "context" embeddings)
        self.output_embeddings = nn.Embedding(vocab_size, embed_dim)
        
        # 初始化: 
        nn.init.uniform_(self.input_embeddings.weight, -0.5, 0.5) 
        nn.init.uniform_(self.output_embeddings.weight, -0.5, 0.5)
        # 把 input_embeddings 这个矩阵里的所有数，全部设为 -0.5 到 +0.5 之间的随机数。

    def forward(self, center_idx, outside_idx, neg_indices):
        """
        center_idx: (batch_size,)  长度为 batch_size 的中心词索引
        outside_idx: (batch_size,) 长度为 batch_size 的正样本(上下文)词索引
        neg_indices: (batch_size, negative_k)  负样本词索引
        """
        # 1) 取出中心词向量: shape=(batch_size, embed_dim)
        center_embed = self.input_embeddings(center_idx)
        
        # 2) 取出正样本的输出向量: shape=(batch_size, embed_dim)
        outside_embed = self.output_embeddings(outside_idx)
        
        # 3) 负样本的输出向量: shape=(batch_size, negative_k, embed_dim)
        neg_embed = self.output_embeddings(neg_indices)
        
        # 计算正样本的 log-sigma(center_embed dot outside_embed)
        # 先做逐元素乘法后再求和得到内积: shape=(batch_size,)
        pos_score = torch.sum(center_embed * outside_embed, dim=1)
        # log-sigma
        pos_loss = -torch.log(torch.sigmoid(pos_score) + 1e-9)
        
        # 计算负样本的 log-sigma(- center_embed dot neg_embed)
        # 先对 (batch_size, negative_k, embed_dim) 与 (batch_size,1,embed_dim) 做广播
        center_embed_3d = center_embed.unsqueeze(1)  # 在指定位置插入维度 shape=(batch_size,1,embed_dim) 
        neg_score = torch.bmm(neg_embed, center_embed_3d.transpose(1,2)).squeeze() # 第1，2dim transpose：shape=(batch_size,embed_dim,1)
        # batch matrix multiplication: (batch_size, negative_k, embed_dim) * (batch_size,embed_dim,1)
        # squeeze() 会移除 tensor 中 所有 维度为 1 的“伪维度”
        # neg_score shape = (batch_size, negative_k)
        neg_loss = -torch.sum(torch.log(torch.sigmoid(-neg_score) + 1e-9), dim=1)
        
        # 总损失 = pos_loss + neg_loss, 取平均
        loss = pos_loss + neg_loss
        return loss.mean() #对batch取mean



In [72]:
########################################
# 5. 训练循环
########################################

embed_dim = 250       # 词向量维度(示例用小值)
batch_size = 128      # 小批量示例
negative_k = 10      # 每个正样本对应的负样本数
num_epochs = 100    # 训练轮数
learning_rate = 0.01

model = SkipGramNegativeSamplingModel(vocab_size, embed_dim)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

skip_gram_dataset = skip_gram_pairs  # (center, outside)

# 简单的随机打乱
random.shuffle(skip_gram_dataset)

for epoch in range(num_epochs):
    total_loss = 0.0
    # 小批量迭代
    for i in range(0, len(skip_gram_dataset), batch_size): # 按batch_size前进
        batch = skip_gram_dataset[i:i+batch_size] # 会自动截断如果最后个batch会超出len一些
        center_idxs = []
        outside_idxs = []
        neg_idxs = []
        
        for (c_idx, o_idx) in batch: # 对batch中的每一对pair迭代
            center_idxs.append(c_idx)
            outside_idxs.append(o_idx)
            # 采样负样本
            neg_samples = negative_sampling(c_idx, o_idx, negative_k)
            neg_idxs.append(neg_samples)
        
        # 转成张量
        center_idxs = torch.LongTensor(center_idxs)
        outside_idxs = torch.LongTensor(outside_idxs)
        neg_idxs = torch.LongTensor(neg_idxs)
        
        optimizer.zero_grad() #清除旧梯度
        loss = model(center_idxs, outside_idxs, neg_idxs)
        loss.backward()
        optimizer.step() # 用梯度更新模型参数
        
        total_loss += loss.item()
    
    avg_loss = total_loss / (len(skip_gram_dataset)//batch_size + 1)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss = {avg_loss:.4f}")



Epoch 1/100, Loss = 9.4647
Epoch 2/100, Loss = 9.2849
Epoch 3/100, Loss = 9.1034
Epoch 4/100, Loss = 9.0104
Epoch 5/100, Loss = 8.8474
Epoch 6/100, Loss = 8.7979
Epoch 7/100, Loss = 8.7205
Epoch 8/100, Loss = 8.6335
Epoch 9/100, Loss = 8.5581
Epoch 10/100, Loss = 8.4222
Epoch 11/100, Loss = 8.4057
Epoch 12/100, Loss = 8.3247
Epoch 13/100, Loss = 8.2305
Epoch 14/100, Loss = 8.1617
Epoch 15/100, Loss = 8.1350
Epoch 16/100, Loss = 8.0765
Epoch 17/100, Loss = 8.0238
Epoch 18/100, Loss = 7.9622
Epoch 19/100, Loss = 7.9140
Epoch 20/100, Loss = 7.9021
Epoch 21/100, Loss = 7.8145
Epoch 22/100, Loss = 7.7469
Epoch 23/100, Loss = 7.6907
Epoch 24/100, Loss = 7.6938
Epoch 25/100, Loss = 7.6156
Epoch 26/100, Loss = 7.5590
Epoch 27/100, Loss = 7.5623
Epoch 28/100, Loss = 7.5066
Epoch 29/100, Loss = 7.4785
Epoch 30/100, Loss = 7.4452
Epoch 31/100, Loss = 7.3737
Epoch 32/100, Loss = 7.3637
Epoch 33/100, Loss = 7.2875
Epoch 34/100, Loss = 7.2626
Epoch 35/100, Loss = 7.2662
Epoch 36/100, Loss = 7.1876
E

In [77]:
########################################
# 6. 结果查看: Embeddings
########################################

# 最终的词向量(输入embedding)
word_embeddings = model.input_embeddings.weight.data.cpu().numpy()

print("\nSample word embeddings:")
for i, w in enumerate(vocab[:10]):
    print(w, word_embeddings[i])
    print()


Sample word embeddings:
a [ 0.12500648 -0.24841152  0.11608797 -0.43430963  0.1264059  -0.16214198
  0.03620679 -0.400952   -0.2495335  -0.09767942  0.27385178  0.16979522
 -0.24268594  0.34422678 -0.37057665  0.17289004  0.0265836  -0.07478195
 -0.193217   -0.07839001  0.02435297  0.34743235  0.07988136  0.18423913
 -0.43022582  0.05761949 -0.32126608 -0.11741664  0.30754644 -0.02126164
 -0.5402174   0.0211986   0.22809786 -0.00406621 -0.48834205  0.22167626
  0.5146758  -0.36203519 -0.12522145 -0.07781565  0.6099289  -0.43707013
  0.43391475 -0.5692686   0.05781138 -0.20382635 -0.23156177 -0.09643977
 -0.12604962  0.56816494  0.10666183 -0.35973626  0.30222863 -0.43313092
  0.0391289  -0.29154727  0.10652043 -0.09386367  0.21546584  0.19213304
  0.1583878  -0.00696401  0.10511545  0.27266312 -0.57757187 -0.19456562
  0.59627664  0.43019283  0.07762172  0.1327848  -0.03823346  0.04018035
 -0.3422041  -0.04510762 -0.14435397  0.30756602  0.14534819  0.21165152
 -0.43293792  0.4421645 

* From the above output, we can see the loss function value is dropping gradually with our SGD technique.

* However, it may not be very good, as the text I used to train the model is just several thousand words. In order to improve the model performance, more words and topics should be covered.  

In [30]:
array_b = range(0,23)
print(array_b[5])
for i in range(0, 23, 5):
    a = array_b[i:i+5]
    print(a)
# python list would automatically cut the out-lier.

5
range(0, 5)
range(5, 10)
range(10, 15)
range(15, 20)
range(20, 23)
