
以下是双仿射函数的代码实现。

In [6]:
# 代码来源于GitHub项目Alibaba-NLP/ACE
# （Copyright (c) 2020, Xinyu Wang, MIT License（见附录））
import torch
import torch.nn as nn

class Biaffine(nn.Module):
    def __init__(self, n_in, n_out=1, bias_x=True, bias_y=True,\
                diagonal=False):
        super(Biaffine, self).__init__()
        # n_in：输入特征大小
        # n_out：输出的打分数量（边预测为1，标签预测即为标签数量）
        # bias_x：为输入x加入线性层
        # bias_y：为输入y加入线性层
        self.n_in = n_in
        self.n_out = n_out
        self.bias_x = bias_x
        self.bias_y = bias_y
        self.diagonal = diagonal
        # 对角线化参数，让原本的参数矩阵变成了对角线矩阵，
        # 从而大幅度减少运算复杂度，一般在计算标签的得分时会使用
        if self.diagonal:
            self.weight = nn.Parameter(torch.Tensor(n_out,
                                                    n_in + bias_x))
        else:
            self.weight = nn.Parameter(torch.Tensor(n_out,
                                                    n_in + bias_x,
                                                    n_in + bias_y))
        self.reset_parameters()


    def reset_parameters(self):
        nn.init.normal_(self.weight)

    def forward(self, x, y):
        # 当bias_x或bias_y为True时，为输入x或y的向量拼接额外的1
        if self.bias_x:
            x = torch.cat((x, torch.ones_like(x[..., :1])), -1)
        if self.bias_y:
            y = torch.cat((y, torch.ones_like(y[..., :1])), -1)

        # PyTorch中的einsum可以很简单地实现矩阵运算
        # 思路是为输入的张量的每个维度分别定义一个符号
        # （例如输入x、y的第一维是批大小，定义为b）
        # 并且定义输出的张量大小，这个函数会自动地根据
        # 前后的变化计算张量乘法、求和的过程
        # 例如下面的einsum函数中的bxi,byi,oi->boxy，
        # 表示的是输入的3个张量大小分别为b * x * i、b * y * i和o * i
        # 输出则是b * o * x * y
        # 根据这个式子，可以看出3个张量都有i这个维度，
        # 在输出时被消除了
        # 因此3个张量的i维通过张量乘法（三者按位相乘然后求和）
        # 进行消除
        # 这个算法的好处是相比于手动实现，
        # einsum可以更容易地避免运算过程中出现很大的张量大幅占用显存
        # 同时也避免了手动实现的流程
        # 具体使用方法请参考PyTorch文档
        if self.diagonal:
            s = torch.einsum('bxi,byi,oi->boxy', x, y, self.weight)
        else:
            s = torch.einsum('bxi,oij,byj->boxy', x, self.weight, y)
        # 当n_out=1时，将第一维移除
        s = s.squeeze(1)

        return s

现在我们举一个简单的例子来看双仿射函数是如何进行打分的：

In [9]:
biaffine = Biaffine(4)
# 假设批大小为1，句子长度为2，词数量为4
x=torch.randn(1,2,4)
y=torch.randn(1,2,4)
scores = biaffine(x,y)
print(scores)

tensor([[[0.7518, 4.9461],
         [2.2237, 1.5331]]], grad_fn=<SqueezeBackward1>)


下面给出具体的CKY算法实现。

In [3]:
# 代码来源于GitHub项目yzhangcs/crfpar
# （Copyright (c) 2020, Yu Zhang, MIT License（见附录））
import torch

