In [1]:
import numpy as np

In [2]:
class HMM():
    def __init__(self,A,B,pi):
        self.A=A   #状态转移概率矩阵 N*N
        self.B=B   #观测概率矩阵   N*M
        self.pi=pi  #初始概率状态向量  N*1
    
    def forward(self,obs_seq):
        '''
        前向算法
        
        输入观测序列obs_seq，输出前向概率矩阵alpha
            alpha:维度T*N
                T -- 观测序列长度
                N -- 所有可能的状态数
        '''
        T=len(obs_seq)
        
        alpha=[]
        alpha.append(self.pi*self.B[:,obs_seq[0]])
        
        for t in range(1,T):
            alpha.append((np.dot(alpha[t-1],self.A)*self.B[:,obs_seq[t]]).reshape(1,-1).tolist()[0])
        alpha=np.array(alpha)
        return alpha
    
    def backward(self,obs_seq):
        '''
        后向算法
        
        输入观测序列obs_seq，输出后向概率矩阵beta
            beta:维度T*N
                T -- 观测序列长度
                N -- 所有可能的状态数
        '''
        T=len(obs_seq)
        N=self.A.shape[0]
        
        beta=[]
        beta.append([1 for _ in range(N)])
        
        for t in range(T-1)[::-1]:
            beta.append(np.dot(self.A,self.B[:,obs_seq[t+1]]*beta[-1]).reshape(1,-1).tolist()[0])
        beta=np.array(beta)[::-1]
        return beta
    
    def predict(self,obs_seq):
        '''
        用维特比算法实现隐马尔可夫模型的预测
        
        输入模型(A,B,pi)和观测序列obs_seq，输出最优路径I及其概率
            delte:维度T*N
                T -- 观测序列长度
                N -- 所有可能的状态数
                delte[t-1][i] -- 在时刻t状态为i的所有单个路径中概率最大值
            psi:维度T*N
                psi[t-1][i] -- 在时刻t状态为i的所有单个路径中概率最大的路径的第t-i个节点
        '''
        T=len(obs_seq)
        N=self.A.shape[0]
        
        #初始化
        delte,psi=[],[]
        I=[]
        delte.append((self.pi*self.B[:,obs_seq[0]]).reshape(1,-1).tolist()[0])
        psi.append([-1 for _ in range(N)])
        
        #递推
        for t in range(1,T):
            delte.append((np.amax((delte[-1]*self.A.T).T,axis=0)*self.B[:,obs_seq[t]]).reshape(1,-1).tolist()[0])
            psi.append(np.argmax((delte[-2]*self.A.T).T,axis=0).reshape(1,-1).tolist()[0])
        
        #终止
        P=max(delte[-1])
        I.append(np.argmax(delte[-1]))

        #最优路径回溯
        for t in range(T-1)[::-1]:
            I.append(psi[t+1][I[-1]])
        I=I[::-1]
        return I,P
    def fit(self,seqs,criterion=1e-3):
        '''
        根据训练数据学习模型参数（A,B,pi）
        若输入包括观测序列和状态序列，采用极大似然估计法
        若输入只有观测序列，采用无监督学习算法--Baum_Welch算法
            输入维度为S*2*T或S*T
                T -- 观测序列长度
                S -- 序列个数(无监督学习情况下本程序暂时只处理输入序列个数为1)
        '''
        dim=len(np.shape(seqs))
        if dim<2 or dim>3:
            print("输入错误！")
            return

        ###  极大似然估计
        if dim==3:
            sta_seqs=seqs[:,1,:]  #所有状态序列集合
            obs_seqs=seqs[:,0,:]  #所有观测序列集合
            T=len(sta_seqs[0])  #单个序列长度
            S=len(sta_seqs)     #序列个数
            
            sta_trans_dict={}  #key:(i,j) value:Aij，时刻t处于状态i时刻t+1转移到状态j的频数为Aij
            states=set()   #所有可能状态集合
            states_init={}  #存储初始状态及对应频数
            for sta_seq in sta_seqs:
                states_init[sta_seq[0]]=states_init.get(sta_seq[0],0)+1
                states=states|set(sta_seq)
                for t in range(T-1):
                    sta_trans_dict[(sta_seq[t],sta_seq[t+1])]=sta_trans_dict.get((sta_seq[t],sta_seq[t+1]),0)+1
            N=len(states)   #状态个数
            
            obs_sta_dict={}   #key(j,k) value:Bjk，状态为j观测为k的频数
            observations=set()   #所有可能观测集合
            for s in range(S):
                sta_seq,obs_seq=sta_seqs[s],obs_seq[s]
                observations=observations|set(obs_seq)
                for t in range(T):
                    obs_sta_dict[(sta_seq[t],obs_seq[t])]=obs_sta_dict.get((sta_seq[t],obs_seq[t]),0)+1
            M=len(observations)   #观测个数
            
            #状态转移概率矩阵A的估计
            for i in range(N):
                Aij_sum=sum([sta_trans_dict[x] for x in sta_trans_dict if x[0]==i])
                for j in range(N):
                    self.A[i][j]=sta_trans_dict[(i,j)]/Aij_sum
            
            #观测概率矩阵B的估计
            for j in range(N):
                Bjk_sum=sum([obs_sta_dict[x] for x in obs_sta_dict if x[0]==j])
                for k in range(M):
                    self.B[j][k]=obs_sta_dict[(j,k)]/Bjk_sum
            
            for i in range(N): 
                self.pi[i]=states_init[i]/S
            
            return self.A,self.B,self.pi
        
        
        ### Baum_Welch算法
        N=self.A.shape[0]
        T=len(seqs[0])
        obs_seq=seqs[0]
        
        done = False
        while not done:
            alpha = self.forward(obs_seq)
            beta = self.backward(obs_seq)

            xi = np.zeros((N, N, T - 1))  #xi[i,j,t]表示在时刻t处于状态i且在时刻t+1处于状态j的概率

            for t in range(T - 1):
                P_O_lambda = sum(alpha[-1,:])
                
                # 计算xi
                for i in range(N):
                    numer = alpha[t,i] * self.A[i, :] * self.B[:, obs_seq[t + 1]].T * beta[ t + 1,:].T
                    xi[i, :, t] = numer / P_O_lambda

                # 计算gamma_t(i) 就是对j进行求和
                gamma = np.sum(xi, axis=1)
                # need final gamma elements for new B
                prod = (alpha[T - 1,:] * beta[T - 1,:]).reshape((-1, 1))
                # 合并T时刻的节点
                gamma = np.hstack((gamma, prod / np.sum(prod)))
                # 列向量
                newpi = gamma[:, 0]
                newA = np.sum(xi, 2) / np.sum(gamma[:, :-1], axis=1).reshape((-1, 1))
                newB = np.copy(self.B)

                # 观测状态数
                num_levels = self.B.shape[1]
                sumgamma = np.sum(gamma, axis=1)
                for lev in range(num_levels):
                    mask = obs_seq == lev
                    newB[:, lev] = np.sum(gamma[:, mask], axis=1) / sumgamma

                if np.max(abs(self.pi - newpi)) < criterion and \
                                np.max(abs(self.A - newA)) < criterion and \
                                np.max(abs(self.B - newB)) < criterion:
                    done = 1
                self.A[:], self.B[:], self.pi[:] = newA, newB, newpi
        return self.A,self.B,self.pi

