### 隐马尔可夫链（参考李航的《统计学习方法》）
隐马尔可夫链三类问题：
1. 概率计算问题:给定模型和观测序列，计算在该模型下观测序列出现的概率
2. 学习问题：一致观测序列，估计模型参数
3. 预测问题：已知模型和观测序列，给定的观测序列，求最有可能对应的状态序列
#### EM 算法
1.EM算法分为两步，E步和M 步
<p> E步：
<p>求Q函数 ： $Q(\lambda,{\lambda}^i) = E_I[logP(O,I|\lambda)|O,{\lambda^i}]$
<p> M步：
<p> 更新参数使得Q最大
    
注：本代码关注与马尔科夫链学习问题，即知道观测序列$O = (o_1,o_2,...,o_T) $,估计参数模型 $\lambda = (A,B,\pi)$
<p> A:转移矩阵 B：状态矩阵 $\pi:$初始状态概率向量

In [1]:
import numpy as np
import random
#import os
class Markov():
    def __init__(self, I, A, B):
        self.I = I
        self.A = A
        self.B = B 
     #前向算法   
    def forward(self,I,A,B,P):
        #计算初值
        T = P.shape[0]
        N = I.shape[0]
        a = np.zeros((T, N))
        '''
        for i in range(I.shape[0]):
            a[0][i] = I[i] * B[i][0]
        '''
        a[0,:] = (I*B[:,0]).T
        #递推计算
        for j in range(T - 1):
            for k in range((I.shape[0])):
                #print(a[j,:])
                #print(self.A[:,j])
                #os.system("pause")
                #f = np.dot(a[j,:],self.A[:,k])
                #print(f)
                a[j+1][k] = np.dot(a[j,:],A[:,k]) * B[k][P[j+1]]
        P_o = np.sum(a[T-1])
        return P_o,a
    
    #后向算法
    def backward(self, I,A,B,P):
        T = P.shape[0]
        N = I.shape[0]
        b_t = np.zeros((T,N))
        b_t[T-1] = np.ones((1,N))
        temp  = list(range(T-1))
        temp.sort(reverse = True)
        for t in temp:
            for i in range(N):
                pp = A[i] * b_t[t+1]
                b_t[t][i] = np.dot(pp, B[:,P[t+1]])
                
        tt = I * (B[:,0].T) * b_t[0]
        P_o_b = np.sum(tt)
        return P_o_b,b_t
    
    #生成观测序列
    def generate_O(self,T):
        O = []
        # 确定初始观测位置
        index = self.rand_(P_array = self.I)
        pr = self.B[index,:]
        # 确定初始观测结果
        O_result= self.rand_(P_array = pr)
        O.append(O_result)
        
        # 状态转移
        for t in range(1,T):
            P_a = self.A[index,:]
            index_1 = self.rand_(P_array = P_a)
            pr_1 = self.B[index_1,:]
            O_result_1 = self.rand_(P_array = pr_1)
            O.append(O_result_1)
            index = index_1
        return O
    # 确定随机状态  
    def rand_(self,P_array):
        L = P_array.shape[0]
        p = np.random.uniform()
        for i in range(L):
            if i == 0:
                if p <= P_array[0]:
                    index = 0
                    break
            else:
                if p <= np.sum(P_array[range(i+1)]) and p > np.sum(P_array[range(i)]):
                    index = i
                    break
        return index
    #计算t处于状态qi的概率
    #计算t 处于qi t+1 处于 qj 的概率
    def Ca_Gamma(self,I,A,B,O:list):
        T = len(O)
        O = np.array(O)
        P_o,a = self.forward(I,A,B,O)
        P_o_b,b = self.backward(I,A,B,O)
        gamma = np.zeros(a.shape)
        temp_ = list(A.shape)
        temp = []
        temp.append(T-1)
        temp.extend(temp_)
        
        #tmep = list(T-1).append(temp)
        ca_t = np.zeros(temp)
        #计算t处于状态qi的概率
        for t in range(T):
            for i in range(a.shape[1]):
                gamma[t,i] = a[t,i]*b[t,i]/np.dot(a[t,:],b[t,:])
        self.gamma = gamma
        
        #计算t 处于qi t+1 处于 qj 的概率
        for t in range(T-1):
            temp = np.zeros(A.shape)
            for i in range(A.shape[0]):
                for j in range(A.shape[1]):
                    temp[i,j] = a[t,i]*A[i,j]*B[j,O[t+1]]*b[t+1,j]
            ca_t[t,:,:] = temp/(np.sum(temp))
        self.ca_t = ca_t
        return gamma , ca_t

    def Baum_welch(self,O,N,M,iter_=100):
        #N:初始状态个数
        #M:每个状态中的变化个数
        #例如：有4个盒子，每个盒子中有3中球，则N = 4， M = 3
        
        #将所有状态进行标号
        V = list(range(M))
        
        #确定观测长度
        T = len(O)
        #参数初始化
   
        I = np.array(random.sample(range(N,N+10),N))
        I = I/np.sum(I)
        
        A = np.abs(np.random.randn(N,N))
        s = np.sum(A,1)
        A = np.array([A[i,:]/s[i] for i in range(A.shape[0])])
        
        B = np.abs(np.random.randn(N,M))
        sb = np.sum(B,1)
        B = np.array([B[i,:]/sb[i] for i in range(B.shape[0])])
        #print("B.shape = ",B.shape)
    
        #I = np.array([1/N]*N)
        #A = np.ones((N,N))*1/N
        #B = np.ones((N,M))*1/M
        print("开始迭代")
        #迭代
        while(iter_ > 0):
            iter_ =  iter_ - 1
            if iter_%10 == 0:
                print("---------------------",iter_)
            gamma , ca_t = self.Ca_Gamma(I,A,B,O)
            #更新初始状态
            I_new = np.copy(gamma[0,:])

            #更新初始状态 转移矩阵
            A_new = np.zeros(A.shape)
            for i in range(A.shape[0]):
                for j in range(A.shape[0]):
                    tt = 0
                    for t in range(T-1):
                        tt += ca_t[t,i,j]
                    A_new[i,j] = tt/np.sum(gamma[0:T-1,i])  

            # 更新观测概率矩阵
            B_new = np.zeros(B.shape)
            for j in range(N):
                for k in range(M):
                     tt1 = 0 
                     for t in range(T):
                        if O[t] == V[k]:  
                            tt1 += gamma[t,j]
                     B_new[j,k] = tt1/np.sum(gamma[:,j])
            
            I = np.copy(I_new)     
            A = np.copy(A_new)
            B = np.copy(B_new)
            self.I_p = np.copy(I)
            self.A_p = np.copy(A)
            self.B_p = np.copy(B)

