In [24]:
import numpy as np

states=['healthy','fever']
observations=['normal','cold','dizzy']
pi=np.array([0.6,0.4])
A=np.array([[0.7,0.3],[0.4,0.6]])
B=np.array([[0.5,0.4,0.1],[0.1,0.3,0.6]])
#模拟状态序列和观测序列
s=np.array([0,0,1,1,2,1,2,2,2,0])
o=np.array([0,0,0,0,1,1,1,1,1,0])

In [29]:
class HMM():
    def __init__(self,A,B,pi):
        self.A=A
        self.B=B
        self.pi=pi
        
    def _forward(self,obs_seq):
        N=self.A.shape[0]
        T=len(obs_seq)
        
        #F保存前向概率矩阵:使用矩阵乘法替代循环
        F=np.zeros((N,T))
        F[:,0]=np.dot(self.pi,self.B[:,obs_seq[0]])
        
        for t in range(1,T-1):
            for i in range(N):
                F[i,t]=np.dot(F[:,t-1],self.A[:,i])*self.B[i,obs_seq[t]]
        return F
    
    def _backward(self,obs_seq):
        N=self.A.shape[0]
        T=len(obs_seq)
        X=np.zeros((N,T))
        X[:,-1]=1
        for t in reversed(range(T-1)):
            for i in range(N):
                X[i,t]=sum(self.A[i,:]*self.B[:,obs_seq[t+1]]*X[:,t+1])
        return X
    
    def bw_unsuper(self,observations, criterion=0.05):
        n_states = self.A.shape[0]
        n_samples = len(observations)
     
        done = False
        while not done:
            # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
            # Initialize alpha
            alpha = self._forward(observations)
     
            # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm)
            # Initialize beta
            beta = self._backward(observations)
     
            xi = np.zeros((n_states,n_states,n_samples-1))
            for t in range(n_samples-1):
                denom = np.dot(np.dot(alpha[:,t].T, self.A) * self.B[:,observations[t+1]].T, beta[:,t+1])
                for i in range(n_states):
                    numer = alpha[i,t] * self.A[i,:] * self.B[:,observations[t+1]].T * beta[:,t+1].T
                    xi[i,:,t] = numer / (denom+0.00001)
     
            # gamma_t(i) = P(q_t = S_i | O, hmm)
            gamma = np.sum(xi,axis=1)
            # Need final gamma element for new B
            prod =  (alpha[:,n_samples-1] * beta[:,n_samples-1]).reshape((-1,1))
            gamma = np.hstack((gamma,  prod / (np.sum(prod)+0.00001))) #append one more to gamma!!!
     
            newpi = gamma[:,0]
            newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
            newB = np.copy(self.B)
     
            num_levels = self.B.shape[1]
            sumgamma = np.sum(gamma,axis=1)
            for lev in range(num_levels):
                mask = observations == lev
                newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma
     
            if np.max(abs(self.pi - newpi)) < criterion and \
                            np.max(abs(self.A - newA)) < criterion and \
                            np.max(abs(self.B - newB)) < criterion:
                done = 1
     
            self.A[:],self.B[:],self.pi[:] = newA,newB,newpi
    
    def state_path(self, obs_seq):
        """
        Returns
        -------
        V[last_state, -1] : float
            Probability of the optimal state path
        path : list(int)
            Optimal state path for the observation sequence
        """
        V, prev = self.viterbi(obs_seq)

        # Build state path with greatest probability
        last_state = np.argmax(V[:,-1])
        path = list(self.build_viterbi_path(prev, last_state))

        return V[last_state,-1], reversed(path)
    
    def build_viterbi_path(self, prev, last_state):
        """Returns a state path ending in last_state in reverse order."""
        T = len(prev)
        yield(last_state)
        for i in range(T-1, -1, -1):
            yield(prev[i, last_state])
            last_state = prev[i, last_state]
        
    def viterbi(self, obs_seq):
        """
        Returns
        -------
        V : numpy.ndarray
            V [s][t] = Maximum probability of an observation sequence ending
                       at time 't' with final state 's'
        prev : numpy.ndarray
            Contains a pointer to the previous state at t-1 that maximizes
            V[state][t]
        """
        N = self.A.shape[0]
        T = len(obs_seq)
        prev = np.zeros((T - 1, N), dtype=int)
     
        # DP matrix containing max likelihood of state at a given time
        V = np.zeros((N, T))
        V[:,0] = self.pi * self.B[:,obs_seq[0]]
     
        for t in range(1, T):
            for n in range(N):
                seq_probs = V[:,t-1] * self.A[:,n] * self.B[n, obs_seq[t]]
                prev[t-1,n] = np.argmax(seq_probs)
                V[n,t] = np.max(seq_probs)
     
        return V, prev

0.2


In [30]:
guess=HMM(A,B,pi)
guess.bw_unsuper(o)
states_out = guess.state_path(o)[1]
p = 0.0
for i in s:
    if next(states_out) == i: p += 1
 
print(p/10)
    


0.2
