<a href="https://colab.research.google.com/github/Allen123321/DEMO-DL/blob/master/HMM_Baum_Welch_%E7%AE%97%E6%B3%95_%E5%92%8C_viterbi%E7%AE%97%E6%B3%95.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np

def convert_obs_seq_2_index(Q, index=None):
    """
    将观测序列转换为观测值的索引值
    Q:是输入的观测序列
    """
    if index is not None:
        cht = Q[index]
        if "黑" == cht:
            return 1
        else:
            return 0
    else:
        result = []
        for q in Q:
            if "黑" == q:
                result.append(1)
            else:
                result.append(0)
        return result

In [2]:
# 前向算法
def calc_alpha(pi, A, B, Q, alpha, fetch_index_by_obs_seq=None):
    """
    计算前向概率α的值
    pi：初始的随机概率值
    A：状态转移矩阵
    B: 状态和观测值之间的转移矩阵
    Q: 观测值列表
    alpha：前向概率alpha矩阵
    fetch_index_by_obs_seq: 根据序列获取对应索引值，可以为空
    NOTE:
        1. ord函数的含义是将一个单个的字符转换为数字, eg: ord('a') = 97; ord('中')=20013；底层其实是将字符转换为ASCII码；
        2. 最终会直接更新参数中的alpha对象
    """
    # 0. 初始化
    fetch_index_by_obs_seq_f = fetch_index_by_obs_seq
    if fetch_index_by_obs_seq_f is None:
        fetch_index_by_obs_seq_f = lambda obs, obs_index: ord(obs[obs_index])

    # 1. 初始一个状态类别的顺序
    n = len(A)
    n_range = range(n)

    # 2. 更新初值(t=1)
    for i in n_range:
        alpha[0][i] = pi[i] * B[i][fetch_index_by_obs_seq_f(Q, 0)]

    # 3. 迭代更新其它时刻
    T = len(Q)
    tmp = [0 for i in n_range]
    for t in range(1, T):
        for i in n_range:
            # 1. 计算上一个时刻t-1累积过来的概率值
            for j in n_range:
                tmp[j] = alpha[t - 1][j] * A[j][i]

            # 2. 更新alpha的值
            alpha[t][i] = np.sum(tmp) * B[i][fetch_index_by_obs_seq_f(Q, t)]




In [3]:
#后向算法
def calc_beta(pi, A, B, Q, beta, fetch_index_by_obs_seq=None):
    """
    计算后向概率β的值
    pi：初始的随机概率值
    A：状态转移矩阵
    B: 状态和观测值之间的转移矩阵
    Q: 观测值列表
    beta：后向概率beta矩阵
    fetch_index_by_obs_seq: 根据序列获取对应索引值，可以为空
    NOTE:
        1. ord函数的含义是将一个单个的字符转换为数字, eg: ord('a') = 97; ord('中')=20013；底层其实是将字符转换为ASCII码；
        2. 最终会直接更新参数中的beta对象
    """
    # 0. 初始化
    fetch_index_by_obs_seq_f = fetch_index_by_obs_seq
    if fetch_index_by_obs_seq_f is None:
        fetch_index_by_obs_seq_f = lambda obs, obs_index: ord(obs[obs_index])

    # 1. 初始一个状态类别的顺序
    n = len(A)
    n_range = range(n)
    T = len(Q)

    # 2. 更新初值(t=T)
    for i in n_range:
        beta[T - 1][i] = 1

    # 3. 迭代更新其它时刻
    tmp = [0 for i in n_range]
    for t in range(T - 2, -1, -1):
        for i in n_range:
            # 1. 计算到下一个时刻t+1的概率值
            for j in n_range:
                tmp[j] = A[i][j] * beta[t + 1][j] * B[j][fetch_index_by_obs_seq_f(Q, t + 1)]

            # 2. 更新beta的值
            beta[t][i] = np.sum(tmp)


In [3]:
pi = np.array([0.2, 0.5, 0.3])
A = np.array([
    [0.5, 0.4, 0.1],
    [0.2, 0.2, 0.6],
    [0.2, 0.5, 0.3]
    ])
B = np.array([
    [0.4, 0.6],
    [0.8, 0.2],
    [0.5, 0.5]
    ])
Q = '白黑白白黑'
alpha = np.zeros((len(Q), len(A)))
# 开始计算
calc_alpha(pi, A, B, Q, alpha, convert_obs_seq_2_index)
# 输出最终结果
print(alpha)

# 计算最终概率值：
p = 0
for i in alpha[-1]:

  p += i
print(Q, end="->出现的概率为:")
print(p)