In [14]:
# 测试前向/后向 计算方式
I = np.array([0.2, 0.4, 0.4])
A = np.array([[0.5, 0.2, 0.3],[0.3, 0.5, 0.2],[0.2, 0.3, 0.5]])
B = np.array([[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]])
m = Markov(I,A,B)
#P = np.array([0, 1,0])
P = np.array([1, 0, 1, 0, 1, 1])
P_o,a = m.forward(I,A,B,P)
P_o_b,b = m.backward(I,A,B,P)
print(a.shape)
print(a)
print(b.shape)
print(b)

(6, 3)
[[0.1        0.16       0.28      ]
 [0.077      0.0736     0.1414    ]
 [0.04443    0.056772   0.032556  ]
 [0.0228789  0.01881552 0.02867298]
 [0.01140935 0.01355126 0.00748898]
 [0.00563392 0.00678252 0.00296326]]
(6, 3)
[[0.02836522 0.02649512 0.02965701]
 [0.05320483 0.05776176 0.04972891]
 [0.116032   0.110117   0.122031  ]
 [0.2149     0.2478     0.2023    ]
 [0.46       0.51       0.43      ]
 [1.         1.         1.        ]]


In [8]:
# 测试生成序列
I = np.array([0.2, 0.4, 0.4])
A = np.array([[0.5, 0.2, 0.3],[0.3, 0.5, 0.2],[0.2, 0.3, 0.5]])
B = np.array([[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]])
T = 3
m = Markov(I,A,B)
z = m.generate_O(6)
print(z)

