# 隐马可夫模型

例10.2 考虑盒子和求模型$\lambda=(A,B,\pi)$，状态集合$Q=\{1,2,3\}$，观测集合$V=\{红，白\}$。

In [1]:
import numpy as np
np.set_printoptions(suppress=True, threshold=np.inf,precision=10)

A = np.array([ # 状态转移概率分布
    [.5,.2,.3],
    [.3,.5,.2],
    [.2,.3,.5]
])
B = np.array([ # 观测概率分布
    [.5,.5],
    [.4,.6],
    [.7,.3]
])
pi = np.array([ # 初始概率分布 => 状态的分布
    .2,.4,.4
])

O = [0,1,0,1] # 1表示红，0表示白 # 观测序列
T = len(O) # 序列长度
M,N = A.shape[1],B.shape[0]

计算$\alpha_1(1)=\pi_1 b_1(o_1)$，$\alpha_1(2)=\pi_2 b_2(o_1)$，$\alpha_1(3)=\pi_3 b_3(o_1)$

In [2]:
alpha1 = B[:,O[0]]*pi
alpha1 = alpha1.flatten()
alpha1

array([0.1 , 0.16, 0.28])

递推

$\alpha_2(1)=[\sum^3_{i=1} \ \alpha_1(i)a_{i1}]b_1(o_2)$，\
$\alpha_2(2)=[\sum^3_{i=1} \ \alpha_1(i)a_{i2}]b_2(o_2)$，\
$\alpha_2(3)=[\sum^3_{i=1} \ \alpha_1(i)a_{i3}]b_3(o_2)$，\
$\alpha_3(1)=[\sum^3_{i=1} \ \alpha_2(i)a_{i1}]b_1(o_3)$，\
$\alpha_3(2)=[\sum^3_{i=1} \ \alpha_2(i)a_{i2}]b_2(o_3)$，\
$\alpha_3(3)=[\sum^3_{i=1} \ \alpha_2(i)a_{i3}]b_3(o_3)$，

In [3]:
alpha2 = np.array([A[:,i].dot(alpha1)*B[i,O[1]] for i in range(M)]) # B[i,1]表示白的状态转移概率分布，B[i,0]表示红的状态转移概率分布
alpha2.round(4)

array([0.077 , 0.1104, 0.0606])

In [4]:
alpha3 = np.array([A[:,i].dot(alpha2)*B[i,O[0]] for i in range(M)])
alpha3.round(5)

array([0.04187, 0.03551, 0.05284])

In [5]:
alpha4 = np.array([A[:,i].dot(alpha3)*B[i,O[1]] for i in range(M)])
alpha4.round(5)

array([0.02108, 0.02519, 0.01382])

In [6]:
P = np.sum(alpha4)
P.round(5)

0.06009

In [7]:
def forward(A,B,pi,T,O):
    alpha = np.zeros((T,N))
    for t in range(T):
        if t == 0:
            alpha[0,:] = B[:,O[0]]*pi # 初始化
            continue
        alpha[t,:] = np.array([A[:,i].dot(alpha[t-1,:])*B[i,O[t]] for i in range(M)])
    return alpha
alpha = forward(A,B,pi,T,O)

In [8]:
alpha

array([[0.1       , 0.16      , 0.28      ],
       [0.077     , 0.1104    , 0.0606    ],
       [0.04187   , 0.035512  , 0.052836  ],
       [0.0210779 , 0.02518848, 0.01382442]])

In [9]:
def backward(A,B,pi,T,O):
    beta = np.zeros((T,N))
    beta[-1,:] = np.ones(N)
    for t in list(range(T-1))[::-1]:
        beta[t,:] = np.array([np.sum(np.array([A[i,j]*B[j,O[t+1]]*beta[t+1,j] for j in range(N)])) for i in range(N)])
    p = np.sum(np.array([pi[i]*B[i,O[0]]*beta[0,i] for i in range(N)]))
    return p
backward(A,B,pi,T,O).round(5)

0.06009

In [10]:
import logging

