# 隐含马尔科夫链 解决 拼音转文字 问题

## 什么是马尔科夫链

简单的假设，随机过程中各个状态 $ S_t $的概率分布，只与它前一个状态 $ S_t-1 $有关，即$ P(S_t|S_1, S_2, ..., S_t-1)=P(S_t|S_t-1)$。

比如说，对于天气预报，硬性的假设，今天的气温只和昨天有关而与前天无关。这种假设虽然未必适应所有的应用，但是相比较今天气温和多种因素相关来说，提出了一种近似解。这种假设就成为马尔科夫假设。这个随机过程称为马尔科夫链。


## 什么是隐含马尔科夫模型 (HMM, Hidden Markov Models)

隐含马尔科夫模型是马尔科夫模型的一个扩展：任一时刻$t$的状态$S_t$是不可见的。所以观察者是没有办法通过观察到一个序列$S_1, S_2, ..., S_t$来推测转移概率等参数。但是隐含马尔科夫模型在每个时刻都会产生一个$O_t$，而且$O_t$和$S_t$相关**且**仅和$S_t$相关。这个被称为独立输出假设。其中隐含的状态 $S_1, S_2, ..., S_t$ 是一个典型的马尔科夫链，所以被称为隐含马尔科夫模型。

![](./image/hmm1.jpg)

基于马尔科夫假设和独立输出假设，我们可以计算出某个特定的状态序列$S_1, S_2, ..., S_t$产生输出符号$O_1, O_2, ..., O_t$的概率。

$$ P(S_1, S_2, ..., S_t,O_1, O_2, ..., O_t) = \prod_t P(S_t|S_t-1)*P(O_t|S_t)   \qquad(1)$$ 

在通信中，接受端$O_1, O_2, ..., O_t$来推测信号源发送的信息$S_1, S_2, ..., S_t$，只需要从所有的源信息中找到最有可能产生观测信号的那一个信息。用概率论的语言来描述，就是在已知$O_1, O_2, ..., O_t$，求得领条件概率 $P(S_1, S_2, ..., S_t|O_1, O_2, ..., O_t)$达到最大值那个信息串$S_1, S_2, ..., S_t$,即 

$$S_1, S_2, ..., S_t=ArgMax_{all S_1, S_2, ...} P(S_1, S_2, ..., S_t|O_1, O_2, ..., O_t)  \qquad(2)$$  

而上面的公式等价于

$$P(O_1, O_2, ..., O_t|S_1, S_2, ..., S_t)*P(S_1, S_2, ..., S_t) = \prod_t P(O_t|S_t) * \prod_t P(S_t|S_t-1) \qquad(3)$$ 

这样正好得到公式（1）。如何找到（2）公式的最大值，进而识别出句子$S_1, S_2, ..., S_t$，可以利用*维特比算法（Viterbi Algorithm）*，这个后面再说。

针对不同的应用，P(S_1, S_2, ..., S_t|O_1, O_2, ..., O_t)的名称也不相同，*语音识别*中被称为“声学模型（Acoustic Model）”，在*机器翻译*中被称为“翻译模型”,在 *拼写校正*中是“纠错模型（Correction Model）”

## 使用隐含马尔科夫模型解决问题

用户输入拼音序列，就是一个可以观察到的序列，而拼音对应的汉字则可以看作是一个不可见的序列，要求解最大值那个信息串$S_1, S_2, ..., S_t$，使用隐含马尔科夫模型就可以解决问题。

### 几个重要的参数

在拼音转汉字中我们重新列出几个重要的参数：

* V 是所有可能的观测的集合, 其中 M 是可能的观测数，对应所有可能的拼音的状态数量；

* Q 是所有可能的隐藏状态的集合, 其中 N 是可能的状态数， 对应所有可能的汉字的状态数量；

* I 是长度为 T 的隐藏状态序列， 对应用户输入的拼音序列对应的汉字序列；

