In [15]:
# code from https://github.com/applenob/simple_crf/blob/master/crf.py

import numpy as np
from scipy import special, optimize

# 引进特殊的起点和终点标记
START = '|-'
END = '-1'

def log_dot_vm(loga, logM):
    """通过log向量和log矩阵，计算log(向量 点乘 矩阵)"""
    return special.logsumexp(np.expand_dims(loga, axis=1) + logM, axis=0)


def log_dot_mv(logM, logb):
    """通过log向量和log矩阵，计算log(矩阵 点乘 向量)"""
    return special.logsumexp(logM + np.expand_dims(logb, axis=0), axis=1)


class CRF:
    def __init__(self, feature_functions, labels):
        self.ft_fun = feature_functions
        self.k = len(self.ft_fun)
        self.w = np.random.randn(self.k)
        self.labels = labels
        self.label_id = {l: i for i, l in enumerate(self.labels)}
        
    def train(self, x_vecs, y_vecs):
        vectorised_x_vecs, vectorised_y_vecs = self.create_vector_list(x_vecs, y_vecs)
        l = lambda w: self.neg_likelihood_and_deriv(vectorised_x_vecs, vectorised_y_vecs, w)
        val = optimize.fmin_l_bfgs_b(l, self.w)
        self.w, _, _ = val
        return self.w
        
    # for train()
    def create_vector_list(self,x_vecs, y_vecs):
        observations = [self.get_all_features(x_vec) for x_vec in x_vecs]
        labels = len(y_vecs) * [None]

        for i in range(len(y_vecs)):
            y_vecs[i].insert(0, START)
            y_vecs[i].append(END)
            labels[i] = np.array([self.label_id[y] for y in y_vecs[i]], copy=False, dtype=np.int)
        return observations, labels
        
    # for create_vector_list()
    def get_all_features(self, x_vec):
        result = np.zeros((len(x_vec) + 1, len(self.labels), len(self.labels), len(self.ft_fun)))
        for i in range(len(x_vec) + 1):
            for j, yp in enumerate(self.labels):
                for k, y in enumerate(self.labels):
                    for l, f in enumerate(self.ft_fun):
                        result[i, j, k, l] = f(yp, y, x_vec, i)
        return result
    
    # for train()
    def neg_likelihood_and_deriv(self, x_vec_list, y_vec_list, w):
        """        
        求负对数似然函数和关于w的偏导。
        关键变量的尺寸中，Y是标注空间的个数，K是特征函数的个数。
        """
        likelihood = 0
        derivative = np.zeros(len(self.w))
        
        for x_vec, y_vec in zip(x_vec_list, y_vec_list):
            all_features = x_vec
            length = x_vec.shape[0]
            yp_vec_ids = y_vec[:-1]
            y_vec_ids = y_vec[1:]
            log_M_s = np.dot(all_features, w)
            
            log_alphas = self.forward(log_M_s, self.label_id[START])
            log_betas = self.backward(log_M_s, self.label_id[END])
            
            last = log_alphas[-1]
            log_Z = special.logsumexp(last)
            
            log_alphas1 = np.expand_dims(log_alphas[1:], axis=2)
            log_betas1 = np.expand_dims(log_betas[:-1], axis=1)
            
            log_probs = log_alphas1 + log_M_s + log_betas1 - log_Z
            log_probs = np.expand_dims(log_probs, axis=3)
            
            # 计算特征函数关于模型的期望
            exp_features = np.sum(np.exp(log_probs) * all_features, axis=(0, 1, 2))
            # 计算特征函数关于训练数据的期望
            emp_features = np.sum(all_features[range(length), yp_vec_ids, y_vec_ids], axis=0)
            
            # 计算似然函数
            likelihood += np.sum(log_M_s[range(length), yp_vec_ids, y_vec_ids]) - log_Z
            # 计算似然函数的偏导
            derivative += emp_features - exp_features

        return -likelihood, -derivative
    
    # for neg_likelihood_and_deriv()
    def forward(self, log_M_s, start):
        T = log_M_s.shape[0]
        Y = log_M_s.shape[1]
        alphas = np.NINF * np.ones((T+1, Y))  # log0 = ninf
        alpha = alphas[0]
        alpha[start] = 0  # log1 = 0
        for t in range(1, T+1):
            alphas[t] = log_dot_vm(alpha, log_M_s[t - 1])
            alpha = alphas[t]
        return alphas

    # for neg_likelihood_and_deriv()
    def backward(self, log_M_s, end):
        T = log_M_s.shape[0]
        Y = log_M_s.shape[1]
        betas = np.NINF * np.ones((T+1, Y))  # log0 = ninf
        # betas = np.zeros((T+1, Y))
        beta = betas[-1]
        beta[end] = 0  # log1 = 0
        for t in reversed(range(T)):
            betas[t] = log_dot_mv(log_M_s[t], beta)
            beta = betas[t]
        return betas
    
    def predict(self, x_vec, debug=False):
        """
        给定x，预测y。使用Viterbi算法
        """
        
        # all_features, len(x_vec) + 1, Y, Y, K
        all_features = self.get_all_features(x_vec)
        
        # log_potential: len(x_vec) + 1, Y, Y  保存各个下标的非规范化概率
        log_potential = np.dot(all_features, self.w)
        
        T = len(x_vec)
        Y = len(self.labels)
        
        # Psi保存每个时刻最优情况的下标
        Psi = np.ones((T, Y), dtype=np.int32) * -1
        
        # 初始化
        delta = log_potential[0, 0]
        
        # 递推
        for t in range(1, T):
            next_delta = np.zeros(Y)
            for y in range(Y):
                w = delta + log_potential[t, :, y]
                Psi[t, y] = psi = w.argmax()
                next_delta[y] = w[psi]
            delta = next_delta
            
        # 回溯找到最优路径
        y = delta.argmax()
        trace = []
        for t in reversed(range(T)):
            trace.append(y)
            y = Psi[t, y]
        trace.reverse()
        return [self.labels[i] for i in trace]