In [3]:
### 统计学习方法例10.2
A=np.array([[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]])
B=np.array([[0.5,0.5],[0.4,0.6],[0.7,0.3]])
pi=np.array([0.2,0.4,0.4])
h = HMM(A,B,pi)

obs_seq=[0,1,0]
alpha = h.forward(obs_seq)
print("forward alpha:",alpha)
print ("forward: P(O|lambda) = %f" %sum(alpha[-1,:]))
beta = h.backward(obs_seq)
print("backward beta",beta)
print ("backward: P(O|lambda) = %f" %sum(beta[0,:]*pi*B[:,0]))

### 统计学习方法例10.3
path,p=h.predict([0,1,0])
print("最优路径概率",p)
print("最优路径",path)

forward alpha: [[0.1      0.16     0.28    ]
 [0.077    0.1104   0.0606  ]
 [0.04187  0.035512 0.052836]]
forward: P(O|lambda) = 0.130218
backward beta [[0.2451 0.2622 0.2277]
 [0.54   0.49   0.57  ]
 [1.     1.     1.    ]]
backward: P(O|lambda) = 0.130218
最优路径概率 0.014699999999999998
最优路径 [2, 2, 2]


In [4]:
### 网上查找的海藻数据
observations = ('dry','dryish','damp','soggy')   #观测集合
states = ('sunny','cloudy','rainy')              #状态集合

start_probability ={'sunny':0.6,'cloudy':0.2,'rainy':0.2}

transition_probability = {
    'sunny':{'sunny':0.5,'cloudy':0.375,'rainy':0.125},
    'cloudy':{'sunny':0.25,'cloudy':0.125,'rainy':0.625},
    'rainy':{'sunny':0.25,'cloudy':0.375,'rainy':0.375}
}

emission_probability ={
    'sunny':{'dry':0.6,'dryish':0.2,'damp':0.15,'soggy':0.05},
    'cloudy':{'dry':0.25,'dryish':0.25,'damp':0.25,'soggy':0.25},
    'rainy':{'dry':0.05,'dryish':0.1,'damp':0.35,'soggy':0.50}
}

