# 隐马尔科夫模型

## 概述

状态间的转移仅依赖于前 n 个状态的过程被称为 n 阶马尔科夫模型。马尔可夫模型描述了一类重要的随机过程，其状态的转换与时间无关。

在马尔可夫模型中，每个状态代表了一个可观察的事件，所以，马尔可夫模型有时又称作可视马尔可夫模型（visibleMarkovmodel，VMM），这在某种程度上限制了模型的适应性。

对于盲人来说也许不能够直接获取到天气的观察情况，但是他可以通过触摸树叶通过树叶的干燥程度判断天气的状态。于是天气就是一个隐藏的状态，树叶的干燥程度是一个可观察的状态，于是我们就有了两组状态，一个是不可观察、隐藏的状态（天气），一个是可观察的状态（树叶），我们希望设计一种算法，在不能够直接观察天气的情况下，通过树叶和马尔可夫假设来预测天气。

以此为例，一个一阶的马尔可夫过程描述：

![](imgs/weather.png) 

在隐马尔可夫模型（HMM）中，我们不知道模型具体的状态序列，只知道状态转移的概率，即模型的状态转换过程是不可观察的。

因此，该模型是一个双重随机过程，包括模型的状态转换和特定状态下可观察事件的随机。

一般将一个隐马尔可夫模型记为：λ = [π, A, B]

需要确定以下三方面内容（三要素）：

1. 初始状态概率 π: 模型在初始时刻各状态出现的概率，通常记为π = (π_1,π_2,...,π_N)，表示模型的初始状态为 S_i 的概率。
2. 状态转移概率 A : 模型在各个状态间转换的概率，通常记为矩阵 A[a_ij]，其中 a_ij 表示在任意时刻 t，若状态为 S_i，则在下一时刻状态为 S_j 的概率。
3. 输出观测概率 B: 模型根据当前状态获得各个观测值的概率通常记为矩阵 B[b_ij]。其中，b_ij 表示在任意时刻 t，若状态为 S_i，则观测值 O_j 被获取的概率。

## 任务

使用隐马尔可夫模型(HMM)来分析一系列测量值。

在典型的手语识别系统中，我们可以在每一帧中观察说话人的左手、右手和鼻子的 XY 坐标。

将使用右手和右手拇指的 Y 坐标作为观测值来构造 HMM。在第 1 部分中，我们将构建一个一维模型，仅基于一系列右手 Y 坐标来识别单词；在第 2 部分中，我们将利用右手和右手拇指的 Y 坐标来识别单词。

识别的单词是 "ALLIGATOR"， "NUTS" 和 "SLEEP"。


### Part 1a: 构造 HMM

学习（Learning）——给定观测序列，得到一个隐马尔可夫模型。

确定每个单词的以下值:
1. 每个状态的转移概率
2. 每个状态的均值和方差

使用下表中的训练样本来构造 HMM，各参数需四舍五入到小数点后 3 位。