def cky(scores, mask):
    '''
    scores：大小为批大小 * 序列长度 * 序列长度，
    每个位置表示跨度的打分，例如scores[0,1,2]就表示
    第0个输入样例上跨度(1,2)的打分（跨度(i,j)不包括j位置，
    因此跨度(1,2)即对应的只有1个词的跨度的打分）。
    
    mask：与scores的大小一样，表示对打分的掩码，
    根据句子长度的不同，掩码大小不同。假设句子0长度为5，
    那么mask[0,:5,:5]中所有值都为1，mask[0]上的其余位置为0
    '''
    # 通过对掩码求和的方式找到每个句子的长度
    lens = mask[:, 0].sum(-1).long()
    # 批大小 * 序列长度 * 序列长度 -> 序列长度 * 序列长度 * 批大小，
    # 方便后续运算
    scores = scores.permute(1, 2, 0)
    seq_len, seq_len, batch_size = scores.shape
    
    # 复制一个新的与scores大小相同的张量，其中s表示新计算出的跨度得分，
    # p表示得分最高的位置（也就是max k），位置信息用long()表示
    s = scores.new_zeros(seq_len, seq_len, batch_size)
    p = scores.new_zeros(seq_len, seq_len, batch_size).long()
    
    # 设w为跨度，从小的跨度到大跨度进行遍历
    for w in range(1, seq_len):
        # 通过seq_len - w可以计算出当前长度有多少长度为w的跨度
        n = seq_len - w
        # 根据n生成0到n的列表
        starts = p.new_tensor(range(n)).unsqueeze(0)
        
        # 当跨度w为1的时候，没有中间值k，
        # 直接将结果赋值到s中作为max score
        if w == 1:
            # diagonal(w)表示抽取对角线，w的大小为偏置，
            # 当偏置为0时，则直接为对角线，
            # 当偏置大于0，则对角线上移（也就是(0,1),(1,2),……）
            # 具体细节请查看PyTorch文档
            s.diagonal(w).copy_(scores.diagonal(w))
            continue

        # 计算跨度为w情况下，s_best(i,k)+s_best(k,j)的值，
        # strip()函数下面会介绍，
        # 它每次取出大小为n * w-1 * batch_size的矩阵
        s_span = stripe(s, n, w-1, (0, 1)) + stripe(s, n, w-1, (1, w), 0)
        # n * w-1 * batch_size -> batch_size * n * w-1
        s_span = s_span.permute(2, 0, 1)
        # 计算max(s_best(i,k)+s_best(k,j))，以及对应的k值
        s_span, p_span = s_span.max(-1)
        # 更新s_best(i,j) = s(i,j)+max(s_best(i,k)+s_best(k,j))
        s.diagonal(w).copy_(s_span + scores.diagonal(w))
        # 保留最大的k值，由于p_span并不对应在原来的矩阵中的位置，
        # 因此需要加上starts+1来还原
        p.diagonal(w).copy_(p_span + starts + 1)

    def backtrack(p, i, j):
        # 通过分治法找到之前所有得分最大的span
        if j == i + 1:
            return [(i, j)]
        split = p[i][j]
        ltree = backtrack(p, i, split)
        rtree = backtrack(p, split, j)
        return [(i, j)] + ltree + rtree
    
    p = p.permute(2, 0, 1).tolist()
    # 从最大的跨度(0,length)开始，逐渐找到中间最大的k值，还原整个成分
    trees = [backtrack(p[i], 0, length)
             for i, length in enumerate(lens.tolist())]

    return trees



def stripe(x, n, w, offset=(0, 0), dim=1):
    r'''Returns a diagonal stripe of the tensor.
    Parameters:
        x：输入的超过2维的张量
        n：输出的斜对角矩阵的长度
        w：输出的斜对角矩阵的宽度
        offset：前两个维度的偏置
        dim：当其为0则抽取纵向斜对角矩阵；1则是横向斜对角矩阵
    例子：
    >>> x = torch.arange(25).view(5, 5)
    >>> x
    tensor([[ 0,  1,  2,  3,  4],
            [ 5,  6,  7,  8,  9],
            [10, 11, 12, 13, 14],
            [15, 16, 17, 18, 19],
            [20, 21, 22, 23, 24]])
    >>> n = 2
    >>> w = 3
    >>> stripe(x, n, w-1, (0, 1))
    tensor([[ 1,  2],
            [ 7,  8]])
    >>> stripe(x, n, w-1, (1, w) dim=0)
    tensor([[  8,  13],
            [ 14,  19]])
    可以看出，当跨度长度为3时，
    两个矩阵的第一行分别表示跨度为：
    [(0,1),(0,2)]和[(1,3),(2,3)]，
    可以看出枚举了对于跨度为(0,3)有两种跨度组合：
    s(0,1)+s(1,3)
    s(0,2)+s(2,3)
    '''
    x, seq_len = x.contiguous(), x.size(1)
    # 根据x的形状创建步长列表，numel为批大小
    stride, numel = list(x.stride()), x[0, 0].numel()
    # 设置行和列的步长，假设当前位置为(i,j)，
    # stride[0]会取出(i+1,j+1)的值，作为输出矩阵的下一行值
    stride[0] = (seq_len + 1) * numel
    # 假设当前位置为(i,j)，stride[1]会取出(i+1,j)的值，作为下一列的值
    stride[1] = (1 if dim == 1 else seq_len) * numel
    return x.as_strided(size=(n, w, *x.shape[2:]), stride=stride,
        storage_offset=(offset[0]*seq_len+offset[1])*numel)


给定一个句子“learning probalistic grammar is difficult”，为每个跨度打分（简单起见，这里没有考虑标签），并调用CKY算法得到句法树：

In [4]:
# 句子“learning probalistic grammar is difficult”一共有5个词，
# 额外加上根节点后，score的张量大小为1 * 6 * 6
# score是一个上三角矩阵，其余部分用-999代替