class HMM:
    def __init__(self,M,N,epoches=10):
        self.A  = np.abs(np.random.randn(M,M))
        self.B  = np.abs(np.random.randn(M,N))
        self.pi = np.ones(M)
        self.T  = None
        self.O  = None
        self.M  = M # 可能的状态数
        self.N  = N # 可能的观察数
        self.alpha = None
        self.beta = None
        self.epoches = epoches
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
    
    def forward(self,):
        alpha = np.zeros((self.T,self.M))
        for t in range(self.T):
            if t == 0:
                alpha[t,:] = self.B[:,O[t]]*self.pi # 初始化
                continue
            alpha[t,:] = np.array([self.A[:,i].dot(alpha[t-1,:])*self.B[i,O[t]] for i in range(self.M)])
        return alpha

    def backward(self,):
        beta = np.zeros((self.T,self.M))
        beta[-1,:] = np.ones(self.M)
        for t in list(range(self.T-2,-1,-1)):
            beta[t,:] = np.array([np.sum(np.array([self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j] for j in range(self.M)])) for i in range(self.M)])
        return beta

    def calc_gamma(self,t,i):
        gamma = (self.alpha[t,i]*self.beta[t,i]) / np.sum(np.array([self.alpha[t,j]*self.beta[t,j] for j in range(self.M)]))
        return gamma

    def calc_kesai(self,t,i,j):
        kesai = (self.alpha[t,i]*self.A[i,j]*self.B[j,self.O[t+1]]*self.beta[t+1,j]) / np.sum(np.array([[self.alpha[t,i]*self.A[i,j]*self.B[j,self.O[t+1]]*self.beta[t+1,j] for j in range(M)] for i in range(M)]))
        return kesai
    
    def init_params(self,O,A,B,pi):
        self.T = len(O)
        self.O = O
        if A is not None:
            self.A = A
        if B is not None:
            self.B = B
        if pi is not None:
            self.pi = pi
        
        self.M,self.N = self.B.shape
        
        if A is None:
            self.A = self.A / np.sum(self.A)
            self.B = self.B / np.sum(self.B)
            self.pi = self.pi / np.sum(self.pi)
            

    def cal_probality(self,select=None):
        p = 0
        if select != "backward":
            self.logger.info("前向算法")
        else:
            self.logger.info("后向算法")
        self.alpha = self.forward()
        self.beta = self.backward()
        p = np.sum(self.alpha[-1,:])
        return p

    def baum_welch(self,):
        for e in range(self.epoches):
            
            self.alpha = self.forward()
            self.beta = self.backward()
            
            self.logger.info("第{}次迭代".format(e))
            
            A_ = np.zeros((self.M,self.M))
            B_ = np.zeros((self.M,self.N))
            pi_ = np.zeros(self.M)
                        
            # a_{ij}
            for i in range(self.M):
                for j in range(self.M):
                    molecular = 0.0
                    denominator = 0.0
                    for t in range(self.T-1):
                        molecular += self.calc_kesai(t,i,j)
                        denominator += self.calc_gamma(t,i)
                    A_[i,j] = molecular / denominator
                    
            # b_{jk}
            for j in range(self.M):
                for k in range(self.N):
                    molecular = 0.0
                    denominator = 0.0
                    for t in range(self.T):
                        if k == self.O[t]:
                            molecular += self.calc_gamma(t,j)
                        denominator += self.calc_gamma(t,j)
                    B_[j,k] = molecular / denominator
                    
            # pi_{i}
            for i in range(self.M):
                pi_[i] = self.calc_gamma(0,i)
            
            # 更新
            self.A  = A_/np.sum(A_)
            self.B  = np.array([self.B[i,:]/np.sum(self.B[i,:]) for i in range(self.M)])
            self.pi = pi_/np.sum(pi_)
        self.logger.info("更新完毕:\n A:\n{}\nB:\n{}\npi:\n{}\n".format(self.A.round(5),
                                                                                self.B.round(5),self.pi.round(5)))
        
    def viterbi(self,):
        delta = np.zeros((self.T,self.M))
        delta[0,:] = np.array([self.pi[i]*self.B[i,self.O[0]] for i in range(self.M)])
        psi = np.zeros((self.T,self.M))
        
        for t in range(1,self.T):
            for i in range(self.M):
                max_delta = np.array([delta[t-1,j]*self.A[j,i] for j in range(self.M)])
                delta[t,i] = np.max(max_delta)*self.B[i,self.O[t]]
                psi[t,i] = np.argmax(max_delta)
        print("delta:\n{},\npsi:\n{}".format(delta,psi))
        # 终止
        max_delta = delta[self.T-1,:]
        I = np.zeros(self.T,dtype=int)
        P = np.max(max_delta)
        I[-1] = np.argmax(max_delta)
        print("P(*)-最优路径的概率：{}；最优路径的终点是：{}".format(P.round(5),I[-1]))
        
        
        # 回溯
        for t in range(self.T-2,-1,-1):
            I[t] = psi[t+1,I[t+1]]
        print("最优路径I(*):\n{}".format(I))
        
        
    def fit(self,O,A=None,B=None,pi=None,select=None):
        if A is None:
            select = "bw"
        self.init_params(O,A,B,pi)
        self.logger.info("初始化:\n A:\n{}\nB:\n{}\npi:\n{}\n状态数:{},观测数:{}".format(self.A.round(5),
                                                                                self.B.round(5),self.pi.round(5),self.M,self.N))
        if select == "bw" or select == "baum_welch":
            self.baum_welch()
        self.p = self.cal_probality()
        self.logger.info("当前p(o|lambda)={}".format(self.p.round(5)))
        self.p = self.cal_probality("backward")
        self.logger.info("当前p(o|lambda)={}".format(self.p.round(5)))
        print("p(o|lambda)的概率为:{}".format(self.p))
        
A = np.array([ # 状态转移概率分布
    [.5,.2,.3],
    [.3,.5,.2],
    [.2,.3,.5]
])
B = np.array([ # 观测概率分布
    [.5,.5],
    [.4,.6],
    [.7,.3]
])
pi = np.array([ # 初始概率分布 => 状态的分布
    .2,.4,.4
])

O = [0,1,0] # 1表示红，0表示白 # 观测序列


M,N = A.shape[1],B.shape[0]    
hmm = HMM(10,4)
hmm.fit(O,A,B,pi)
hmm.viterbi()

p(o|lambda)的概率为:0.130218
delta:
[[0.1     0.16    0.28   ]
 [0.028   0.0504  0.042  ]
 [0.00756 0.01008 0.0147 ]],
psi:
[[0. 0. 0.]
 [2. 2. 2.]
 [1. 1. 2.]]
P(*)-最优路径的概率：0.0147；最优路径的终点是：2
最优路径I(*):
[2 2 2]


In [11]:
O = [0,1,0] # 1表示红，0表示白 # 观测序列

M,N = A.shape[1],B.shape[0]    
hmm = HMM(3,2)
hmm.fit(O)
hmm.viterbi()

p(o|lambda)的概率为:0.05178078150847522
delta:
[[0.           0.           0.7604947887]
 [0.0148663723 0.1894006233 0.          ]
 [0.000000048  0.0000000945 0.0480121762]],
psi:
[[0. 0. 0.]
 [2. 2. 2.]
 [1. 1. 1.]]
P(*)-最优路径的概率：0.04801；最优路径的终点是：2
最优路径I(*):
[2 1 2]