单词 | 帧数 | 观测序列 | 初始状态1 | 初始状态2 | 初始状态3
--- | --- | --- | --- | --- | --- 
ALLIGATOR | 17 | 31, 28, 28, 37, 68, 49, 64, 66, 22, 17, 53, 73, 81, 78, 48, 49, 47 | 31, 28, 28, 37, 68, 49 | 64, 66, 22, 17, 53, 73 | 81, 78, 48, 49, 47
ALLIGATOR | 10 | 25, 62, 75, 80, 75, 36, 74, 33, 27, 34 | 25, 62, 75, 80 | 75, 36, 74, 33 | 27, 34
ALLIGATOR | 16 | -4, 69, 59, 45, 62, 22, 17, 28, 12, 14, 24, 32, 39, 61, 35, 32 | -4, 69, 59, 45, 62, 22 | 17, 28, 12, 14, 24, 32 | 39, 61, 35, 32
NUTS | 11 | 45, 68, 62, 75, 61, 44, 73, 72, 71, 75, 55 | 45, 68, 62, 75 | 61, 44, 73, 72 | 71, 75, 55
NUTS | 18 | 33, 33, 32, 32, 34, 38, 43, 41, 35, 36, 36, 37, 38, 38, 39, 40, 38, 38 | 33, 33, 32, 32, 34, 38 | 43, 41, 35, 36, 36, 37 | 38, 38, 39, 40, 38, 38
NUTS | 19 | 33, 31, 29, 28, 25, 24, 25, 28, 28, 38, 37, 40, 37, 36, 36, 38, 44, 48, 48 | 33, 31, 29, 28, 25, 24, 25 | 28, 28, 38, 37, 40, 37, 36 | 36, 38, 44, 48, 48
SLEEP | 8 | 37, 35, 41, 39, 41, 38, 38, 38 | 37, 35, 41 | 39, 41, 38 | 38, 38
SLEEP | 12 | 22, 17, 18, 35, 33, 36, 42, 36, 41, 41, 37, 38 | 22, 17, 18, 35 | 33, 36, 42, 36 | 41, 41, 37, 38
SLEEP | 13 | 38, 37, 35, 32, 35, 13, 36, 41, 41, 31, 32, 34, 34 | 38, 37, 35, 32, 35 | 13, 36, 41, 41, 31 | 32, 34, 34

三个单词中的每一个（ALLIGATOR, NUTS 和 SLEEP）在 HMM 中都各有 3 个隐藏状态（即该 HMM 中共有 9 个状态）。

所有单词出现的概率都是相等的（1/3）。所有单词必须从状态1开始，只能过渡到下一个状态或停留在当前状态，如下图所示:

![](imgs/part_1_a_probs.png)

### 要点：

1. 对于每个隐藏状态，其观测结果可以认为是高斯分布的。

2. 检查每个状态的观测值离均值有多少个标准差，以比较一个观测值是接近于一个状态还是另一个状态。

3. 在计算每个状态的均值和方差之后，调整状态之间的边界。始终从边界左侧的第 1 个元素(LEFT)开始。如果 LEFT 元素更接近下一个状态，则将边界向左移动。如果 LEFT 元素应该停留在当前状态，则检查 边界右侧的第 1 个元素(RIGHT)。

4. 序列无论如何都需要有 3 个隐藏状态。在满足此条件的前提下才可以移动边界。

5. 若状态 S_i 在观测序列中有 m 个观测值，在第 m 个观测值后的下一时刻状态为 S_j， 可以认为 a_ij = 1/m。注意多个观测序列的情况。

In [6]:
import hmm_submission_test as tests

In [7]:
#export
import numpy as np
import operator

In [8]:
#export
def Boundary_detection(obs_len, left1, right1, left2, right2, index1, index2, mean1, mean2, mean3, std1, std2, std3):
    curindex1 = index1
    curindex2 = index2
    flag = False
    # 根据各个隐藏状态的均值和方差判定一个状态的边界值是否更靠近另一个状态
    if index1 > 1 and abs(left1-mean1)/std1 > abs(left1-mean2)/std2:
        index1 -= 1
        flag = True
    elif abs(right1-mean1)/std1 < abs(right1-mean2)/std2:
        index1 += 1
        flag = True
    
    if abs(left2-mean2)/std2 > abs(left2-mean3)/std3:
        index2 -= 1
        flag = True
    elif index2 < obs_len-1 and abs(right2-mean2)/std2 < abs(right2-mean3)/std3:
        index2 += 1
        flag = True
    
    # 每个隐藏状态至少要保留一个观察值
    if index1+1 > index2:
        index1 = curindex1
        index2 = curindex2
        flag = False
    return flag, index1, index2

