In [13]:
import numpy as np
import collections

# 假设Sigmoid和损失函数
class SigmoidWithLoss:
    def forward(self, y, t):
        self.y = y
        self.t = t
        
        # 使用 np.clip 限制 y 的范围，避免出现 log(0) 的情况
        y = np.clip(y, 1e-7, 1 - 1e-7)
        
        # 计算交叉熵损失
        loss = -np.sum(t * np.log(y) + (1 - t) * np.log(1 - y))
        return loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]
        dx = (self.y - self.t) * dout / batch_size
        return dx


# Embedding层，负责从词汇表中提取词嵌入
class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.cache = None

    def forward(self, idx):
        W = self.params[0]
        out = W[idx]
        self.cache = idx
        print('out===\n',out)
        return out

    def backward(self, dout):
        idx = self.cache
        self.grads[0][idx] += dout
        return None


# 计算词与词之间的内积
class EmbeddingDot:
    def __init__(self, W):
        self.embed = Embedding(W)
        self.params = self.embed.params
        self.grads = self.embed.grads
        self.cache = None

    def forward(self, h, idx):
        target_W = self.embed.forward(idx)
        print('target_W===\n',target_W)
        out = np.sum(target_W * h, axis=1)
        self.cache = (h, target_W)
        return out

    def backward(self, dout):
        h, target_W = self.cache
        dout = dout.reshape(dout.shape[0], 1)

        dtarget_W = dout * h
        self.embed.backward(dtarget_W)
        dh = dout * target_W
        return dh


# 负采样采样器
class UnigramSampler:
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size
        self.vocab_size = None
        self.word_p = None

        counts = collections.Counter()
        for word_id in corpus:
            counts[word_id] += 1

        vocab_size = len(counts)
        self.vocab_size = vocab_size

        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i]

        self.word_p = np.power(self.word_p, power)
        self.word_p /= np.sum(self.word_p)

    def get_negative_sample(self, target):
        batch_size = target.shape[0]
        negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)

        for i in range(batch_size):
            p = self.word_p.copy()
            target_idx = target[i]
            p[target_idx] = 0
            p /= p.sum()
            negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)

        return negative_sample


# 负采样损失类
class NegativeSamplingLoss:
    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)]

        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        # 正例的正向传播
        score = self.embed_dot_layers[0].forward(h, target)
        correct_label = np.ones(batch_size, dtype=np.int32)
        loss = self.loss_layers[0].forward(score, correct_label)

        # 负例的正向传播
        negative_label = np.zeros(batch_size, dtype=np.int32)
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)
            loss += self.loss_layers[1 + i].forward(score, negative_label)

        return loss

    def backward(self, dout=1):
        dh = 0
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = l0.backward(dout)
            dh += l1.backward(dscore)

        return dh

import random
# 固定种子
seed = 42
# 设置random模块的种子
np.random.seed(seed)

# 小词汇表示例数据：5个词和一些语料
vocab = ["dog", "cat", "man", "woman", "apple"]
vocab_size = len(vocab)
W = np.random.randn(vocab_size, 2)  # 假设我们用一个 5×2 的词向量矩阵
print('W===\n',W)
corpus = [0, 1, 2, 3, 4, 0, 1, 3, 2]  # 对应词汇表 ["dog", "cat", "man", "woman", "apple"]

# 输入：上下文词的隐藏层表示，目标词索引
h = np.array([[0.1, 0.2], [0.3, 0.4]])  # 假设上下文词的嵌入是 2×2 的矩阵
target = np.array([0, 2])  # "dog" 和 "man" 作为目标词

# 创建负采样损失对象并计算损失
negative_sampling_loss = NegativeSamplingLoss(W, corpus, sample_size=2)
loss = negative_sampling_loss.forward(h, target)

# 输出总损失
print(f"Total Loss: {loss}")

# 进行反向传播
dh = negative_sampling_loss.backward()
print(f"Gradients for hidden layer (dh): {dh}")
print(f"Gradients for W: {negative_sampling_loss.grads[0]}")


W===
 [[ 0.49671415 -0.1382643 ]
 [ 0.64768854  1.52302986]
 [-0.23415337 -0.23413696]
 [ 1.57921282  0.76743473]
 [-0.46947439  0.54256004]]
out===
 [[ 0.49671415 -0.1382643 ]
 [-0.23415337 -0.23413696]]
target_W===
 [[ 0.49671415 -0.1382643 ]
 [-0.23415337 -0.23413696]]
out===
 [[0.64768854 1.52302986]
 [0.64768854 1.52302986]]
target_W===
 [[0.64768854 1.52302986]
 [0.64768854 1.52302986]]
out===
 [[-0.23415337 -0.23413696]
 [ 0.49671415 -0.1382643 ]]
target_W===
 [[-0.23415337 -0.23413696]
 [ 0.49671415 -0.1382643 ]]
Total Loss: 22.120590459275714
Gradients for hidden layer (dh): [[-0.11504491  0.35711761]
 [ 0.41975369  0.74166916]]
Gradients for W: [[-0.04889907 -0.09779814]
 [ 0.          0.        ]
 [-0.17458512 -0.23278016]
 [ 0.          0.        ]
 [ 0.          0.        ]]
