# 汉语子词向量任务 SGNS

本实验基于 SGNS 方法构建汉语子词向量并进行评测。

In [4]:
import math
import os
import random
import torch
from torch import nn
import matplotlib.pyplot as plt
import tqdm

首先导入 corpus 语料库，并得到句子序列 sentences，查看语料库总共有 260004 条。

In [2]:
def read_file(file_name):
    lines = []
    with open(file_name, 'r') as f: 
        for line in f:
            lines.append(line)
    return lines

In [5]:
sentences = read_file('src/corpus.txt')
sentences = sentences[:100000]
len(sentences)

100000

### 句子预处理
对 sentences 序列中的每个句子元素进行预处理，包括如下步骤。
- 除去数据中空格（包括非文本、标点等）
- 除去停用词
- 中文分词

使用这些方法可以得到处理后的 sentences。

In [6]:
def purify(line): 
    line = line.replace(' ', '')
    return line

In [7]:
import jieba
from jieba import analyse

analyse.set_stop_words('src/stopwords.txt') # 停用词表设定

for i in range(len(sentences)):
    sentences[i] = purify(sentences[i])
    sentences[i] = analyse.extract_tags(sentences[i])

sentences[0][-5:]

Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache
Loading model cost 0.685 seconds.
Prefix dict has been built successfully.


['上涨', '最后', '一样', '继续', '美国']

### 生成词表

通过对 corpus.txt 中的所有语句进行分词，得到词表。

In [8]:
words = set([]) # 词表集合

for i in range(len(sentences)):
    for j in range(len(sentences[i])):
        words.add(sentences[i][j])

words = list(words)
len(words)

132474

从中随机取出 10000 个词，最终得到词表 words。

In [10]:
import random

def get_word_pair(line):
    line = line.strip("\n")
    return line.split()
     
words = random.sample(words, 9000) 

with open('src/word_pair.txt', 'r') as f: # 加入测试词汇
    for line in f:
        worda, wordb = get_word_pair(line)
        words.append(worda)
        words.append(wordb) 
        
len(words), words[0]

(10000, '爱子')

将句子根据词表 token 取值替代，换成 one-hot 形式便于生成训练数据。

In [11]:
for sentence in tqdm.tqdm(sentences):
    for j in range(len(sentence)):
        if sentence[j] in words:
            sentence[j] = words.index(sentence[j])
        else:
            sentence[j] = -1

100%|██████████| 100000/100000 [06:58<00:00, 238.83it/s]


### 基于 SGNS 分解

跳元模型假设一个词可以用来在文本序列中生成周围的词，跳元模型的主要思想是使用 softmax 运算来计算基于给定的中心词生成上下文字的条件概率。

跳元模型的梯度计算包含求和。然而，在一个词典上求和的梯度的计算成本是很大的。为了降低计算复杂度，使用负采样技术。

#### 1 构造数据生成器
中心词和上下文词的提取。

In [13]:
def get_centers_and_contexts(sentences, window_size): # 返回跳元模型中的中心词和上下文词
    centers, contexts = [], []
    
    for line in tqdm.tqdm(sentences):
        if len(line) < 2:
            continue
            
        centers += line
        
        for i in range(len(line)):
            indices = list(range(max(0, i - window_size), min(len(line), i + 1 + window_size)))
            indices.remove(i) # 排除中心词
            
            contexts.append([line[idx] for idx in indices])
                
    return centers, contexts

使用负采样进行近似训练。

In [14]:
class RandomGenerator: # 根据 n 个采样权重在 1 到 n 中随机抽取    
    def __init__(self, sampling_weights):
        self.population = list(range(1, len(sampling_weights) + 1))
        self.sampling_weights = sampling_weights
        self.candidates = []
        self.i = 0

    def draw(self):
        if self.i == len(self.candidates):
            self.candidates = random.choices(self.population, self.sampling_weights, k=10000)
            self.i = 0
            
        self.i += 1
        
        return self.candidates[self.i - 1]

In [15]:
def get_negatives(all_contexts, words, n): # 返回负采样中的噪声词
    sampling_weights = [1 ** 0.75 for i in range(1, len(words))] # 由于词表数量有限，采取等权重方案
    all_negatives, generator = [], RandomGenerator(sampling_weights)
    
    for contexts in tqdm.tqdm(all_contexts):
        negatives = []
        
        while len(negatives) < len(contexts) * n:
            neg = generator.draw()
            if neg not in contexts: # 噪声词不能是上下文词
                negatives.append(neg)
                
        all_negatives.append(negatives)
        
    return all_negatives

在提取所有中心词及其上下文词和采样噪声词后，将它们转换成小批量的样本，在训练过程中可以迭代加载。