def class_split(obs, cor_index = None):
    # 统计各个观察样本的长度
    obs_len = np.array([len(obj) for obj in obs])
    # 初始化3个隐藏状态的边界
    if cor_index is not None:
        index1, index2 = cor_index
    else:
        index1 = np.ceil(obs_len/3).astype(np.int16)
        index2 = 2*index1
    
    flag = True
    # 如果边界没有变化，则退出循环
    while flag:
        flag = False
        # 划分3个隐藏状态的数据
        s1 = obs[0][:index1[0]] + obs[1][:index1[1]] + obs[2][:index1[2]]
        s2 = obs[0][index1[0]:index2[0]] + obs[1][index1[1]:index2[1]] + obs[2][index1[2]:index2[2]]
        s3 = obs[0][index2[0]:] + obs[1][index2[1]:] + obs[2][index2[2]:]
        
        # 根据边界计算3个隐藏状态的均值
        mean1 = np.mean(s1)
        mean2 = np.mean(s2)
        mean3 = np.mean(s3)

        # 根据边界计算3个隐藏状态的方差
        std1 = np.std(s1)
        std2 = np.std(s2)
        std3 = np.std(s3)

        flags = [False, False, False] # 各个样本的边界是否发生过变化
        for i in range(3):
            curobs = obs[i]
            left1 = curobs[index1[i]-1]
            right1 = curobs[index1[i]]
            left2 = curobs[index2[i]-1]
            right2 = curobs[index2[i]]
            # 根据均值和方差重新划分边界
            flags[i], index1[i], index2[i] = Boundary_detection(obs_len[i], left1, right1, left2, right2, index1[i], index2[i], mean1, mean2, mean3, std1, std2, std3)
            flag = flag | flags[i]

    mean = np.around((mean1, mean2, mean3),3)
    std = np.around((std1, std2, std3),3)
    # 计算各个状态的切换概率
    index1 = sum(index1)
    index2 = sum(index2)
    p12 = np.around(3/index1, 3)
    p23 = np.around(3/(index2-index1), 3)
    p3e = np.around(3/(np.sum(obs_len)-index2), 3)
    # 切换概率矩阵，transition_probs[i][0]表示维持状态i的概率，transition_probs[i][1]表示状态i切换到下一状态的概率
    transition_probs = [[1-p12, p12], [1-p23, p23], [1-p3e, p3e]]
    return (mean, std, transition_probs)

