In [2]:
import numpy as np
from hmmlearn import hmm
from itertools import accumulate


class GenData:
    """
    根据隐马尔科夫模型生成相应的观测数据
    """

    def __init__(self, hmm, n_sample):
        self.hmm = hmm
        self.n_sample = n_sample

    def _locate(self, prob_arr):
        # 给定概率向量，返回状态
        seed = np.random.rand(1)
        for state, cdf in enumerate(accumulate(prob_arr)):
            if seed <= cdf:
                return state
        return

    def init_state(self):
        # 根据初始状态概率向量，生成初始状态
        return self._locate(self.hmm.S)

    def state_trans(self, current_state):
        # 转移状态
        return self._locate(self.hmm.A[current_state])

    def gen_obs(self, current_state):
        # 生成观测
        return self._locate(self.hmm.B[current_state])

    def gen_data(self):
        # 根据模型产生观测数据
        current_state = self.init_state()
        start_obs = self.gen_obs(current_state)
        state = [current_state]
        obs = [start_obs]
        n = 0
        while n < self.n_sample - 1:
            n += 1
            current_state = self.state_trans(current_state)
            state.append(current_state)
            obs.append(self.gen_obs(current_state))
        return state, obs


class HMM:
    def __init__(self, n_state, n_obs, S=None, A=None, B=None):
        self.n_state = n_state  # 状态的个数n
        self.n_obs = n_obs  # 观测的种类数m
        self.S = S  # 1*n, 初始状态概率向量
        self.A = A  # n*n, 状态转移概率矩阵
        self.B = B  # n*m, 观测生成概率矩阵


    def _alpha(self, obs, t):
        # 计算时刻t各个状态的前向概率
        
        b = self.B[:, obs[0]]
        alpha = np.array([self.S * b])  # n*1
        for i in range(1, t + 1):
            alpha = (alpha @ self.A) * np.array([self.B[:, obs[i]]])
           
        return alpha[0]


    def forward_prob(self, obs):
    # 前向算法计算最终生成观测序列的概率, 即各个状态下概率之和
        alpha = self._alpha( obs, len(obs)-1 )
       
        return np.sum(alpha)


    def _beta(self, obs, t):
    # 计算时刻t各个状态的后向概率
        beta = np.ones(self.n_state)
        for i in reversed(range(t + 1, len(obs))):
            beta = np.sum(self.A *self.B[:, obs[i]] * beta, axis=1)
            #print(beta)
        return beta


    def backward_prob(self, obs):
    # 后向算法计算生成观测序列的概率
        beta =self._beta( obs, 1)
        print(np.sum(self.S *self.B[:, obs[0]] * beta))
        return np.sum(self.S *self.B[:, obs[0]] * beta)


    def fb_prob(self, obs, t=None):
    # 将前向和后向合并
        if t is None:
            t = 0
        res = self._alpha( obs, t) * self._beta( obs, t)
        return res.sum()


    def _gamma(self, obs, t):
    # 计算时刻t处于各个状态的概率
        alpha = self._alpha( obs, t)
        beta =self._beta( obs, t)
        #print(np.nan_to_num(beta))
        prob = alpha * beta
        #print('分子%s分母%s'%(np.nan_to_num(prob),np.nan_to_num(prob.sum())))
        return prob / prob.sum()


    def point_prob(self, obs, t, i):
    # 计算时刻t处于状态i的概率
        prob = self._gamma( obs, t)
        return prob[i]


    def _xi(self, obs, t):
        alpha = np.mat(self._alpha( obs, t))
        beta_p = self._beta( obs, t + 1)
        obs_prob = self.B[:, obs[t + 1]]
        obs_beta = np.mat(obs_prob * beta_p)
        alpha_obs_beta = np.asarray(alpha.T * obs_beta)
        xi = alpha_obs_beta * self.A
        return xi / xi.sum()


    def fit(self, obs_data, maxstep=100):
        # 利用Baum-Welch算法学习
        self.A = np.ones((self.n_state,self.n_state)) / self.n_state
        self.B = np.ones((self.n_state, self.n_obs)) / self.n_obs
        self.S = np.random.sample(self.n_state)  # 初始状态概率矩阵（向量），的初始化必须随机状态，否则容易陷入局部最优
        #if np.nan_to_num(self.S.sum())==0:
           #self.S=self.S
        #else:
        self.S = np.nan_to_num(self.S) / np.nan_to_num(self.S.sum())
        step = 0
        print(step)
        print(maxstep)
        while step < maxstep:
            xi = np.zeros_like(self.A)
            gamma = np.zeros_like(self.S)
            B = np.zeros_like(self.B)
            S = self._gamma(obs_data, 0)
            for t in range(len(obs_data) - 1):
                tmp_gamma =self._gamma(obs_data, t)
                gamma += tmp_gamma
                xi += self._xi(obs_data, t)
                B[:, obs_data[t]] += tmp_gamma

        # 更新 A
            for i in range(self.n_state):
                #print("分子%s分母%s"%(np.nan_to_num( xi[i]) , np.nan_to_num(gamma[i])))
                #if np.nan_to_num(gamma[i])==0:
                   # self.A[i]=self.A[i]
                #else:
                self.A[i] = np.nan_to_num( xi[i]) / np.nan_to_num(gamma[i])
        # 更新 B
            tmp_gamma_end = self._gamma(obs_data, len(obs_data) - 1)
            gamma += tmp_gamma_end
            B[:, obs_data[-1]] += tmp_gamma_end
            for i in range(self.n_state):
                #if np.nan_to_num(gamma[i])==0:
                   # self.B[i]= self.B[i]
                #else:
                self.B[i] =np.nan_to_num( B[i]) / np.nan_to_num(gamma[i])
                
            # 更新 S
            
            self.S = S
            step += 1