* O 是对应的观测序列， 对应用户输入的拼音序列；

HMM 模型可以使用一个三元组来刻画 $\lambda = (\pi, A, B)$

其中：

* A是**隐藏状态转移概率（Transition Probability）分布**，在拼音转汉字的过程中就是句子中汉字到汉字的转移概率。

* B是**观察状态到隐藏状态的生成概率（Generation Probability）**，在拼音转汉字的过程中就是拼音到汉字的发射概率。

* $\pi$ **隐藏状态初始的概率**，即每个汉字的初始概率。

### 解决问题

有了上述的一些基本参数，我们可以有一个初步的方案：

1. 可以从搜索日志中抽取一批中文搜索词，再结合已有的词典，将对应中文转换为对应的拼音序列

2. 对所有的词按照单字分词，并统计各个词出现的频率，即可作为初始化概率$\pi$ 

3. 统计每个拼音对应汉字以及各自出现的次数，就可以得到观察概率分布B

4. 最后统计每个汉字后面出现的汉字的次数，以此作为隐藏状态的转移概率分布A

经过以上步骤之后，就得到一个隐马尔科夫模型 $\lambda$

## 维特比算法

维特比算法是一个特殊但应用最广的动态规划算法。在求解隐马尔科夫、条件随机场的预测以及seq2seq模型概率计算等问题中均用到了该算法，即对状态序列进行预测时用到。利用动态规划，可以解决任何一个图中的最短路径问题。而维特比算法是针对一个特殊的图-篱笆网了（Lattice）的有向图最短路径问题而提出来的。它之所以重要，是因为凡是使用隐马尔科夫模型描述的问题都可以用它解码，包括当前的数字通信、语音识别、机器翻译、拼音转汉字、分词等。


假定用户（盲打时）输入的拼音是$y_1, y_2, ..., y_n$，对应的汉字是$x_1, x_2, ..., x_n$，可以用下图描述这样的一个过程：

![](./image/viterbi1.png)

这是一个相对简单的隐含马尔科夫链，这个马尔科夫链的每个状态的输出都是固定的，但是每一个状态的值是可变的。比如输入“zhong”的字可以是“中”，“钟”等多个字。我们不妨抽象一下，用符号$x_ij$表示状态$x_i$的第$j$个可能的值。如果按照不同的值展开，就得到了下面这个篱笆网络(Lattice):

![](./image/viterbi2.png)

$$x_1, x_2, ..., x_n=ArgMax_{all x_1, x_2, ...} P(x_1, x_2, ..., x_n|y_1, y_2, ..., y_n)=ArgMax_{all x_1, x_2, ...}\prod_{i=1}^N P(y_i|x_i)*p(x_i|x_{i-1})  \qquad(4)$$  

如果按照（4）计算从第一个状态到最后一个状态找到最可能的路径，这样的路径组合数会跟序列状态数呈指数型增长。汉语中每个无声调的拼音对应13个左右的汉字，那么组合数为$13^{10} ~ 5*10^{14}$。假定计算每条路径概率需要20次乘法，就是$10^16$次计算。按照具体计算机每秒处理$10^11$次的计算，也许有大约计算一天的时间，因此使用穷举法肯定不合适。而维特比算法是一个和状态数目成正比的算法。

![](./image/viterbi3.png)

简单来说维特比算法可以有：**假定当我们从状态i进入到状态i+1时，从起点S到状态i上各个节点的最短路径已经找到了，并且记录在这些节点上。那么在计算从起点S到第i+1状态的某个节点的最短路径时，只需要考虑从S到前一个状态i所以的k个节点的最短路径，以及从这k个节点到$x_{i+1}$,j的距离即可。**

这样每一步计算的复杂度都和相邻两个状态$S_i$和$S_{i+1}$格子节点数目$n_i, n_{i+1}$的乘积成正比，即$O(n_i*n_{i+1})$。如果隐含马尔科夫链中的节点最多有D个节点，网格长度为N，那么维特比算法的复杂度为$O(N*D^2)$。