score = torch.Tensor([
        [ -999,  1,  -1,  1,  -1, 1],
        [ -999,  -999,  1,  1,  -1, -1],
        [ -999, -999, -999, 1, -1, -1],
        [ -999, -999, -999, -999, 1, 1],
        [ -999, -999, -999, -999, -999, 1],
        [ -999, -999, -999, -999, -999, -999]]
    ).unsqueeze(0)

# mask应该是一个上三角矩阵
mask = torch.ones_like(score)
mask = torch.triu(mask,diagonal=1)

print(mask)

trees=cky(score,mask)
print(trees)

tensor([[[0., 1., 1., 1., 1., 1.],
         [0., 0., 1., 1., 1., 1.],
         [0., 0., 0., 1., 1., 1.],
         [0., 0., 0., 0., 1., 1.],
         [0., 0., 0., 0., 0., 1.],
         [0., 0., 0., 0., 0., 0.]]])
[[(0, 5), (0, 3), (0, 1), (1, 3), (1, 2), (2, 3), (3, 5), (3, 4), (4, 5)]]


现在画出这个成分句法树。这里使用supar代码包来画成分句法树。supar是一个开源且好用的句法、成分、语义分析工具包。我们没有为标签进行打分，因此这里使用空标签。

In [9]:
from supar.models.const.crf.transform import Tree
draw_tree=[(i,j,'|') for i,j in trees[0]]
Tree.build(['learning', 'probalistic', 'grammar', 'is',\
    'difficult'],draw_tree,root='TOP').pretty_print()


                           TOP                    
                            |                      
                            |                     
               _____________|_________             
              |                       |           
    __________|_______                |            
   |                  |               |           
   |           _______|_____       ___|______      
   |          |             |     |          |    
   |          |             |     |          |     
   _          _             _     _          _    
   |          |             |     |          |     
learning probalistic     grammar  is     difficult



内向算法的实现也和CKY算法类似。下面是具体的实现代码。

In [15]:
# 代码来源于GitHub项目yzhangcs/crfpar
# （Copyright (c) 2020 Yu Zhang, MIT License（见附录））
def inside(scores, mask):
    # 大部分内容与cky()函数相同，相同部分不再复述
    batch_size, seq_len, _ = scores.shape
    scores, mask = scores.permute(1, 2, 0), mask.permute(1, 2, 0)
    # 我们会对score求exponential，因此
    s = torch.full_like(scores, float('-inf'))

    for w in range(1, seq_len):
        n = seq_len - w
        if w == 1:
            s.diagonal(w).copy_(scores.diagonal(w))
            continue
        s_span = stripe(s, n, w-1, (0, 1)) + stripe(s, n, w-1, (1, w), 0)
        s_span = s_span.permute(2, 0, 1)
        # 防止数据出现nan
        if s_span.requires_grad:
            s_span.register_hook(lambda x: \
                x.masked_fill_(torch.isnan(x), 0))
        # 这里使用PyTorch中的logsumexp()函数实现指数求和的过程，
        # logsumexp()可以有效防止exp后数据溢出的情况
        s_span = s_span.logsumexp(-1)
        s.diagonal(w).copy_(s_span + scores.diagonal(w))

    return s



下面是计算损失函数的过程。首先计算人工标注句法树的得分，也就是将该树的所有跨度得分求和，然后用内向算法得出的所有可能的树的得分取幂之和，最终计算损失函数值。

In [45]:
# 代码来源于GitHub项目yzhangcs/crfpar
# （Copyright (c) 2020 Yu Zhang, MIT License（见附录））
@torch.enable_grad()
def crf(scores, mask, target=None):
    lens = mask[:, 0].sum(-1).long()
    total = lens.sum()
    batch_size, seq_len, _ = scores.shape
    # 训练过程中需要保证scores能够返回梯度
    training = scores.requires_grad
    # 计算内向算法，求得可能的树的得分和
    s = inside(scores.requires_grad_(), mask)
    logZ = s[0].gather(0, lens.unsqueeze(0)).sum()
    # (scores * target * mask).sum()可以求出目标树的得分和，
    # 与logZ相减后求平均损失
    loss = (logZ - (scores * target * mask).sum()) / total
    return loss,


接下来计算以下实际的损失函数：

In [46]:
# score是一个上三角矩阵，其余部分用-999代替

score = torch.Tensor([
    [ -999,  1,  -1,  1,  -1, 1],
    [ -999,  -999,  1,  1,  -1, -1],
    [ -999, -999, -999, 1, -1, -1],
    [ -999, -999, -999, -999, 1, 1],
    [ -999, -999, -999, -999, -999, 1],
    [ -999, -999, -999, -999, -999, -999]]).unsqueeze(0)

# mask应该是一个上三角矩阵
mask = torch.ones_like(score)
mask = torch.triu(mask,diagonal=1).long()

