In [1]:
import numpy as np

#将频数转化概率，在转为log形式，以避免精度下溢
def log_normalize(array):
    total = array.sum()
    array /= total  #规范化为概率
    
    result = np.empty_like(array)
    for i in range(len(array)):
        if array[i] == 0:
            result[i] = -3.14e+100  #用极小数值代替log(0)
        else:
            result[i] = np.log(array[i])
    return result

In [2]:
#将一行用BMES编码 
def encode(text):
    words = text.split()  #数据集用的是空格分词
    state_list = []
    sentence = ''.join(words)  #连起来的句子
    for i, word in enumerate(words):
        if len(word) == 1:  #单字词
            state_list.append(3)  #3代表状态S
        else:  #多字词
            state_list.extend([0] + (len(word) - 2) * [1] + [2])  #0,1,2代表状态BME
    return list(zip(sentence, state_list))

In [3]:
#用于中文分词的隐马尔科夫模型
class HMM:

    #初始化theta: 
        #Pi: initial state probability, 
        #A: transition probability
        #B: emitting probability
    def __init__(self, params_path=None):
        if params_path:
            params = np.load(params_path)
            self.init_prob = params['init_prob']
            self.trans_prob = params['trans_prob']
            self.emit_prob = params['emit_prob']
        else:
            self.init_prob = np.zeros(4) #4:BMES4种状态
            self.trans_prob = np.zeros((4, 4))
            self.emit_prob = np.zeros((4, 65536)) #65536:确保sentence的长度不会超过emit_prob的长度

    #根据词频统计学习隐马尔可夫模型       
        #:param file_path: utf-8 encoded Chinese separated text
    def train(self, file_path, save_to=None):
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f.readlines():
                encoding = encode(line)
                for i in range(len(encoding)):
                    char, state = encoding[i]
                    char_index = ord(char)
                    self.emit_prob[state, char_index] += 1  #统计频数，更新观测矩阵
                    if i == 0:  #句首
                        self.init_prob[state] += 1  #统计频数，更新初始概率
                    else:
                        prev_state = encoding[i - 1][1]
                        self.trans_prob[prev_state, state] += 1  #统计频数，更新状态转移矩阵

        #将频数规范化为频率，并取对数
        self.init_prob = log_normalize(self.init_prob)
        for i in range(self.trans_prob.shape[0]):
            self.trans_prob[i] = log_normalize(self.trans_prob[i])
            self.emit_prob[i] = log_normalize(self.emit_prob[i])

        #:param save_to: save model parameters to a file
        if save_to:
            np.savez(save_to, trans_prob=self.trans_prob, emit_prob=self.emit_prob, init_prob=self.init_prob)

    #用Viterbi算法对一行文本进行解码
    def Viterbi(self, text):
        """
        :param text: 未分词的文本
        :return: 状态序列
        """
        length = len(text)
        delta = np.zeros((length, 4))
        psi = np.zeros((length, 4), dtype=int)

        #计算delta和psi
        #由于log，概率的乘法都变加法
        for t in range(length):
            char_index = ord(text[t])
            if t == 0:
                delta[t] = self.init_prob + self.emit_prob[:, char_index]  #delta_j(0) = pi_j*b_j_v(0)
            else:
                for j in range(4):
                    temp = delta[t - 1] + self.trans_prob[:, j]  #delta_i(t-1)*a_ij
                    psi[t, j] = np.argmax(temp)
                    delta[t, j] = temp.max() + self.emit_prob[j, char_index]

        #开始回溯
        omega = np.zeros(length, dtype=int)
        omega[-1] = np.argmax(delta[-1])
        for t in range(length - 2, -1, -1):
            omega[t] = psi[t + 1, omega[t + 1]]

        return omega
    
    
    #根据状态序列得到一行文本的分词结果
       #:param text: 未分词的文本
    def split(self, text):
        text = text.strip()
        state_list = self.Viterbi(text)
        word_list = []
        for i in range(len(text)):
            if state_list[i] == 3:  #S，表示单字词
                word_list.append(text[i])
            elif state_list[i] == 0:  #B，表示一个词的开始
                word = text[i]
            elif state_list[i] == 1:  #M，表示一个词的中间
                word += text[i]
            else:  #E，表示一个词的结束
                word += text[i]
                word_list.append(word)
        
        #返回分词结果
        return word_list
    

In [4]:
def main():
    test_sentences = ["今天我来到了东南大学。",
                      "模式识别课程是一门有趣的课程。",
                      "我认为完成本次实验是一个挑战。"]

    try:
        split_model = HMM("hmm_params.npz")
    except:
        split_model = HMM()
        split_model.train("./RenMinData.txt_utf8", save_to="hmm_params.npz")
    for sent in test_sentences:
        words = split_model.split(sent)
        print(" ".join(words))#使用空格进行分词


if __name__ == "__main__":
    main()

今天 我来 到 了 东南 大学 。
模式 识别 课程 是 一门 有趣 的 课程 。
我 认为 完成 本次 实验 是 一个 挑战 。
