假设存在标注语料，可以用该标注语料计算状态转移矩阵A，发射矩阵B， 以及初始状态概率π

文件的格式：
- 文件1:每行一个句子，每个句子中已进行切分，每个位置时词库单词的索引，并以空格风格, 假设词库大小为100
- 文件2:对应第一个文件，每行表示一个句子中每个位置的状态类型，取值分别是（0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
其中每个数字分别对应状态类型的每一个

In [13]:
# demo data
import numpy as np
from collections import Counter

data = [[1, 11, 10, 24, 56, 89, 90],
        [23, 12, 45, 67, 89, 90, 23, 11, 23, 45, 60],
        [29, 67, 34, 52, 16, 89, 45, 90, 34]]

ann =  [[0, 2, 2, 5, 6, 3, 1],
        [2, 2, 1, 7, 9, 8, 1, 4, 2, 7, 1],
        [0, 2, 1, 5, 7, 3, 2, 7, 6]]

In [24]:
def compute_pi(ann, M=10):
    # M:每个隐状态的取值
    pi = np.zeros(M)
    for k, v in Counter([s[0] for s in ann]).items():
        pi[k] = v/len(ann)
    return pi

In [30]:
def compute_A(ann, M=10):
    # M:每个隐状态的取值
    A = np.zeros((M, M))
    for s in ann:
        for i in range(1, len(s)):
            row = s[i-1]
            col = s[i]
            A[row][col] += 1
    return A/A.sum(axis=1, keepdims=True)

In [40]:
def compute_B(data, ann, M=10, V=100):
    corpus_size = len(data)
    B = np.zeros((M, V))
    for i in range(corpus_size):
        for d, a in zip(data[i], ann[i]):
            B[a][d] += 1
    return B/B.sum(axis=1, keepdims=True)

In [25]:
compute_pi(ann)

array([0.66666667, 0.        , 0.33333333, 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ])

In [31]:
compute_A(ann, M=10)

array([[0.        , 0.        , 1.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.33333333,
        0.33333333, 0.        , 0.33333333, 0.        , 0.        ],
       [0.        , 0.28571429, 0.28571429, 0.        , 0.        ,
        0.14285714, 0.        , 0.28571429, 0.        , 0.        ],
       [0.        , 0.5       , 0.5       , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 1.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.5       , 0.5       , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 1.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.25      , 0.       

In [41]:
compute_B(data, ann)

array([[0.        , 0.5       , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.5       ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.  

![image.png](attachment:image.png)

![image.png](attachment:image.png)

In [67]:
import numpy as np

state = 10 # 每个隐状态的取值，也即词汇标签
V = 100 # 词表的大小

# 状态转移矩阵
A = np.random.randint(1, 100, size=(state, state))
A = A/A.sum(axis=1, keepdims=True)

# 发射概率矩阵
B = np.random.randint(1, 100, size=(state, V))
B = B/B.sum(axis=1, keepdims=True)

π = np.random.randint(0, 10, size=(state,))
π = π/π.sum()

![image.png](attachment:image.png)

假设我们要处理的问题是上图表示的，Zi表示句子第i个单位，每个单位有m个取值，也就是第一列绿色部分，我们的目的是找出从Z1-Z10的每个状态相连的最好路径

下式中最外围的循环是句子序列从第一个位置到最后一个位置，也即K,接下来的循环是j,表示取到的每个状态

$$\delta_{k+1}(j) = \max_{i}[\\
\delta_{k}(0)   *  P(Z_{k+1}=j|Z_{k}=0)   * P(X_{k+1}|Z_{k+1}=j) \\
\delta_{k}(1)   *  P(Z_{k+1}=j|Z_{k}=1)   * P(X_{k+1}|Z_{k+1}=j) \\
\delta_{k}(2)   *  P(Z_{k+1}=j|Z_{k}=2)   * P(X_{k+1}|Z_{k+1}=j) \\
... \\
\delta_{k}(i)   *  P(Z_{k+1}=j|Z_{k}=i)   * P(X_{k+1}|Z_{k+1}=j) \\
\delta_{k}(i+1) *  P(Z_{k+1}=j|Z_{k}=i+1) * P(X_{k+1}|Z_{k+1}=j) \\
... \\
\delta_{k}(m)   *  P(Z_{k+1}=j|Z_{k}=m)   * P(X_{k+1}|Z_{k+1}=j) \\
]$$

其中i的取值就是定义的10个状态类型，也就是m个取值, 上式可以很容易转化为矩阵的乘法，只需要每次循环j即可,因为最后一项乘的是发射概率，这部分是固定的，可以先求前两部分，然后找出最大值，再乘以最后一部分

In [55]:
# 其中的数字表示发射矩阵每一列的单词的索引
obs = np.array([0, 2, 30, 40, 30, 20, 10, 50, 60, 1, 23]).astype(np.int32)

In [86]:
from numba import jit

def viterbi(obs, A, B, π):
    """Viterbi algorithm for solving the uncovering problem

    Args:
        π: Initial state distribution  of dimension M
        A: State transition probability matrix of dimension M*M
        B: Output probability matrix of dimension M x V, V is the length of dictionary
        obs: Observation sequence of length, each element is the index of B's column

    Returns:
        S_opt: Optimal state sequence of length
        D: Accumulated probability matrix
        E: Backtracking matrix
    """

    N = len(obs) # 句子长度
    M = A.shape[0] # 状态
    
    # initial DP arrary for store result, just like upper image
    dp = np.ones((M, N))
    
    dp[:, 0] = np.multiply(π, B[:,obs[0]])
    
    # E used to remember the max path index
    E = np.zeros((M, N-1)).astype(np.int32)
    for k in range(1, N):
        # 当前第K个位置，分别取如下状态的时候的最大
        for j in range(M): 
            temp_product = np.multiply(dp[:, k-1], A[:, j])
            dp[j, k] = np.max(temp_product) * B[j, obs[k]]
            E[j, k-1] = np.argmax(temp_product) # 此处矩阵中上一个位置保存的抵达当前位置最好的结果，因此后面倒序取值的时候当前时刻应该取后一个时刻的结果
            
    # Backtracking
    S_opt = np.zeros(N).astype(np.int32)
    
    # 获得最后一个最后一个时刻的概率最大的索引，这个索引对应的状态值
    S_opt[-1] = np.argmax(dp[:, -1])
    
    # 倒叙，从N-2个时刻，第一次会获取根据N-1， 倒序取值的时候当前时刻应该取后一个时刻的结果,因为当前时刻的值决定了下个时刻的最大概率的路径
    for n in range(N-2, 0, -1):
        S_opt[n] = E[int(S_opt[n+1]), n]
        
    return S_opt, dp, E

In [87]:
start = time.time()
opt, _, _ = viterbi(obs, A, B, π)
print(time.time() - start)

0.00234222412109375


In [64]:
opt

array([0, 5, 5, 6, 1, 1, 1, 1, 1, 1, 5], dtype=int32)