# 在本例子中，假设score预测树与训练的目标target一样
target = torch.Tensor([
    [ 0,  1,  0,  1,  0, 1],
    [ 0,  0,  1,  1,  0, 0],
    [ 0, 0, 0, 1, 0, 0],
    [ 0, 0, 0, 0, 1, 1],
    [ 0, 0, 0, 0, 0, 1],
    [ 0, 0, 0, 0, 0, 0]]).unsqueeze(0)


print('目标矩阵：',target)

s=inside(score,mask)
logZ = s[0].gather(0, lens.unsqueeze(0)).sum()

print('内部函数计算结果(即所有可能的成分树分数和)：',logZ)

loss = crf(score,mask,target)
print('loss：',loss)

目标矩阵： tensor([[[0, 1, 0, 1, 0, 1],
         [0, 0, 1, 1, 0, 0],
         [0, 0, 0, 1, 0, 0],
         [0, 0, 0, 0, 1, 1],
         [0, 0, 0, 0, 0, 1],
         [0, 0, 0, 0, 0, 0]]])
内部函数计算结果(即所有可能的成分树分数和)： tensor(9.4121)
loss： (tensor(0.0824, grad_fn=<DivBackward0>),)


<!--#### 推理代码实现-->

下面我们提供一套代码来展示在基于转移的句法分析过程中，如何根据转移动作的打分去对栈和缓存进行操作。如果读者想要运行该代码，需自行定义model。

In [1]:
# 部分代码参考了GitHub项目kmkurn/pytorch-rnng 
# (Copyright 2017 Kemal Kurniawan, MIT License（见附录）)
# 假设当模型预测的SHIFT 操作id为0，REDUCE操作id为1，
# 非终极符的标签为其他大于1的值
SHIFT_ID=0
REDUCE_ID=1

# 这里假设只有3个标签
label_set = {2: 'S', 3: 'NP', 4: 'VP'}

class element:
    # word_id：一个存储成分结构的列表，例如(S (NP 1 2) 3)
    # 这棵树表示为[S [NP 1 2] 3]
    def __init__(self,word_id,is_open_nt):
        self.is_open_nt=is_open_nt
        if self.is_open_nt:
            self.word_id=[word_id]
        else:
            self.word_id=word_id


def decode(words,model):
    # words：每个元素为(word_idx, word_id)的元组，
    # word_idx为句子中的位置，word_id则为词表中的id
    # model：这里不具体构建模型，仅作为一个示例
    # 缓存buffer初始化，将words翻转，能够保证pop()操作能够从前往后进行
    buffer = words[::-1]
    # 栈stack初始化
    stack = []
    # 保存操作历史
    history = []
    # 统计当前栈中开放的非终极符数量
    num_open_nt = 0
    # 循环转移迭代
    while 1:
        # 模型通过buffer，stack和history计算下一步操作的打分
        log_probs = model(buffer,stack,history)
        # 得到得分最高的操作id
        action_id = torch.max(log_probs)[1]
        # 当action_id分别为0,1和大于1时，
        # 分别为其做SHIFT，REDUCE和push_nt操作
        if action_id == SHIFT_ID:
            buffer,stack = shift(buffer,stack)
        elif action_id == REDUCE_ID:
            stack = reduce(buffer,stack)
            num_open_nt -= 1
        else:
            stack = push_nt(stack,action_id)
            num_open_nt += 1
        # 将当前操作记录到历史中
        history.append(action_id)
        # 当缓存为空，栈只有一个元素且它不是开放的非终极符时，则退出
        if num_open_nt == 0 and len(buffer) == 0 and\
                len(stack) == 1 and stack[0].is_open_nt == False:
            break
    # 返回操作历史和整棵树
    return history, stack[0]

def shift(buffer,stack):
    # 将buffer中的词移动到栈顶
    word_id=buffer.pop()
    stack.append(element(word_id,False))
    return buffer, stack 

def reduce(stack):
    children = []
    # 重复地从栈中弹出完整的子树或终极符，直到遇到一个开放的非终极符
    while len(stack) > 0 and stack[-1].is_open_nt == False:
        children.append(stack.pop())
    # 循环pop()过程会将顺序颠倒，这里将其变回原来的顺序
    children = children[::-1]
    # 这些节点的word_id将成为当前开放的非终极符的子节点，
    # 我们将这些节点取出，成为一个新的列表
    children_ids = [child.word_id for child in children]
    # 将子节点放入非终极符的word_id中
    stack[-1].word_id+=children_ids
    # 将非终极符关闭
    stack[-1].is_open_nt = False
    
    return stack

def push_nt(stack,action_id):
    # 将action_id转换为具体的标签，放入栈顶
    stack.append(element(label_set[action_id],False))
    return stack