#             self.A[self.A<0.00000000001]=0
#             self.A[self.A>0.99999]=1
#             self.B[ self.B<0.00000000001]=0
#             self.B[self.B>0.99999]=1
#             self.S[self.S<0.00000000001]=0
#             self.S[self.S>0.99999]=1
#             self.S=np.nan_to_num(self.S)
#             self.A=np.nan_to_num(self.A)
#             self.B=np.nan_to_num(self.B)
        return self


    def predict(self, obs):
        # 采用Viterbi算法预测状态序列
        N = len(obs)
        nodes_graph = np.zeros((self.n_state, N), dtype=int)  # 存储时刻t且状态为i时， 前一个时刻t-1的状态，用于构建最终的状态序列
        delta = self.S * self.B[:, obs[0]]  # 存储到t时刻，且此刻状态为i的最大概率
        nodes_graph[:, 0] = range(self.n_state)

        for t in range(1, N):
            new_delta = []
            for i in range(self.n_state):
                temp = [self.A[j, i] * d for j, d in enumerate(delta)]  # 当前状态为i时， 选取最优的前一时刻状态
                max_d = max(temp)
                new_delta.append(max_d * self.B[i, obs[t]])
                nodes_graph[i, t] = temp.index(max_d)
            delta = new_delta

        current_state = np.argmax(nodes_graph[:, -1])
        path = []
        t = N
        while t > 0:
            path.append(current_state)
            current_state = nodes_graph[current_state, t - 1]
            t -= 1
        return list(reversed(path))