def part_1_a():
    '''
    Returns:
        (
            {'A1': prob_of_starting_in_A1, 'A2': prob_of_starting_in_A2, ...},
            {'A1': {'A1': prob_of_transition_from_A1_to_A1,
                    'A2': prob_of_transition_from_A1_to_A2,
                    'A3': prob_of_transition_from_A1_to_A3,
                    'Aend': prob_of_transition_from_A1_to_Aend},
             'A2': {...}, ...},
            {'A1': tuple(mean_of_A1, standard_deviation_of_A1),
             'A2': tuple(mean_of_A2, standard_deviation_of_A2), ...},
            {'N1': prob_of_starting_in_N1, 'N2': prob_of_starting_in_N2, ...},
            {'N1': {'N1': prob_of_transition_from_N1_to_N1,
                    'N2': prob_of_transition_from_N1_to_N2,
                    'N3': prob_of_transition_from_N1_to_N3,
                    'Nend': prob_of_transition_from_N1_to_Nend},
             'N2': {...}, ...}
            {'N1': tuple(mean_of_N1, standard_deviation_of_N1),
             'N2': tuple(mean_of_N2, standard_deviation_of_N2), ...},
            {'S1': prob_of_starting_in_S1, 'S2': prob_of_starting_in_S2, ...},
            {'S1': {'S1': prob_of_transition_from_S1_to_S1,
                    'S2': prob_of_transition_from_S1_to_S2,
                    'S3': prob_of_transition_from_S1_to_S3,
                    'Send': prob_of_transition_from_S1_to_Send},
             'S2': {...}, ...}
            {'S1': tuple(mean_of_S1, standard_deviation_of_S1),
             'S2': tuple(mean_of_S2, standard_deviation_of_S2), ...} 
        )
    '''

    obs1 = [31, 28, 28, 37, 68, 49, 64, 66, 22, 17, 53, 73, 81, 78, 48, 49, 47]
    obs2 = [25, 62, 75, 80, 75, 36, 74, 33, 27, 34]
    obs3 = [-4, 69, 59, 45, 62, 22, 17, 28, 12, 14, 24, 32, 39, 61, 35, 32]

    mean, std, transition_probs = class_split([obs1, obs2, obs3])

    """Word ALLIGATOR"""
    a_prior_probs = {
        'A1': 0.333,
        'A2': 0.,
        'A3': 0.,
        'Aend': 0.
    }
    a_transition_probs = {
        'A1': {'A1': transition_probs[0][0], 'A3': 0., 'A2': transition_probs[0][1], 'Aend': 0.},
        'A2': {'A1': 0., 'A2': transition_probs[1][0], 'A3': transition_probs[1][1], 'Aend': 0.},
        'A3': {'A2': 0., 'A3': transition_probs[2][0], 'A1': 0., 'Aend': transition_probs[2][1]},
        'Aend': {'A1': 0., 'A3': 0., 'A2': 0., 'Aend': 1.}
    }
    # Parameters for end state is not required͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏︇͏️͏󠄆
    a_emission_paras = {
        'A1': (mean[0], std[0]),
        'A2': (mean[1], std[1]),
        'A3': (mean[2], std[2]),
        'Aend': (None, None)
    }

    obs1 = [45, 68, 62, 75, 61, 44, 73, 72, 71, 75, 55]
    obs2 = [33, 33, 32, 32, 34, 38, 43, 41, 35, 36, 36, 37, 38, 38, 39, 40, 38, 38]
    obs3 = [33, 31, 29, 28, 25, 24, 25, 28, 28, 38, 37, 40, 37, 36, 36, 38, 44, 48, 48]
    
    cor_index1 = [5, 16, 16]
    cor_index2 = [6, 17, 17]
    mean, std, transition_probs = class_split([obs1, obs2, obs3], (cor_index1, cor_index2))
    """Word NUTS"""
    n_prior_probs = {
        'N1': 0.333,
        'N2': 0.,
        'N3': 0.,
        'Nend': 0.
    }
    # Probability of a state changing to another state.͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏︇͏️͏󠄆
    n_transition_probs = {
        'N1': {'N3': 0., 'N1': transition_probs[0][0], 'N2': transition_probs[0][1], 'Nend': 0.},
        'N2': {'N3': transition_probs[1][1], 'N1': 0., 'N2': transition_probs[1][0], 'Nend': 0.},
        'N3': {'N3': transition_probs[2][0], 'N1': 0., 'N2': 0., 'Nend': transition_probs[2][1]},
        'Nend': {'N3': 0., 'N2': 0., 'N1': 0., 'Nend': 1.}
    }
    # Parameters for end state is not required͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏︇͏️͏󠄆
    n_emission_paras = {
        'N1': (mean[0], std[0]),
        'N2': (mean[1], std[1]),
        'N3': (mean[2], std[2]),
        'Nend': (None, None)
    }

    obs1 = [37, 35, 41, 39, 41, 38, 38, 38]
    obs2 = [22, 17, 18, 35, 33, 36, 42, 36, 41, 41, 37, 38]
    obs3 = [38, 37, 35, 32, 35, 13, 36, 41, 41, 31, 32, 34, 34]

    mean, std, transition_probs = class_split([obs1, obs2, obs3])
    """Word SLEEP"""
    s_prior_probs = {
        'S1': 0.333,
        'S2': 0.,
        'S3': 0.,
        'Send': 0.
    }
    s_transition_probs = {
        'S1': {'S2': transition_probs[0][1], 'S3': 0., 'S1': transition_probs[0][0], 'Send': 0.},
        'S2': {'S1': 0., 'S2': transition_probs[1][0], 'S3': transition_probs[1][1], 'Send': 0.},
        'S3': {'S2': 0., 'S1': 0., 'S3': transition_probs[2][0], 'Send': transition_probs[2][1]},
        'Send': {'S2': 0., 'S3': 0., 'S1': 0., 'Send': 1.}
    }
    # Parameters for end state is not required͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏︇͏️͏󠄆
    s_emission_paras = {
        'S1': (mean[0], std[0]),
        'S2': (mean[1], std[1]),
        'S3': (mean[2], std[2]),
        'Send': (None, None)
    }

    return (a_prior_probs, a_transition_probs, a_emission_paras,
            n_prior_probs, n_transition_probs, n_emission_paras,
            s_prior_probs, s_transition_probs, s_emission_paras)