[1, 0, 1, 0, 1, 1]


In [35]:
# EM 算法 估计参数
I = np.array([0.2, 0.4, 0.4])
A = np.array([[0.5, 0.2, 0.3],[0.3, 0.5, 0.2],[0.2, 0.3, 0.5]])
B = np.array([[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]])
m = Markov(I,A,B)
O = m.generate_O(20)
#O_ = np.array([0, 1,0])
print(O)
m.Baum_welch(O,3,2)
print("I = ",m.I_p)
print("A = ",m.A_p)
print("B = ",m.B_p)
#print(m.ca_t)

[0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1]
开始迭代
--------------------- 90
--------------------- 80
--------------------- 70
--------------------- 60
--------------------- 50
--------------------- 40
--------------------- 30
--------------------- 20
--------------------- 10
--------------------- 0
I =  [0. 1. 0.]
A =  [[4.94945619e-085 1.00000000e+000 6.82162060e-131]
 [1.42857143e-001 9.77670179e-092 8.57142857e-001]
 [1.00000000e+000 0.00000000e+000 5.74030455e-050]]
B =  [[4.28571429e-001 5.71428571e-001]
 [1.00000000e+000 6.83901812e-147]
 [2.51600235e-058 1.00000000e+000]]


In [19]:
import numpy as np
I = np.array([0.2, 0.4, 0.4])
I.shape[0]
p = np.random.uniform()
p = p*np.ones((3,1))
#I = np.expand_dims(I,axis = 1)
I = I[:,np.newaxis]
t = I - p
ind = np.where(t<=0)
print(p)
print(t)
print(ind)
print(I.shape[0])

[[0.98747748]
 [0.98747748]
 [0.98747748]]
[[-0.78747748]
 [-0.58747748]
 [-0.58747748]]
(array([0, 1, 2], dtype=int64), array([0, 0, 0], dtype=int64))
3


In [57]:
A = np.array([[0.5, 0.2, 0.3],[0.3, 0.5, 0.2],[0.2, 0.3, 0.5]])
B =  np.array([[[0.5, 0.2, 0.3],[0.3, 0.5, 0.2]],[[0.5, 0.2, 0.3],[0.3, 0.5, 0.2]]])
c = np.sum(B,0)
print(c)

[[1.  0.4 0.6]
 [0.6 1.  0.4]]


In [21]:
a = 1
b = []
b.append(a)
a = 2
print(b)

[1]


In [19]:
I = np.array([0.2, 0.4, 0.4])

A = np.array([[0.5, 0.2, 0.3],[0.3, 0.5, 0.2],[0.2, 0.3, 0.5]])
A[2,:] = (A[:,0]*I).T
print(A[2,:])

[0.1  0.12 0.08]


In [38]:
import random
import numpy as np
p = np.array(random.sample(range(1,9),3))
np.random.randn(3,4)

array([[ 0.72623315,  0.36890259, -0.20474817,  1.05738532],
       [ 0.73757077,  1.10612743,  0.10633356, -2.21435742],
       [-0.4985321 , -0.36497318,  0.38486569, -0.16381695]])

In [53]:
A = np.abs(np.random.randn(3,3))
s = np.sum(A,1)
A = np.array([A[i,:]/s[i] for i in range(A.shape[0])])
print(type(A))
print(A)

<class 'numpy.ndarray'>
[[0.3509054  0.20060274 0.44849186]
 [0.86302235 0.07181622 0.06516143]
 [0.25463415 0.45597277 0.28939309]]


In [52]:
np.sum(A,1)

array([1., 1., 1.])