#if __name__ == '__main__':
    # S = np.array([0.2, 0.4, 0.4])
    # A = np.array([[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]])
    # B = np.array([[0.5, 0.2, 0.3], [0.4, 0.2, 0.4], [0.6, 0.3, 0.1]])
    # hmm_real = HMM(3, 3, S, A, B)
    # g = GenData(hmm_real, 500)
    # state, obs = g.gen_data()
    # 检测生成的数据
    # state, obs = np.array(state), np.array(obs)
    # ind = np.where(state==2)[0]
    # from collections import Counter
    # obs_ind = obs[ind]
    # c1 = Counter(obs_ind)
    # n = sum(c1.values())
    # for o, val in c.items():
    #     print(o, val/n)
    # ind_next = ind + 1
    # ind_out = np.where(ind_next==1000)
    # ind_next = np.delete(ind_next, ind_out)
    # state_next = state[ind_next]
    # c2 = Counter(state_next)
    # n = sum(c2.values())
    # for s, val in c2.items():
    #     print(s, val/n)

    # 预测
#     S = np.array([0.5, 0.5])
#     A = np.array([[0.8, 1], [0.8, 0.8]])
#     B = np.array([[0.2, 0.0, 0.8], [0, 0.8, 0.2]])
#     hmm = HMM(2, 3, S, A, B)
#     g = GenData(hmm, 200)
#     state, obs = g.gen_data()
#     print(obs)
#     path = predict(hmm, obs)
#     score = sum([int(i == j) for i, j in zip(state, path)])
#     print(score / len(path))

# hmm = HMM(3, 4)
# #obes1=np.array([['A','A','B','B','C','C','D','D'],['A','B','B','C','B','B','D','D'],['A','C','B','C','B','C','D'],['A','D'],['A','C','B','C','B','A','B','C','D','D'],['B','A','B','A','A','D','D','D'],['B','A','B','C','D','C','C'],['A','B','D','B','B','C','C','D','D'],['A','B','A','A','A','C','D','C','C','D'],['A','B','D']])
# #obes1=[[10,10,11,11,12,12,13,13],[10,11,11,12,11,11,13,13],[10,12,11,12,11,12,13],[10,13],[10,12,11,12,11,10,11,12,13,13],[11,10,11,10,10,13,13,13],[11,10,11,12,13,12,12],[10,11,13,11,11,12,12,13,13],[10,11,10,10,10,12,13,12,12,13],[10,11,13]]
# obes1=[[0,0,1,1,2,2,3,3],[0,1,1,2,1,1,3,3],[0,2,1,2,1,2,3],[0,3],[0,2,1,2,1,0,1,2,3,3],[1,0,1,0,0,3,3,3],[1,0,1,2,3,2,2],[0,1,3,1,1,2,2,3,3],[0,1,0,0,0,2,3,2,2,3],[0,1,3]]
# for i in obes1:
#     hmm =hmm.fit( i)
# print(hmm.S)
# print(hmm.A)
# print(hmm.B)




s=np.array([0,  0.79064424, 0.20935576])
a=np.array([[1, 0, 0],
 [1, 0, 0],
 [1, 0, 0]])
b=np.array([[0,  0.5, 0,  0.5],
 [1,  0, 0, 0 ],
 [1,  0, 0,  0 ]])
hmm = HMM(3, 4,s,a,b)
print(hmm.forward_prob([0,0,1,1,2,2,3,3]))

#print(hmm.forward_prob([0,3,1,1,1,2,3]))

    # 学习
    # import matplotlib.pyplot as plt
    #
    #
    # def triangle_data(n_sample):
    #     # 生成三角波形状的序列
    #     data = []
    #     for x in range(n_sample):
    #         x = x % 6
    #         if x <= 3:
    #             data.append(x)
    #         else:
    #             data.append(6 - x)
    #     return data
    #
    #
    # hmm = HMM(10, 4)
    # data = triangle_data(30)
    # hmm = fit(hmm, data)
    # g = GenData(hmm, 30)
    # state, obs = g.gen_data()
    #
    # x = [i for i in range(30)]
    # plt.scatter(x, obs, marker='*', color='r')
    # plt.plot(x, data, color='g')
    # plt.show()


0.0


In [18]:
a=np.array([1,0,0])
b=0
if b==0:
    a=a
else:
    a=a/b
print(a)

[1 0 0]