tests.TestPart1a().test_prior(part_1_a)
tests.TestPart1a().test_a_emission(part_1_a)
tests.TestPart1a().test_n_emission(part_1_a)
tests.TestPart1a().test_s_emission(part_1_a)
tests.TestPart1a().test_a_transition(part_1_a)
tests.TestPart1a().test_n_transition(part_1_a)
tests.TestPart1a().test_s_transition(part_1_a)

UnitTest test_prior passed successfully!
UnitTest test_a_emission passed successfully!
UnitTest test_n_emission passed successfully!
UnitTest test_s_emission passed successfully!
UnitTest test_a_transition passed successfully!
UnitTest test_n_transition passed successfully!
UnitTest test_s_transition passed successfully!


### Part 1b: Viterbi 算法

解码（Decoding）——给定 HMM，即λ = [π, A, B]，以及某个观测序列，求隐藏状态的序列。

使用从第1a部分导出的 HMM 来构建 viterbi 网格。当提供观测序列(观察到的右手 Y 坐标列表)时，该函数将返回生成序列的最可能的状态序列以及该状态序列正确的概率。

例如, 当输入为序列 [38, 37, 35, 32, 35, 13, 36, 41, 41, 31, 32, 34, 34] 时，应当输出序列 ['S1', ... 'S2', ... 'S3']及该状态序列正确的概率。

如果找不到可能的状态序列，返回 `([], 0)` 。

为了简便起见，我们使用高斯分布的概率密度来表征模型根据当前状态获得各个观测值的概率。


In [9]:
#export
def gaussian_prob(x, para_tuple):
    if list(para_tuple) == [None, None]:
        return 0.0

    mean, std = para_tuple
    gaussian_percentile = (2 * np.pi * std**2)**-0.5 * \
                          np.exp(-(x - mean)**2 / (2 * std**2))
    return gaussian_percentile

In [10]:
#export
def viterbi(evidence_vector, states, prior_probs, transition_probs, emission_paras):
    if evidence_vector == []:
        return [], 0.
    
    n = len(evidence_vector)
    # 初始化第一个状态的概率(0.333*高斯概率)
    next_state_probabilities = prior_probs.copy()
    for state in states:
        # 计算高斯概率
        evidence_state_prob = gaussian_prob(evidence_vector[0], emission_paras[state])
        # 第一个状态的概率
        next_state_probabilities[state] *= evidence_state_prob
        
    survive_state = [] # 全过程状态转移路径(幸存路径)，用于回溯，survive_state[i][s]表示第i+1个时刻状态s所在路径的上一个状态
    for i in range(1, n):
        s = {}                         # 幸存路径
        evidence = evidence_vector[i]  # 当前观测值
        # 当前时刻抵达各状态的概率
        cur_state_probabilities = next_state_probabilities.copy() 
        
        # 当前观测值属于各状态的高斯概率
        evidence_state_probs = {}
        for state in states:
            next_state_probabilities[state] = 0. # 下一时刻抵达各状态的概率（预置0）
            evidence_state_prob = gaussian_prob(evidence, emission_paras[state])
            evidence_state_probs[state] = evidence_state_prob
        
        # 遍历当前时刻可以抵达的状态
        for cur_state in states:
            cur_state_prob = cur_state_probabilities[cur_state]
            if cur_state_prob == 0.:
                continue
            # 对于选中的当前状态，遍历其下一时刻可以抵达的状态
            for next_state in transition_probs[cur_state].keys():
                # 状态切换的概率
                state_change_prob = transition_probs[cur_state][next_state]
                # 计算从当前状态的路径抵达下一状态的概率(当前路径概率*切换概率*观测值属于下一状态的高斯概率)
                new_prob = cur_state_prob * state_change_prob * evidence_state_probs[next_state]
                # 选择所有抵达下一时刻各状态的路径中概率最大的作为幸存路径
                if new_prob > next_state_probabilities[next_state]:
                    next_state_probabilities[next_state] = new_prob
                    s[next_state] = cur_state
        
        # 向全过程幸存路径添加新的状态转移
        survive_state.append(s)
       
    sequence = [''] * n
    probability = 0.0
    # 选择幸存路径中概率最大的一条
    for state in states:
        new_prob = next_state_probabilities[state]
        if new_prob > probability:
            probability = new_prob
            sequence[-1] = state
    # 对选中的路径进行回溯，得到状态序列
    for i in range(n-2, -1, -1):
        sequence[i] = survive_state[i][sequence[i+1]]
    
    return sequence, probability