回到之前输入法的问题上，计算量基本上是$13*10*10=1690 \approx 10^3$，这个和$10^16$有着天壤之别。那么无论在语音识别还是打字中，输入都是按照流的方式进行的，解码基本上是实时的。

### 实现

基于上面的知识点，我们可以先实现一个简单的版本。在此之前通过对新闻等文本，制作了一些拼音转汉字的序列和频率、拼音序列和频率、一些词组的序列和频率等制作了一个简单的词典文件放置在data\下，从而实现了在一定文本下的隐藏状态转移矩阵、隐藏状态初始状态、隐藏状态到观察状态的发射矩阵。

当然再简单点，可以手动列出一些mapping直接用于测试，如：

\# 隐藏状态转移矩阵

trans_prob = {"周杰":0.9,
              "周姐":0.1,
              "周洁":0.3,
              "杰伦":0.8,
              "结论":0.7
              }

\# 隐藏状态初始状态

pi = {
    "周":0.5,
    "粥":0.3,
    "杰":0.5,
    "姐":0.4,
    "节":0.2,
    "结":0.3,
    "轮":0.1,
    "伦":0.5,
    "论":0.5,
}

\# 隐藏状态到观察状态的发射矩阵

emit_probs = {
    "周zhou":0.5,
    "粥zhou":0.1,
    "姐jie":0.1,
    "节jie":0.1,
    "结jie":0.2,
    "杰jie":0.2,
    "轮lun":0.1,
    "伦lun":0.3,
    "论lun":0.1,
}

In [1]:
import numpy as np

from utils.common import *


pinyin2word = get_symbol_list()

word2id = word2id()
id2word = dict(zip(word2id.values(), word2id.keys()))

pinyin2id = pinyin2id()
id2pinyin = dict(zip(pinyin2id.values(), pinyin2id.keys()))

# 隐藏状态转移矩阵
trans_total_usage, trans_prob = get_hidden_status_trans_probs()
# min_trans_prob = 0.1 * float(1) / float(trans_total_usage)

# 隐藏状态初始矩阵
hidden_status_total_usage, pi = get_hidden_status_init_probs()
# 没有出现在pi矩阵的字给其设置一个初始的概率
min_word_prob = 0.1 * float(1) / float(hidden_status_total_usage)

# 隐藏状态到观察状态的发射矩阵
emit_probs = get_hidden_to_observer_emit_prob(get_symbol_list())


def get_word_in_pi_prob(word_id):
    '''
    返回某一个隐藏状态（汉字）在初始矩阵的概率，
    如果没有则为min_word_prob
    :param word_id:
    :return:
    '''
    word = id2word[word_id]
    if word in pi.keys():
        return pi.get(word)
    return min_word_prob


def get_pinyin_word_emit_prob(pinyin_word):
    '''
    返回一个隐藏状态到观察状态（字+拼音）组合在发射矩阵的概率，
    如果没有则为min_word_prob
    :param pinyin_word:
    :return:
    '''
    if pinyin_word in emit_probs.keys():
        return emit_probs[pinyin_word]
    return min_word_prob