In [16]:
from collections import defaultdict
import re
import sys

def get_feature_functions(word_sets, labels, observes):
    """生成各种特征函数"""
    print("get feature functions ...")
    transition_functions = [
        lambda yp, y, x_v, i, _yp=_yp, _y=_y: 
        1 if yp == _yp and y == _y else 0
        for _yp in labels[:-1] for _y in labels[1:]
        ]

    def set_membership(tag, word_sets):
        def fun(yp, y, x_v, i):
            if i < len(x_v) and x_v[i].lower() in word_sets[tag]:
                return 1
            else:
                return 0
        return fun

    observation_functions = [set_membership(t, word_sets) for t in word_sets]

    misc_functions = [
        lambda yp, y, x_v, i: 1 if i < len(x_v) and re.match('^[^0-9a-zA-Z]+$', x_v[i]) else 0,
        lambda yp, y, x_v, i: 1 if i < len(x_v) and re.match('^[A-Z\.]+$', x_v[i]) else 0,
        lambda yp, y, x_v, i: 1 if i < len(x_v) and re.match('^[0-9\.]+$', x_v[i]) else 0
    ]

    tagval_functions = [
        lambda yp, y, x_v, i, _y=_y, _x=_x: 1 if i < len(x_v) and y == _y and x_v[i].lower() == _x else 0
        for _y in labels
        for _x in observes]

    return transition_functions + tagval_functions + observation_functions + misc_functions