tests.TestPart1b().test_viterbi_case1(part_1_a, viterbi)
tests.TestPart1b().test_viterbi_case2(part_1_a, viterbi)
tests.TestPart1b().test_viterbi_case3(part_1_a, viterbi)
tests.TestPart1b().test_viterbi_realsample1(part_1_a, viterbi)
tests.TestPart1b().test_viterbi_realsample2(part_1_a, viterbi)
tests.TestPart1b().test_viterbi_realsample3(part_1_a, viterbi)

UnitTest test_viterbi_case1 passed successfully!
UnitTest test_viterbi_case2 passed successfully!
UnitTest test_viterbi_case3 passed successfully!
UnitTest test_viterbi_realsample1 passed successfully!
UnitTest test_viterbi_realsample2 passed successfully!
UnitTest test_viterbi_realsample3 passed successfully!


### Part 2a: 多维度 HMM

在 Part 1a中, 我们用右手 Y 轴坐标作为唯一的特征，现在我们要用两个特征——同样的右手 Y 轴坐标和右手拇指 Y 轴坐标，以提高我们的模型的准确性。

以下是右手拇指 Y 轴位置的各状态转移转移概率，以及各状态观测值的平均值和标准差。

![](imgs/part_2_a_probs_updated.png)

ALLIGATOR | State 1 | State 2 | State 3
--- | --- | --- | --- 
Mean | 53.529 | 40.769 | 51.000
Std | 17.493 | 6.104 | 12.316

NUTS | State 1 | State 2 | State 3
--- | --- | --- | --- 
Mean | 36.318 | 60.000 | 37.476
Std | 7.376 | 15.829 | 8.245

SLEEP | State 1 | State 2 | State 3
--- | --- | --- | --- 
Mean | 35.357 | 31.462 | 38.333
Std | 7.315 | 5.048 | 7.409