[[0.08       0.4        0.15      ]
 [0.09       0.0374     0.1465    ]
 [0.032712   0.093384   0.037695  ]
 [0.01702872 0.04048728 0.03530505]
 [0.0142037  0.00651229 0.01829338]]
白黑白白黑->出现的概率为:0.03900936690000001


In [6]:
beta = np.zeros((len(Q), len(A)))
# 开始计算
calc_beta(pi, A, B, Q, beta,convert_obs_seq_2_index)
# 输出最终结果
print(beta)

# 计算最终概率值：
p1 = 0
for i in range(len(A)):
  p1 += pi[i] * B[i][convert_obs_seq_2_index(Q, 0)] * beta[0][i]

print(Q, end="->出现的概率为:")
print(p1)

[[0.05866323 0.06623394 0.05215155]
 [0.134115   0.137346   0.148821  ]
 [0.2517     0.219      0.2739    ]
 [0.43       0.46       0.37      ]
 [1.         1.         1.        ]]
白黑白白黑->出现的概率为:0.03900936690000001


In [4]:
"""计算gama值,即给定模型lambda和观测序列Q的时候，时刻t对应状态i的概率值"""
def calc_gamma(alpha, beta, gamma):
    """
    根据alphe和beta的值计算gamma值
    最终结果保存在gamma矩阵中
    """
    T = len(alpha)
    n_range = range(len(alpha[0]))
    tmp = [0 for i in n_range]
    for t in range(T):
        # 累加t时刻对应的所有状态值的前向概率和后向概率，从而计算分母
        for i in n_range:
            tmp[i] = alpha[t][i] * beta[t][i]
        sum_alpha_beta_of_t = np.sum(tmp)

        # 更新gamma值
        for i in n_range:
            gamma[t][i] = tmp[i] / sum_alpha_beta_of_t


In [10]:
gamma = np.zeros((len(Q), len(A)))

# 3. 计算gamm矩阵
calc_gamma(alpha, beta, gamma)
# 输出最终结果
print("gamma矩阵:")
print(gamma)

# 选择每个时刻最大的概率作为预测概率
print("各个时刻最大概率的盒子为:", end='')
index = ['盒子1', '盒子2', '盒子3']
for p in gamma:
  print(index[p.tolist().index(np.max(p))], end="\t")

gamma矩阵:
[[0.12030594 0.67915934 0.20053472]
 [0.30942184 0.13167967 0.5588985 ]
 [0.21106752 0.52426116 0.26467132]
 [0.18770747 0.47742761 0.33486492]
 [0.36410987 0.1669418  0.46894833]]
各个时刻最大概率的盒子为:盒子2	盒子3	盒子2	盒子2	盒子3	

In [5]:
"""计算两个连续状态的联合概率值可西/可赛值"""

def calc_ksi(alpha, beta, A, B, Q, ksi, fetch_index_by_obs_seq=None):
    """
    计算时刻t的时候状态为i，时刻t+1的时候状态为j的联合概率ksi
    alpha：对应的前向概率值
    beta：对应的后向概率值
    A：状态转移矩阵
    B: 状态和观测值之间的转移矩阵
    Q: 观测值列表
    ksi：待求解的ksi矩阵
    fetch_index_by_obs_seq: 根据序列获取对应索引值的函数，可以为空
    NOTE:
        1. ord函数的含义是将一个单个的字符转换为数字, eg: ord('a') = 97; ord('中')=20013；底层其实是将字符转换为ASCII码；
        2. 最终会直接更新参数中的ksi矩阵
    """
    # 0. 初始化
    # 初始化序列转换为索引的方法
    fetch_index_by_obs_seq_f = fetch_index_by_obs_seq
    if fetch_index_by_obs_seq_f is None:
        fetch_index_by_obs_seq_f = lambda obs, obs_index: ord(obs[obs_index])

    # 初始化相关的参数值: n、T
    T = len(alpha)
    n = len(A)

    # 1. 开始迭代更新
    n_range = range(n)
    tmp = np.zeros((n, n))

    for t in range(T - 1):
        # 1. 计算t时刻状态为i，t+1时刻状态为j的概率值
        for i in n_range:
            for j in n_range:
                tmp[i][j] = alpha[t][i] * A[i][j] * B[j][fetch_index_by_obs_seq_f(Q, t + 1)] * beta[t + 1][j]

        # 2. 计算t时候的联合概率和
        sum_pro_of_t = np.sum(tmp)

        # 2. 计算时刻t时候的联合概率ksi
        for i in n_range:
            for j in n_range:
                ksi[t][i][j] = tmp[i][j] / sum_pro_of_t


