隐马尔科夫模型

In [1]:
%matplotlib inline
# import matplotlib.pyplot as plt
import numpy as np
from hmmlearn.hmm import MultinomialHMM, GMMHMM

In [4]:
''' 设计一个 HMM 模型'''
# 构建一个参数已知的HMM模型
model_hmm1 = dict()

In [5]:
# 状态的初始分布
model_hmm1["pi"] =np.array([1.0/3.0,1.0/3.0,1.0/3.0])

In [6]:
# 各个状态之间的转移关系
# 行表示当前状态 列表示下一状态
model_hmm1["A"] = np.array([   
                            [0.4, 0.3, 0.3],
                            [0.3, 0.4, 0.3],
                            [0.3, 0.3, 0.4], 
                           ])

In [7]:
# 观测样本与各个状态之间的概率映射关系矩阵                   
M_O2S = np.zeros([3,8])
M_O2S[0,:6]=1/6.0
M_O2S[1,:4]=1/4.0
M_O2S[2,:8]=1/8.0
model_hmm1["M_O2S"] = M_O2S

In [8]:
# 计算观测样本O属于状态S的概率的函数（发射概率）
def prob_O2S(model,o):
    M_O2S = model["M_O2S"]
    return M_O2S[:,int(o)]

In [9]:
model_hmm1["B"] = prob_O2S

In [10]:
# 根据一组概率分布生成一个样本
def gen_one_sample_from_Prob_list(Prob_list):
    N_segment = np.shape(Prob_list)[0]
    
    # 将[0,1]的区间分为 N_segment 段
    prob_segment = np.zeros(N_segment)
    # 例如 Prob_list = [  0.3,0.3,0.4]
    #      prob_segment_segment = [0.3,0,6,1]
    for i in range(N_segment):
        prob_segment[i] = prob_segment[i-1]+ Prob_list[i]
    
    S =0
    # 生成0,1之间的随机数
    data = np.random.rand()
    # 查看生成的数值位于哪个段中
    for i in range(N_segment):
        if data <= prob_segment[i]:
            S = i
            break
    return S

In [11]:
def gen_samples_from_HMM(model,N):
    M_O2S = model["M_O2S"]
   
    datas = np.zeros(N)
    stats = np.zeros(N)
    
    # 得到初始状态，并根据初始状态生成一个样本
    init_S = gen_one_sample_from_Prob_list(model["pi"])
    stats[0] = init_S
    
    # 根据初始状态，生成一个数据
    datas[0] = gen_one_sample_from_Prob_list(M_O2S[int(stats[0])]) 
    
    #生成其他样本 
    for i in range(1,N):
        # 根据前一状态，生成当前状态
        stats[i] = gen_one_sample_from_Prob_list(model["A"][int(stats[i-1])])
        # 根据当前状态生成一个数据
        datas[i] = gen_one_sample_from_Prob_list(M_O2S[int(stats[i])])
    return datas,stats

In [12]:
''' 任务1 生成一条满足参数已知HMM分布的数据 '''
datas,states = gen_samples_from_HMM(model_hmm1,200)
print(datas)
print(states)

[2. 0. 6. 0. 0. 3. 2. 1. 1. 0. 2. 0. 3. 4. 7. 5. 0. 5. 2. 4. 4. 6. 2. 1.
 5. 3. 2. 3. 0. 3. 7. 7. 0. 2. 1. 0. 2. 2. 0. 2. 0. 0. 3. 5. 2. 7. 1. 5.
 4. 2. 5. 2. 0. 0. 1. 2. 1. 1. 2. 3. 5. 6. 0. 2. 1. 4. 0. 1. 0. 3. 2. 3.
 4. 2. 1. 0. 2. 3. 1. 7. 5. 2. 3. 3. 2. 2. 0. 3. 3. 7. 5. 4. 1. 7. 5. 0.
 4. 3. 2. 2. 3. 0. 3. 1. 0. 0. 3. 3. 2. 3. 1. 7. 4. 1. 2. 3. 3. 4. 1. 1.
 1. 0. 3. 4. 3. 1. 1. 0. 3. 4. 0. 4. 1. 6. 4. 6. 7. 4. 2. 2. 5. 3. 2. 2.
 2. 2. 1. 1. 5. 2. 3. 1. 5. 0. 0. 4. 2. 0. 4. 0. 1. 1. 0. 1. 1. 0. 3. 2.
 3. 2. 0. 4. 0. 3. 3. 1. 1. 1. 0. 2. 2. 1. 3. 3. 1. 3. 3. 2. 4. 3. 0. 1.
 7. 4. 0. 0. 1. 3. 1. 3.]