In [11]:
#export
def part_2_a():
    """Word ALLIGATOR"""
    a_prior_probs = {
        'A1': 0.333,
        'A2': 0.,
        'A3': 0.,
        'Aend': 0.
    }
    # example: {'A1': {'A1' : (right-hand Y, right-thumb Y), ... }͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏︇͏️͏󠄆
    a_transition_probs = {
        'A1': {'A1': (0.833, 0.824), 'A3': (0., 0.), 'A2': (0.167, 0.176), 'Aend': (0., 0.)},
        'A2': {'A1': (0., 0.), 'A3': (0.214, 0.231), 'A2': (0.786, 0.769), 'Aend': (0., 0.)},
        'A3': {'A2': (0., 0.), 'A1': (0., 0.), 'A3': (0.727, 0.769), 'Aend': (0.273, 0.231)},
        'Aend': {'A2': (0., 0.), 'A3': (0., 0.), 'A1': (0., 0.), 'Aend': (1., 1.)}
    }
    # example: {'A1': [(right-hand-mean, right-thumb-std), (right-hand--mean, right-thumb-std)] ...}͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏︇͏️͏󠄆
    a_emission_paras = {
        'A1': [(51.056, 21.986), (53.529, 17.493)],
        'A2': [(28.357, 14.936), (40.769, 6.104)],
        'A3': [(53.727, 16.707), (51.000, 12.316)],
        'Aend': [(None, None), (None, None)]
    }

    """Word NUTS"""
    n_prior_probs = {
        'N1': 0.333,
        'N2': 0.,
        'N3': 0.,
        'Nend': 0.
    }
    n_transition_probs = {
        'N1': {'N2': (0.081, 0.136), 'N3': (0., 0.), 'N1': (0.919, 0.864), 'Nend': (0., 0.)},
        'N2': {'N3': (1., 0.273), 'N1': (0., 0.), 'N2': (0., 0.727), 'Nend': (0., 0.)},
        'N3': {'N1': (0., 0.), 'N3': (0.625, 0.8), 'N2': (0., 0.), 'Nend': (0.375, 0.2)},
        'Nend': {'N1': (0., 0.), 'N2': (0., 0.), 'N3': (0., 0.), 'Nend': (1., 1.)}
    }
    n_emission_paras = {
        'N1': [(38.081, 11.175), (36.318, 7.376)],
        'N2': [(42.0, 2.828), (60.000, 15.829)],
        'N3': [(60.0, 13.491), (37.476, 8.245)],
        'Nend': [(None, None), (None, None)]
    }

    """Word SLEEP"""
    s_prior_probs = {
        'S1': 0.333,
        'S2': 0.,
        'S3': 0.,
        'Send': 0.
    }

    s_transition_probs = {
        'S1': {'S2': (0.375, 0.214), 'S3': (0., 0.), 'S1': (0.625, 0.786), 'Send': (0., 0.)},
        'S2': {'S1': (0., 0.), 'S3': (0.136, 0.231), 'S2': (0.864, 0.769), 'Send': (0., 0.)},
        'S3': {'S3': (0., 0.5), 'S1': (0., 0.), 'S2': (0., 0.), 'Send': (1., 0.5)},
        'Send': {'S3': (0., 0.), 'S1': (0., 0.), 'S2': (0., 0.), 'Send': (1., 1.)}
    }
    s_emission_paras = {
        'S1': [(29.5, 8.411), (35.357, 7.315)],
        'S2': [(36.182, 5.99), (31.462, 5.048)],
        'S3': [(36.667, 1.886), (38.333, 7.409)],
        'Send': [(None, None), (None, None)]
    }

    return (a_prior_probs, a_transition_probs, a_emission_paras,
            n_prior_probs, n_transition_probs, n_emission_paras,
            s_prior_probs, s_transition_probs, s_emission_paras)


tests.TestPart2a().test_prior(part_2_a)
tests.TestPart2a().test_a_emission(part_2_a)
tests.TestPart2a().test_n_emission(part_2_a)
tests.TestPart2a().test_s_emission(part_2_a)
tests.TestPart2a().test_a_transition(part_2_a)
tests.TestPart2a().test_n_transition(part_2_a)
tests.TestPart2a().test_s_transition(part_2_a)

UnitTest test_prior passed successfully!
UnitTest test_a_emission passed successfully!
UnitTest test_n_emission passed successfully!
UnitTest test_s_emission passed successfully!
UnitTest test_a_transition passed successfully!
UnitTest test_n_transition passed successfully!
UnitTest test_s_transition passed successfully!


### Part 2b: 多维度 Viterbi 算法

修改 `viterbi()` 函数，允许一个状态有多个观测值(右手和右拇指的 Y 位置)。假设每个状态的各个观测值的获取过程是独立的。