In [11]:
T = len(Q)
n = len(A)
ksi = np.zeros((T - 1, n, n))
print(ksi)

[[[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]]


In [14]:
# 4. 计算ksi矩阵
calc_ksi(alpha, beta, A, B, Q, ksi, convert_obs_seq_2_index)
# 输出最终结果
print("ksi矩阵:")
print(ksi)

ksi矩阵:
[[[0.08251249 0.02253342 0.01526003]
  [0.16502498 0.05633355 0.45780082]
  [0.06188437 0.0528127  0.08583765]]

 [[0.11614134 0.16168424 0.03159626]
  [0.01930527 0.03359439 0.07878   ]
  [0.07562091 0.32898252 0.15429506]]

 [[0.07211683 0.12343718 0.0155135 ]
  [0.0823497  0.17619005 0.26572141]
  [0.03324094 0.17780037 0.05363   ]]

 [[0.1309587  0.03492232 0.02182645]
  [0.12454633 0.04151544 0.31136583]
  [0.10860484 0.09050403 0.13575605]]]


In [6]:
"""进行HMM参数学习的算法"""
def baum_welch(pi, A, B, Q, max_iter=3, fetch_index_by_obs_seq=None):
    """
    根据传入的初始概率矩阵(pi、A、B)以及观测序列Q，使用baum_welch算法进行迭代求解最终的pi、A、B的值；最大迭代次数默认为3；最终更新结果保存在传入的参数矩阵中(pi\A\B)
    """
    # 0. 初始化相关变量
    # 初始化序列转换为索引的方法
    fetch_index_by_obs_seq_f = fetch_index_by_obs_seq
    if fetch_index_by_obs_seq_f is None:
        fetch_index_by_obs_seq_f = lambda obs, obs_index: ord(obs[obs_index])

    # 初始化相关的参数值: n、m、T
    T = len(Q)
    n = len(A)
    m = len(B[0])
    alpha = np.zeros((T, n))
    beta = np.zeros((T, n))
    gamma = np.zeros((T, n))
    ksi = np.zeros((T - 1, n, n))
    n_range = range(n)
    m_range = range(m)
    t_range = range(T)
    t_1_range = range(T - 1)

    # 1. 迭代更新(EM算法思想类型)
    for time in range(max_iter):
        # a. 在当前的pi，A，B的情况下对观测序列Q分别计算alpha、beta、gamma和ksi
        calc_alpha(pi, A, B, Q, alpha, fetch_index_by_obs_seq_f)
        calc_beta(pi, A, B, Q, beta, fetch_index_by_obs_seq_f)
        calc_gamma(alpha, beta, gamma)
        calc_ksi(alpha, beta, A, B, Q, ksi, fetch_index_by_obs_seq_f)

        # b. 更新pi、A、B的值
        # b.1. 更新pi值
        for i in n_range:
            pi[i] = gamma[0][i]

        # b.2. 更新状态转移矩阵A的值
        tmp1 = np.zeros(T - 1)
        tmp2 = np.zeros(T - 1)
        for i in n_range:
            for j in n_range:
                # 获取所有时刻从状态i转移到状态j的值
                for t in t_1_range:
                    tmp1[t] = ksi[t][i][j]
                    tmp2[t] = gamma[t][i]

                # 更新状态i到状态j的转移概率
                A[i][j] = np.sum(tmp1) / np.sum(tmp2)

        # b.3. 更新状态和观测值之间的转移矩阵
        for i in n_range:
            for k in m_range:
                tmp1 = np.zeros(T)
                tmp2 = np.zeros(T)
                # 获取所有时刻从状态i转移到观测值k的概率和
                number = 0
                for t in t_range:
                    if k == fetch_index_by_obs_seq_f(Q, t):
                        # 如果序列Q中时刻t对应的观测值就是k，那么进行统计这个时刻t为状态i的概率值
                        tmp1[t] = gamma[t][i]
                        number += 1

                    tmp2[t] = gamma[t][i]

                # 更新状态i到观测值k之间的转移概率
                if number == 0:
                    # 没有转移，所以为0
                    B[i][k] = 0
                else:
                    # 有具体值，那么进行更新操作
                    B[i][k] = np.sum(tmp1) / np.sum(tmp2)


In [10]:
np.random.seed(10)
pi = np.random.randint(1, 10, 3)
pi = pi / np.sum(pi)
A = np.random.randint(1, 10, (3, 3))
A = A / np.sum(A, axis=1).reshape((-1, 1))
B = np.random.randint(1, 5, (3, 2))
B = B / np.sum(B, axis=1).reshape((-1, 1))


# pi = np.array([0.2, 0.5, 0.3])
# A = np.array([
#     [0.5, 0.4, 0.1],
#     [0.2, 0.2, 0.6],
#     [0.2, 0.5, 0.3]
#     ])
# B = np.array([
#     [0.4, 0.6],
#     [0.8, 0.2],
#     [0.5, 0.5]
#     ])
# Q = '白黑白白黑'

# print("随机初始的概率矩阵:")
# print("pi初始状态选择矩阵:")
# print(pi)
# print("\n状态转移矩阵A:")
# print(A)
# print("\n状态和观测值之间的转移矩阵B:")
# print(B)



# 开始计算
baum_welch(pi, A, B, Q, max_iter=10, fetch_index_by_obs_seq=convert_obs_seq_2_index)

# 输出最终结果
print("\n\n\n最终计算出来的结果:")
print("pi初始状态选择矩阵:")
print(pi)
print("\n状态转移矩阵A:")
print(A)
print("\n状态和观测值之间的转移矩阵B:")
print(B)




最终计算出来的结果:
pi初始状态选择矩阵:
[9.99998920e-01 1.07968223e-06 9.02395088e-53]

状态转移矩阵A:
[[2.03184731e-22 5.18727102e-16 1.00000000e+00]
 [9.99846221e-01 7.63506021e-05 7.74281358e-05]
 [2.76117461e-28 1.00000000e+00 7.49179156e-34]]

状态和观测值之间的转移矩阵B:
[[9.99999999e-01 5.32975241e-10]
 [1.00000000e+00 4.99488039e-10]
 [2.74494329e-20 1.00000000e+00]]


In [11]:
"""实现viterbi算法"""

def viterbi(pi, A, B, Q, fetch_index_by_obs_seq=None):
    """计算观测序列"""
    # 0. 初始化
    # 初始化序列转换为索引的方法
    fetch_index_by_obs_seq_f = fetch_index_by_obs_seq
    if fetch_index_by_obs_seq_f is None:
        fetch_index_by_obs_seq_f = lambda obs, obs_index: ord(obs[obs_index])

    # 初始化相关的参数值: n、m、T
    T = len(Q)
    n = len(A)
    n_range = range(n)
    # 存储delta值
    delta = np.zeros((T, n))
    # 存储状态值，pre_index[t][i]表示t时刻的状态为i，上一个最优状态(delta值最大的)为pre_index[t][i]
    pre_index = np.zeros((T, n), dtype=np.int)

    # 1. 计算t=1的时候delta的值
    for i in n_range:
        delta[0][i] = pi[i] * B[i][fetch_index_by_obs_seq_f(Q, 0)]

    # 2. 更新其它时刻的值
    for t in range(1, T):
        for i in n_range:
            # 当前时刻t的状态为i
            # a. 获取最大值
            max_delta = -1
            for j in n_range:
                # j表示的是上一个时刻的状态值
                tmp = delta[t - 1][j] * A[j][i]
                if tmp > max_delta:
                    max_delta = tmp
                    pre_index[t][i] = j

            # b. 更新值
            delta[t][i] = max_delta * B[i][fetch_index_by_obs_seq_f(Q, t)]

    # 3. 解码操作，查找到最大的结果值
    decode = [-1 for i in range(T)]
    # 先找最后一个时刻的delta最大值
    max_delta_index = 0
    for i in range(1, n):
        if delta[T - 1][i] > delta[T - 1][max_delta_index]:
            max_delta_index = i
    decode[T - 1] = max_delta_index
    # 再根据转移的路径(最大转移路径), 找出最终的链路
    for t in range(T - 2, -1, -1):
        max_delta_index = pre_index[t + 1][max_delta_index]
        decode[t] = max_delta_index
    return decode

In [12]:
pi = np.array([0.2, 0.5, 0.3])
A = np.array([
    [0.5, 0.4, 0.1],
    [0.2, 0.2, 0.6],
    [0.2, 0.5, 0.3]
    ])
B = np.array([
    [0.4, 0.6],
    [0.8, 0.2],
    [0.5, 0.5]
    ])
Q = '白黑白白黑'

# 开始计算
state_seq = viterbi(pi, A, B, Q, convert_obs_seq_2_index)
print("最终结果为:", end='')
print(state_seq)
state = ['盒子1', '盒子2', '盒子3']
for i in state_seq:
  print(state[i], end='\t')

最终结果为:[1, 2, 1, 1, 2]
盒子2	盒子3	盒子2	盒子2	盒子3	