[1. 1. 2. 0. 0. 1. 1. 1. 1. 0. 2. 1. 1. 0. 2. 0. 2. 0. 0. 0. 2. 2. 0. 1.
 2. 2. 1. 1. 2. 2. 2. 2. 0. 1. 1. 1. 1. 0. 1. 1. 2. 2. 1. 0. 0. 2. 2. 2.
 0. 1. 2. 2. 0. 1. 1. 0. 0. 1. 0. 2. 0. 2. 0. 2. 1. 0. 1. 0. 0. 2. 1. 2.
 0. 2. 0. 2. 0. 1. 0. 2. 2. 0. 2. 1. 1. 0. 0. 1. 1. 2. 2. 2. 1. 2. 0. 1.
 0. 1. 2. 0. 1. 1. 2. 2. 1. 1. 1. 0. 1. 2. 1. 2. 0. 0. 1. 1. 1. 2. 1. 1.
 1. 1. 1. 2. 1. 1. 0. 1. 

In [13]:
# 前向算法  alpha = [N_sample,N_stats]
# 表示已知前t个样本的的情况下，第t个样本
# 属于状态i的概率
def calc_alpha(model,observations):
    o = observations
    N_samples = np.shape(o)[0]
    N_stats = np.shape(model["pi"])[0]
    
    # alpha 初始化
    alpha = np.zeros([N_samples,N_stats])
    
    # 计算第0个样本属于第i个状态的概率
    alpha[0] = model["pi"]*model["B"](model,o[0])
   
    # 计算其他时刻的样本属第i个状态的概率
    for t in range(1,N_samples):
        s_current = np.dot(alpha[t-1],model["A"])
        alpha[t] = s_current*model["B"](model,o[t])
   
    return alpha

In [14]:
# 后向算法
def calc_beta(model,observations):
    o = observations
    N_samples = np.shape(o)[0]
    N_stats = np.shape(model["pi"])[0]
    
    beta = np.zeros([N_samples,N_stats])
    
    # 反向初始值
    beta[-1] = 1
    
    for t in range(N_samples-2,-1,-1):
        # 由t+1时刻的beta以及t+1时刻的观测值计算
        # t+1时刻的状态值
        s_next = beta[t+1]*model["B"](model,o[t+1])
        beta[t] = np.dot(s_next,model["A"].T)
    return beta    

In [16]:
def forward(model,observations):
    o = observations
    
    # 计算前向概率
    alpha =  calc_alpha(model,o)
    prob_seq_f = np.sum(alpha[-1])

    return np.log(prob_seq_f)

In [17]:
def backward(model,observations):
    o = observations
    
    # 计算后向概率
    beta =  calc_beta(model,o)
    s_next = beta[0]*model["B"](model,o[0]) 
    prob_seq_b = np.dot(s_next,model["pi"])
    
    return np.log(prob_seq_b)

In [18]:
''' 任务2 利用前向 后向算法 计算一个生成一个序列的概率'''
datas = np.array([1,3,4])
print("alpha")
print(calc_alpha(model_hmm1,datas))    
print("beta")
print(calc_beta(model_hmm1,datas)) 
print("---------------")
print("forward",forward(model_hmm1,datas))
print("backward",backward(model_hmm1,datas))

alpha
[[0.05555556 0.08333333 0.04166667]
 [0.0099537  0.015625   0.00729167]
 [0.00180941 0.         0.00132378]]
beta
[[0.01725694 0.01770833 0.01677083]
 [0.10416667 0.0875     0.1       ]
 [1.         1.         1.        ]]
---------------
forward -5.765700974259562
backward -5.765700974259562


In [19]:
# 构建另一个参数已知的HMM模型
model_hmm2 = dict()
# 状态的初始分布
model_hmm2["pi"] =np.array([1.0/3.0,1.0/3.0,1.0/3.0])
# 各个状态之间的转移关系
# 行表示当前状态 列表示下一状态
model_hmm2["A"] = np.array([   
                        [0.8, 0.1, 0.1],
                        [0.1, 0.8, 0.1],
                        [0.1, 0.1, 0.8], 
                       ])
# 观测样本与各个状态之间的概率映射关系矩阵                   
M_O2S_2 = np.zeros([3,8])
M_O2S_2[0,:6]=[0.8,0.04,0.04,0.04,0.04,0.04]
M_O2S_2[1,:4]=1/4.0
M_O2S_2[2,:8]=1/8.0
model_hmm2["M_O2S"] = M_O2S_2
model_hmm2["B"] = prob_O2S

In [20]:
# 分别用2个HMM生成2个数据序列
datas_hmm1,_ = gen_samples_from_HMM(model_hmm1,100)
datas_hmm2,_ = gen_samples_from_HMM(model_hmm2,100)    

p_d1m1 = forward(model_hmm1,datas_hmm1)
p_d1m2 = forward(model_hmm2,datas_hmm1)
    
p_d2m1 = forward(model_hmm1,datas_hmm2)
p_d2m2 = forward(model_hmm2,datas_hmm2)
    
print("p_d1m1",p_d1m1)
print("p_d1m2",p_d1m2)
print("----------------------------------")
print("p_d2m1",p_d2m1)
print("p_d2m2",p_d2m2)

p_d1m1 -196.31645150637812
p_d1m2 -203.19453911807554
----------------------------------
p_d2m1 -178.28500717670119
p_d2m2 -146.5341992098277