def viterbi(word_list, pinyin_list, n):
    """
    维特比算法求解最大路径问题
    :param word_list:   每个拼音对应的隐藏状态矩阵
    :param n:   可能观察到的状态数， 对应为汉字数量
    :param id2word:    id到汉字的映射
    :return:
    """
    T = len(word_list)  # 观察状态的长度
    delta = np.zeros((T, n))  # 转移值
    psi = np.zeros((T, n), dtype=int)  # 转移下标值

    # 初始化第一个字符的隐藏初始状态概率， 设置为每个词在词典中的单独出现的概率
    words = word_list[0]
    for i, w in enumerate(words):
        delta[0][i] = get_word_in_pi_prob(w)

    # 动态规划计算
    for idx in range(1, T):
        words = word_list[idx]  # 第T时刻所有可能出现的字的集合
        for i in range(len(words)):
            max_value = 0
            pre_words = word_list[idx-1]

            last_index = 0
            for j in range(len(pre_words)):
                tmp_key = id2word[pre_words[j]] + id2word[words[i]]  # 中国/钟国/忠国
                # 获得转移概率，如果不存在则设置为0
                if tmp_key in trans_prob.keys():
                    prob = trans_prob[tmp_key]
                else:
                    prob = 0

                # 前一时刻的字观察状态到隐藏状态的概率 * 转移概率
                tmp_value = delta[idx-1][j] * prob
                if tmp_value > max_value:
                    max_value = tmp_value
                    last_index = j

            # 计算观察状态到隐藏状态的概率
            tmp_pw_key = id2word[words[i]] + pinyin_list[idx]  # 国guo2
            emit_prob = get_pinyin_word_emit_prob(tmp_pw_key) * max_value  # 观察状态到隐藏状态的概率 * 前一时刻所有字和当前字组合的最大概率

            delta[idx][i] = emit_prob
            psi[idx][i] = last_index  # 保存当前字的前一时刻 和当前字组合的最大概率 的下标值

    prob = 0
    path = np.zeros(T, dtype=int)
    path[T-1] = 1

    # 获取最大的转移值
    desc_word_id = []
    for i in range(n):
        if prob < delta[T-1][i]:
            prob = delta[T-1][i]
            path[T-1] = i
            desc_word_id.append(word_list[T-1][i])

    # 最优路径回溯
    for t in range(T-2, -1, -1):
        last_index = psi[t+1][path[t+1]]
        path[t] = last_index
        desc_word_id.append(word_list[t][last_index])

    final_word = ""
    for id in reversed(desc_word_id):
        final_word += id2word[id]

    return final_word

In [10]:
tests = [
    ['zhong1', 'guo2', 'ren2', 'min2'],
    ['ming2', 'tian1', 'gao1', 'kao3'],
    ['hou4', 'tian1', 'jiu4', 'yao4', 'chu1', 'cheng2', 'ji4', 'le5'],
    ['xi1', 'wang4', 'ni3', 'hou4', 'tian1', 'gao1', 'kao3', 'shun4', 'li4'],
    ['wo3', 'ming2', 'tian1', 'yao4', 'chi1', 'ping2', 'guo3'],
    ['ni3', 'de5', 'bao4', 'jia4', 'tai4', 'gao1', 'le5'],
    ['qin2', 'lao2', 'yong3', 'gan3'],
    ['xin1', 'zhi1', 'du4', 'ming2'],
    ['jin1', 'tian1', 'zhuan4', 'le5', "hao3", 'duo1', 'qian2'],
    ['ni3', 'shi4', 'fou3', 'you3', "hao3", 'duo1', 'wen4', 'hao4']
]

for pinyin_list in tests:
    word_id_list = []
    n = 0
    for i, single_pinyin in enumerate(pinyin_list):
        single_pinyin_words = pinyin2word[single_pinyin]
        if n < len(single_pinyin_words):
            n = len(single_pinyin_words)
        word_id_list.append([word2id[single_word] for single_word in single_pinyin_words])

    words = viterbi(word_id_list, pinyin_list, n)
    print(words)

中国人民
明天高考
后天就要出成系了
希望你后天高考顺利
我明天要吃苹果
你的报价太高了
勤劳勇敢
新知度明名
今天传了好多前
你是否有好多问好


从实现结果上看，在语料内容不多的情况下，很快的能得出这样的结果还是相当不错了。但是更多的语料该怎么实现呢？新发现的词又如何加载呢？每次加载内存又会否可以承担呢？上面使用2n-gram的统计模型是否有更好的n-gram模型呢？这些后面再来讲解。