if __name__ == '__main__':
    word_data = []
    label_data = []
    all_labels = set()
    word_sets = defaultdict(set)
    observes = set()
    
    data = [
        "Confidence/NN in/IN the/DT pound/NN is/VBZ widely/RB expected/VBN to/TO take/VB another/DT sharp/JJ dive/NN if/IN trade/NN figures/NNS for/IN September/NNP ,/, due/JJ for/IN release/NN tomorrow/NN ,/, fail/VB to/TO show/VB a/DT substantial/JJ improvement/NN from/IN July/NNP and/CC August/NNP 's/POS near-record/JJ deficits/NNS ./.",
        "Chancellor/NNP of/IN the/DT Exchequer/NNP Nigel/NNP Lawson/NNP 's/POS restated/VBN commitment/NN to/TO a/DT firm/NN monetary/JJ policy/NN has/VBZ helped/VBN to/TO prevent/VB a/DT freefall/NN in/IN sterling/NN over/IN the/DT past/JJ week/NN ./.",
        "But/CC analysts/NNS reckon/VBP underlying/VBG support/NN for/IN sterling/NN has/VBZ been/VBN eroded/VBN by/IN the/DT chancellor/NN 's/POS failure/NN to/TO announce/VB any/DT new/JJ policy/NN measures/NNS in/IN his/PRP$ Mansion/NNP House/NNP speech/NN last/JJ Thursday/NNP ./.",
        "This/DT has/VBZ increased/VBN the/DT risk/NN of/IN the/DT government/NN being/VBG forced/VBN to/TO increase/VB base/NN rates/NNS to/TO 16/CD %/NN from/IN their/PRP$ current/JJ 15/CD %/NN level/NN to/TO defend/VB the/DT pound/NN ,/, economists/NNS and/CC foreign/JJ exchange/NN market/NN analysts/NNS say/VBP ./.",
    ]
    
    for line in data:
        words, labels = [], []
        for token in line.strip().split():
            word, label = token.split('/')
            all_labels.add(label)
            word_sets[label].add(word.lower())
            observes.add(word.lower())
            words.append(word)
            labels.append(label)

        word_data.append(words)
        label_data.append(labels)

    labels = [START, END] + list(all_labels)
    feature_functions = get_feature_functions(word_sets, labels, observes)

    crf = CRF(labels=labels, feature_functions=feature_functions)
    crf.train(word_data, label_data)
    for x_vec, y_vec in zip(word_data[-5:], label_data[-5:]):
        print("raw data: ", x_vec)
        print("prediction: ", crf.predict(x_vec))
        print("ground truth: ", y_vec)


get feature functions ...
('raw data: ', ['Confidence', 'in', 'the', 'pound', 'is', 'widely', 'expected', 'to', 'take', 'another', 'sharp', 'dive', 'if', 'trade', 'figures', 'for', 'September', ',', 'due', 'for', 'release', 'tomorrow', ',', 'fail', 'to', 'show', 'a', 'substantial', 'improvement', 'from', 'July', 'and', 'August', "'s", 'near-record', 'deficits', '.'])
('prediction: ', ['NN', 'CD', 'VBN', 'IN', 'VBZ', 'VBZ', 'VBZ', 'PRP$', 'VBZ', 'NN', 'POS', '|-', 'NN', 'POS', 'JJ', 'VBP', '|-', 'NN', 'IN', 'RB', '-1', '-1', '.', 'VBG', 'PRP$', 'IN', 'VBZ', 'PRP$', '|-', 'NN', 'POS', 'VBG', 'DT', 'RB', '-1', 'RB', 'VBP'])
('ground truth: ', ['|-', 'NN', 'IN', 'DT', 'NN', 'VBZ', 'RB', 'VBN', 'TO', 'VB', 'DT', 'JJ', 'NN', 'IN', 'NN', 'NNS', 'IN', 'NNP', ',', 'JJ', 'IN', 'NN', 'NN', ',', 'VB', 'TO', 'VB', 'DT', 'JJ', 'NN', 'IN', 'NNP', 'CC', 'NNP', 'POS', 'JJ', 'NNS', '.', '-1'])
('raw data: ', ['Chancellor', 'of', 'the', 'Exchequer', 'Nigel', 'Lawson', "'s", 'restated', 'commitment', 'to'