In [1]:
import numpy as np

In [2]:
def viterbi_decode(nodes, trans):
    """
    Viterbi算法求最优路径
    其中 nodes.shape=[seq_len, num_labels],
        trans.shape=[num_labels, num_labels].
    """
    # 获得输入状态序列的长度，以及观察标签的个数
    seq_len, num_labels = len(nodes), len(trans)
    # 简单起见，先不考虑发射概率，直接用起始0时刻的分数
    scores = nodes[0].reshape((-1, 1))

    paths = []
    # 递推求解上一时刻t-1到当前时刻t的最优
    for t in range(1, seq_len):
        # scores 表示起始0到t-1时刻的每个标签的最优分数
        scores_repeat = np.repeat(scores, num_labels, axis=1)
        # observe当前时刻t的每个标签的观测分数
        observe = nodes[t].reshape((1, -1))
        observe_repeat = np.repeat(observe, num_labels, axis=0)
        # 从t-1时刻到t时刻最优分数的计算，这里需要考虑转移分数trans
        M = scores_repeat + trans + observe_repeat
        # 寻找到t时刻的最优路径
        scores = np.max(M, axis=0).reshape((-1, 1))
        idxs = np.argmax(M, axis=0)
        # 路径保存
        paths.append(idxs.tolist())

    best_path = [0] * seq_len
    best_path[-1] = np.argmax(scores)
    # 最优路径回溯
    for i in range(seq_len - 2, -1, -1):
        idx = best_path[i + 1]
        best_path[i] = paths[i][idx]

    return best_path

In [3]:
trans = np.array([[0.1,0.4,0.5],[0.3,0.1,0.6],[0.7, 0.1,0.2]])
nodes = np.array([[0.8,0.1,0.1],[0.1,0.5,0.4],[0.2,0.6,0.2]])
print('trans\n', trans)
print('nodes\n', nodes)

trans
 [[0.1 0.4 0.5]
 [0.3 0.1 0.6]
 [0.7 0.1 0.2]]
nodes
 [[0.8 0.1 0.1]
 [0.1 0.5 0.4]
 [0.2 0.6 0.2]]


In [4]:
best_path = viterbi_decode(nodes, trans)
best_path

[0, 2, 0]

In [5]:
seq_len, num_labels = len(nodes), len(trans)
scores = nodes[0].reshape((-1, 1))
paths = []

In [6]:
scores

array([[0.8],
       [0.1],
       [0.1]])

In [7]:
t = 2
scores_repeat = np.repeat(scores, num_labels, axis=1)
# observe当前时刻t的每个标签的观测分数
observe = nodes[t].reshape((1, -1))
observe_repeat = np.repeat(observe, num_labels, axis=0)
# 从t-1时刻到t时刻最优分数的计算，这里需要考虑转移分数trans
M = scores_repeat + trans + observe_repeat
# 寻找到t时刻的最优路径
scores = np.max(M, axis=0).reshape((-1, 1))
idxs = np.argmax(M, axis=0)
# 路径保存
paths.append(idxs.tolist())

In [8]:
scores_repeat

array([[0.8, 0.8, 0.8],
       [0.1, 0.1, 0.1],
       [0.1, 0.1, 0.1]])

In [9]:
observe_repeat

array([[0.2, 0.6, 0.2],
       [0.2, 0.6, 0.2],
       [0.2, 0.6, 0.2]])

In [10]:
trans

array([[0.1, 0.4, 0.5],
       [0.3, 0.1, 0.6],
       [0.7, 0.1, 0.2]])

In [11]:
M

array([[1.1, 1.8, 1.5],
       [0.6, 0.8, 0.9],
       [1. , 0.8, 0.5]])

In [12]:
scores

array([[1.1],
       [1.8],
       [1.5]])

In [13]:
idxs

array([0, 0, 0], dtype=int64)

In [14]:
paths

[[0, 0, 0]]

In [15]:
np.max(M, axis=0)

array([1.1, 1.8, 1.5])

In [16]:
best_path = [0] * seq_len
best_path[-1] = np.argmax(scores)

In [17]:
best_path

[0, 0, 1]

In [18]:
12000/21.75

551.7241379310345

In [25]:
def viterbi_decode_v2(nodes, trans):
    """
    Viterbi算法求最优路径v2
    其中 nodes.shape=[seq_len, num_labels],
        trans.shape=[num_labels, num_labels].
    """
    seq_len, num_labels = len(nodes), len(trans)
    scores = nodes[0].reshape((-1, 1))
    paths = []
    # # 递推求解上一时刻t-1到当前时刻t的最优
    for t in range(1, seq_len):
        observe = nodes[t].reshape((1, -1))
        M = scores + trans + observe
        scores = np.max(M, axis=0).reshape((-1, 1))
        idxs = np.argmax(M, axis=0)
        paths.append(idxs.tolist())

    best_path = [0] * seq_len
    best_path[-1] = np.argmax(scores)
    print('paths: ', paths)
    print('best_path: ', best_path)
    # 最优路径回溯
    for i in range(seq_len-2, -1, -1):
        idx = best_path[i+1]
        best_path[i] = paths[i][idx]
        print(i)
        print(paths[i][idx])
    return best_path

In [26]:
best_path = viterbi_decode_v2(nodes, trans)
best_path

paths:  [[0, 0, 0], [2, 1, 1]]
best_path:  [0, 0, 0]
1
2
0
0


[0, 2, 0]

### 还有一种写法，最后不用回溯，每次把最优路径的索引都保存起来，并添加一个正常的路径，最后直接按索引找出最优路径

In [43]:
def viterbi_decode_v3(nodes, trans):
    """
    Viterbi算法求最优路径
    其中 nodes.shape=[seq_len, num_labels],
        trans.shape=[num_labels, num_labels].
    """
    seq_len, num_labels = len(nodes), len(trans)
    labels = np.arange(num_labels).reshape((1, -1))
    print('labels: \n', labels)
    scores = nodes[0].reshape((-1, 1))
    paths = labels
    for t in range(1, seq_len):
        observe = nodes[t].reshape((1, -1))
        M = scores + trans + observe
        scores = np.max(M, axis=0).reshape((-1, 1))
        idxs = np.argmax(M, axis=0)
        print('{} paths:'.format(t))
        print(paths)
        print('idxs:\n', idxs)
        paths = np.concatenate([paths[:, idxs], labels], 0)
    print('paths: \n', paths)
    best_path = paths[:, scores.argmax()]
    return best_path

In [44]:
best_path = viterbi_decode_v3(nodes, trans)
best_path

labels: 
 [[0 1 2]]
1 paths:
[[0 1 2]]
idxs:
 [0 0 0]
2 paths:
[[0 0 0]
 [0 1 2]]
idxs:
 [2 1 1]
paths: 
 [[0 0 0]
 [2 1 1]
 [0 1 2]]


array([0, 2, 0])

In [49]:
path = np.array([[0,0,0],[0,1,2]])
idx = [0,2]
path[:,idx]

array([[0, 0],
       [0, 2]])