### 将数据转换为
def gengerate_index_map(labels):
    '''
    建立索引表'''
    index_label = {}
    label_index = {}

    i =0
    for l in labels:
        index_label[i] =l
        label_index[l] =i

        i+=1
    return label_index,index_label
states_label_index,states_index_label = gengerate_index_map(states)
observations_label_index,observations_index_label = gengerate_index_map(observations)

def convert_observations_to_index(observations,label_index):
    list =[]
    for o in observations:
        list.append(label_index[o])
    return list

def convert_map_to_matrix(map,label_index1,label_index2):
    '''
    建立概率矩阵'''
    m = np.empty((len(label_index1),len(label_index2)),dtype = float)
    for line in map:
        for col in map[line]:
            m[label_index1[line]][label_index2[col]] = map[line][col]
    return m

def convert_map_to_vector(map,label_index):
    '''
    建立概率向量'''
    v = np.empty(len(map),dtype = float)
    for e in map:
        v[label_index[e]] =map[e]
    return v

A = convert_map_to_matrix(transition_probability,states_label_index,states_label_index)
print ('状态转移概率矩阵:',A)
B = convert_map_to_matrix(emission_probability,states_label_index,observations_label_index)
print ('观测概率矩阵:',B)
observations_index = convert_observations_to_index(observations,observations_label_index)
print ('观测序列索引:',observations_index)
pi = convert_map_to_vector(start_probability,states_label_index)
print ('初始概率状态向量:',pi)

h = HMM(A,B,pi)
# 人为定义的海藻状态序列
obs_seq = ('dry','damp','soggy')
obs_seq_index = convert_observations_to_index(obs_seq,observations_label_index)

# 计算P(o|lambda)
alpha = h.forward(obs_seq_index)
print(alpha)
print ("forward: P(O|lambda) = %f" %sum(alpha[-1,:]))
beta = h.backward(obs_seq_index)
print(beta)
print ("backward: P(O|lambda) = %f" %sum(beta[0,:]*pi*B[:,0]))

# 预测
ss,p = h.predict(obs_seq_index)
path = []
for s in ss:
    path.append(states_index_label[s])

print("最有可能的隐藏序列为：" ,path)
print("viterbi: P(I|O) =%f"% p)

状态转移概率矩阵: [[0.5   0.375 0.125]
 [0.25  0.125 0.625]
 [0.25  0.375 0.375]]
观测概率矩阵: [[0.6  0.2  0.15 0.05]
 [0.25 0.25 0.25 0.25]
 [0.05 0.1  0.35 0.5 ]]
观测序列索引: [0, 1, 2, 3]
初始概率状态向量: [0.6 0.2 0.2]
[[0.36       0.05       0.01      ]
 [0.02925    0.03625    0.028     ]
 [0.00153438 0.0065     0.01840625]]
forward: P(O|lambda) = 0.026441
[[0.05984375 0.0821875  0.07875   ]
 [0.18125    0.35625    0.29375   ]
 [1.         1.         1.        ]]
backward: P(O|lambda) = 0.026441
最有可能的隐藏序列为： ['sunny', 'cloudy', 'rainy']
viterbi: P(I|O) =0.010547


In [5]:
def simulate(A,B,pi,T):
    '''
    生成测试序列'''
    def draw_from(probs):
        # np.random.multinomial 为多项式分布，1为实验次数，类似于投掷一枚骰子，丢出去是几，probs每个点数的概率，均为1/6
        # 给定行向量的概率，投掷次数为1次，寻找投掷的点数
        return np.where(np.random.multinomial(1, probs) == 1)[0][0]

    observations = np.zeros(T, dtype=int)
    states = np.zeros(T, dtype=int)
    states[0] = draw_from(pi)
    observations[0] = draw_from(B[states[0], :])
    for t in range(1, T):
        states[t] = draw_from(A[states[t - 1], :])
        observations[t] = draw_from(B[states[t], :])
    return observations.reshape(1,-1), states
observations_data, states_data = simulate(A,B,pi,100)

guess = HMM(np.array([[0.33, 0.33, 0.34],
                       [0.33, 0.33, 0.34],
                       [0.33, 0.33, 0.34]]),
                np.array([[0.25, 0.25, 0.25, 0.25],
                       [0.25, 0.25, 0.25, 0.25],
                       [0.25, 0.25, 0.25, 0.25]]),
                np.array([0.7, 0.15, 0.15])
                )
guess.fit(observations_data)
# 预测
states_out = guess.predict(observations_data[0])[0]
p = 0.0
for i in range(len(states_data)):
    if states_out[i] == states_data[i]: p += 1

print("正确率为：",p / len(states_data))

正确率为： 0.36