In [12]:
#export
def multidimensional_viterbi(evidence_vector, states, prior_probs, transition_probs, emission_paras):
    # 观测值是二维的，假设它们相互独立，其概率为两者之积，在viterbi函数基础上稍加修改即可
    if evidence_vector == []:
        return [], 0.
    
    n = len(evidence_vector)
    # 初始化第一个状态的概率(0.333*高斯概率)
    next_state_probabilities = prior_probs.copy()
    for state in states:
        # 计算高斯概率
        evidence_state_prob = gaussian_prob(evidence_vector[0][0], emission_paras[state][0]) * gaussian_prob(evidence_vector[0][1], emission_paras[state][1])
        # 第一个状态的概率
        next_state_probabilities[state] *= evidence_state_prob
        
    survive_state = [] # 全过程状态转移路径(幸存路径)，用于回溯，survive_state[i][s]表示第i+1个时刻状态s所在路径的上一个状态
    for i in range(1, n):
        s = {}                         # 幸存路径
        evidence = evidence_vector[i]  # 当前观测值
        # 当前时刻抵达各状态的概率
        cur_state_probabilities = next_state_probabilities.copy() 
        
        # 当前观测值属于各状态的高斯概率
        evidence_state_probs = {}
        for state in states:
            next_state_probabilities[state] = 0. # 下一时刻抵达各状态的概率（预置0）
            evidence_state_prob = gaussian_prob(evidence[0], emission_paras[state][0]) * gaussian_prob(evidence[1], emission_paras[state][1])
            evidence_state_probs[state] = evidence_state_prob
        
        # 遍历当前时刻可以抵达的状态
        for cur_state in states:
            cur_state_prob = cur_state_probabilities[cur_state]
            if cur_state_prob == 0.:
                continue
            # 对于选中的当前状态，遍历其下一时刻可以抵达的状态
            for next_state in transition_probs[cur_state].keys():
                # 状态切换的概率
                state_change_prob = transition_probs[cur_state][next_state][0] * transition_probs[cur_state][next_state][1]
                # 计算从当前状态的路径抵达下一状态的概率(当前路径概率*切换概率*观测值属于下一状态的高斯概率)
                new_prob = cur_state_prob * state_change_prob * evidence_state_probs[next_state]
                # 选择所有抵达下一时刻各状态的路径中概率最大的作为幸存路径
                if new_prob > next_state_probabilities[next_state]:
                    next_state_probabilities[next_state] = new_prob
                    s[next_state] = cur_state
        
        # 向全过程幸存路径添加新的状态转移
        survive_state.append(s)
       
    sequence = [''] * n
    probability = 0.0
    # 选择幸存路径中概率最大的一条
    for state in states:
        new_prob = next_state_probabilities[state]
        if new_prob > probability:
            probability = new_prob
            sequence[-1] = state
    # 对选中的路径进行回溯，得到状态序列
    for i in range(n-2, -1, -1):
        sequence[i] = survive_state[i][sequence[i+1]]
    
    return sequence, probability


tests.TestPart2b().test_viterbi_case1(part_2_a, multidimensional_viterbi)
tests.TestPart2b().test_viterbi_case2(part_2_a, multidimensional_viterbi)
tests.TestPart2b().test_viterbi_case3(part_2_a, multidimensional_viterbi)
tests.TestPart2b().test_viterbi_realsample1(part_2_a, multidimensional_viterbi)
tests.TestPart2b().test_viterbi_realsample2(part_2_a, multidimensional_viterbi)
tests.TestPart2b().test_viterbi_realsample3(part_2_a, multidimensional_viterbi)

UnitTest test_viterbi_case1 passed successfully!
UnitTest test_viterbi_case2 passed successfully!
UnitTest test_viterbi_case3 passed successfully!
UnitTest test_viterbi_realsample1 passed successfully!
UnitTest test_viterbi_realsample2 passed successfully!
UnitTest test_viterbi_realsample3 passed successfully!