In [16]:
def batchify(data): # 返回带有负采样的跳元模型的小批量样本  
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        
        centers += [center]
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)] # 构建掩码
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
        
    return (torch.tensor(centers).reshape((-1, 1)), torch.tensor(contexts_negatives), 
            torch.tensor(masks), torch.tensor(labels)) 

综合以上函数，构造数据生成器，以便训练时调用。

In [17]:
class Dataset(torch.utils.data.Dataset): # 定义数据集
    def __init__(self, centers, contexts, negatives):
        assert len(centers) == len(contexts) == len(negatives)
        self.centers = centers
        self.contexts = contexts
        self.negatives = negatives

    def __getitem__(self, index):
        return (self.centers[index], self.contexts[index], self.negatives[index])

    def __len__(self):
        return len(self.centers)

In [20]:
def load_data(batch_size, window_size, num_noise_words, words, sentences): # 加载数据
    all_centers, all_contexts = get_centers_and_contexts(sentences, window_size)     
    all_negatives = get_negatives(all_contexts, words, num_noise_words)
    dataset = Dataset(all_centers, all_contexts, all_negatives)

    data_iter = torch.utils.data.DataLoader(dataset, batch_size, shuffle=True, collate_fn=batchify)
    return data_iter, words

#### 2 加载数据迭代器
设定训练批次大小，窗口 K 值，负采样个数。生成数据迭代器。

In [22]:
batch_size, window_size, num_noise_words = 256, 2, 3
data_iter, words = load_data(batch_size, window_size, num_noise_words, words, sentences)

100%|██████████| 100000/100000 [00:02<00:00, 37301.28it/s]
100%|██████████| 1474330/1474330 [00:16<00:00, 90052.05it/s]


#### 3 定义前向传播

In [37]:
def skip_gram(center, contexts_and_negatives, embed_v, embed_u):
    if (-1 not in center) and (-1 not in contexts_and_negatives):
        v = embed_v(center)
        u = embed_u(contexts_and_negatives)
        pred = torch.bmm(v, u.permute(0, 2, 1))
        return pred
    else:
        return None

#### 4 定义二元交叉熵损失函数
定义带掩码的训练损失函数。

In [38]:
class SigmoidBCELoss(nn.Module): # 带掩码的二元交叉熵损失
    def __init__(self):
        super().__init__()

    def forward(self, inputs, target, mask=None):
        out = nn.functional.binary_cross_entropy_with_logits(
            inputs, target, weight=mask, reduction="none")
        return out.mean(dim=1)

loss = SigmoidBCELoss()

#### 5 模型初始化
定义了两个嵌入层，将词表中的所有单词分别作为中心词和上下文词使用。字向量维度 embed_size 设为 200。

In [39]:
embed_size = 200
net = nn.Sequential(nn.Embedding(num_embeddings=len(words), embedding_dim=embed_size),
                    nn.Embedding(num_embeddings=len(words), embedding_dim=embed_size))

#### 6 训练

In [44]:
def train(net, data_iter, lr, num_epochs):
    def init_weights(m): # 初始词向量
        if type(m) == nn.Embedding:
            nn.init.xavier_uniform_(m.weight)

    net.apply(init_weights)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)

    for epoch in tqdm.tqdm(range(num_epochs)):
        for i, batch in enumerate(data_iter):
            optimizer.zero_grad()
            center, context_negative, mask, label = [data for data in batch]
            pred = skip_gram(center, context_negative, net[0], net[1]) # 前向预测
            
            if pred != None:
                l = (loss(pred.reshape(label.shape).float(), label.float(), mask)
                         / mask.sum(axis=1) * mask.shape[1])

                l.sum().backward() # 反向传播
                optimizer.step()

    return net[0].weight.data

调参学习率为 0.002，训练轮数为5。

In [46]:
lr, num_epochs = 0.002, 5
V = train(net, data_iter, lr, num_epochs)
V.shape

100%|██████████| 5/5 [00:43<00:00,  8.69s/it]


torch.Size([10000, 200])

#### 相似度验证 SGNS 分解
与 SVD 实现相同。

In [47]:
def cos_sim(worda, wordb, V): # 计算两个词的余弦相似度
    if (worda in words) and (wordb in words):
        a = words.index(worda)
        b = words.index(wordb)
        cos = np.dot(V[a], V[b]) / (np.linalg.norm(V[a]) * np.linalg.norm(V[b]))
        return cos
    else:
        return 0

In [50]:
import numpy as np

lines = ''

with open('src/test.txt', 'r') as f: 
    for line in f:
        worda, wordb, svd_sim = get_word_pair(line)
        line = line[:-1] + '\t' + str(cos_sim(worda, wordb, V)) + '\n'
        lines += line
    
with open('src/test.txt', 'w') as f: 
    f.write(lines)