In [21]:
# Hidden Markov Model is a random process, with the latent model inside

# State/Track : state sequence(which is latent)
# Observation : observation sequence
# Markov model is assumpted that the current state is related to the previouw one state

# Q: state spcae 1,..,N
# V: observation spae 1,...,M
# I: state sequence
# O: observation sequence
# A: transition matrix P(I|I) R(N,N)
# B: observation matrix P(O|I) R(N,M)
# Pi: init state distribution Pi(I) R(1,N])

# Assumption:
# 1. The second Markov hypothesis
# 2. Assumptions of observational independence

import numpy as np

# state space
Q = {'Healthy','Fever'}

# observation space
V = {'normal','cold','dizzy'}

# init state distribution
Pi = {
    'Healthy':0.6,
    'Fever':0.4
}

# state transition
A = {
    'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}

# observation transition
B = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}

def generate_index_map(labels):
    index2label = {}
    label2index = {}
    i = 0
    for label in labels:
        index2label[i] = label
        label2index[label] = i
        i += 1
    return index2label,label2index

Qlabel,Qindex = generate_index_map(Q)
Vlabel,Vindex = generate_index_map(V)
print('Qlabel,Qindex,Vlabel,Vindex:')
print(Qlabel,Qindex)
print(Vlabel,Vindex)

def gen_Pi(pi_dict,Qlabel):
    v = np.zeros(len(Qlabel),dtype=float)
    for key in pi_dict:
        v[Qlabel[key]] = pi_dict[key]
    return v 

def gen_matrix(trans_dict,Qlabel,Vlabel):
    res = np.zeros((len(Qlabel),len(Vlabel)),dtype=float)
    for si in trans_dict:
        for sj in trans_dict[si]:
            res[Qlabel[si]][Vlabel[sj]] =  trans_dict[si][sj]
    return res 

A = gen_matrix(A,Qindex,Qindex)
B = gen_matrix(B,Qindex,Vindex)
pi = gen_Pi(Pi,Qindex)
print('state transition matrix')
print(A)
print('state-observation matrix')
print(B)
print('state prob distribution')
print(pi)



Qlabel,Qindex,Vlabel,Vindex:
{0: 'Healthy', 1: 'Fever'} {'Healthy': 0, 'Fever': 1}
{0: 'normal', 1: 'cold', 2: 'dizzy'} {'normal': 0, 'cold': 1, 'dizzy': 2}
state transition matrix
[[0.7 0.3]
 [0.4 0.6]]
state-observation matrix
[[0.5 0.4 0.1]
 [0.1 0.3 0.6]]
state prob distribution
[0.6 0.4]


In [26]:
def simulate(T,A,B,pi):
    def draw_from(state_prob_distribution):
        return np.where(np.random.multinomial(1,state_prob_distribution)==1)[0][0]
    observations = np.zeros(T,dtype=int)
    states = np.zeros(T,dtype=int)
    states[0] = draw_from(pi)
    observations[0] = draw_from(B[states[0],:])
    for t in range(1, T):
        states[t] = draw_from(A[states[t-1],:])
        observations[t] = draw_from(B[states[t],:])
    return observations, states

# simulate the data 
O,I = simulate(10,A,B,pi)
print([Qlabel[i] for i in I])
print([Vlabel[j] for j in O])


['Fever', 'Fever', 'Fever', 'Fever', 'Healthy', 'Healthy', 'Fever', 'Fever', 'Fever', 'Fever']
['dizzy', 'dizzy', 'cold', 'dizzy', 'normal', 'dizzy', 'cold', 'dizzy', 'dizzy', 'dizzy']


In [35]:
# In the study of HMM, there are three main problems
# Track Probability: give the O={o1,o2,...,oT},P(O)
# Learning problem: give the O, estimate the parameters
# Forecasting problem: (decoding problem also),given the observation track, 
#                       show the most likely state track

def forward(obs_seq,A,B,pi):
    '''cal the probability from the observation sequence with forward''' 
    state_len = A.shape[0]
    track_len = len(obs_seq)
    F = np.zeros((state_len,track_len))

    # init the state with the pi
    F[:,0] = pi*B[:,obs_seq[0]]
    for t in range(1,track_len):
        for n in range(state_len):
            F[n,t] = np.dot(F[:,t-1],(A[:,n]))*B[n,obs_seq[t]]
    return F

def backward(obs_seq,A,B,pi):
    '''cal the probability from the observation sequence with backward''' 
    N = A.shape[0]
    T = len(obs_seq)
    # X保存后向概率矩阵
    X = np.zeros((N,T))
    X[:,-1:] = 1

    for t in reversed(range(T-1)):
        for n in range(N):
            X[n,t] = np.sum(X[:,t+1] * A[n,:] * B[:, obs_seq[t+1]])

    return X

print('track probability forward')
print(forward(O,A,B,pi))
print('track probability backward')
print(backward(O,A,B,pi))

track probability forward
[[6.00000000e-02 1.38000000e-02 1.94160000e-02 2.10864000e-03
  2.78613600e-03 2.22137232e-04 1.81474247e-04 1.88703033e-05
  4.84769398e-06 1.74484295e-06]
 [2.40000000e-01 9.72000000e-02 1.87380000e-02 1.02405600e-02
  6.77692800e-04 7.45473888e-04 1.54177651e-04 8.81693187e-05
  3.51376093e-05 1.35221243e-05]]
track probability backward
[[3.41866152e-05 1.33075207e-04 3.19940608e-04 1.56638588e-03
  3.92894800e-03 1.48276000e-02 3.39850000e-02 8.95000000e-02
  2.50000000e-01 1.00000000e+00]
 [5.50657097e-05 1.38174171e-04 4.83242629e-04 1.16829776e-03
  6.37513600e-03 1.60612000e-02 5.90200000e-02 1.54000000e-01
  4.00000000e-01 1.00000000e+00]]


In [55]:
# Based on the EM algrithm, the HMM learning process can be solved by Baum-Weich algorithm
def baum_weich(obs_seq,A,B,pi,criterion=0.05):
    '''
    Unsupervised learning algorithm
    '''
    n_states = A.shape[0]
    n_samples = len(obs_seq)

    done = False

    while not done:
        # Cal the track probability
        # alpha(i) = p(obs_seq | q0=s_i,hmm)
        alpha = forward(obs_seq,A,B,pi)
        # beta(i)=p(obs_seq | q0=s_i,hmm)
        beta = backward(obs_seq,A,B,pi)

        # xi: in the timestamp t, the t state is i and the t+1 state is j
        xi = np.zeros((n_states,n_states,n_samples-1))
        for t in range(n_samples-1):
            denom = np.dot(np.dot(alpha[:,t].T, A) * B[:,obs_seq[t+1]].T, beta[:,t+1])
            for i in range(n_states):
                numer = alpha[i,t] * A[i,:] * B[:,obs_seq[t+1]].T * beta[:,t+1].T
                xi[i,:,t] = numer / denom
            
        # gamma, in the timestamp t ,the t state is i
        gamma = np.sum(xi,axis=1)
        # prod for what
        prod = (alpha[:,n_samples-1]*beta[:,n_samples-1]).reshape((-1,1))
        gamma = np.hstack((gamma,prod/np.sum(prod)))

        # M
        newpi = gamma[:,0]
        newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
        newB = np.copy(B)
        num_levels = B.shape[1]
        sumgamma = np.sum(gamma,axis=1)
        for lev in range(num_levels):
            mask = obs_seq == lev
            newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma
        # 检查是否满足阈值
        if np.max(abs(pi - newpi)) < criterion and \
                        np.max(abs(A - newA)) < criterion and \
                        np.max(abs(B - newB)) < criterion:
            done = 1
        A[:], B[:], pi[:] = newA, newB, newpi
    return newA, newB, newpi

A = np.array([[0.5, 0.5],[0.5, 0.5]])
B = np.array([[0.3, 0.3, 0.3],[0.3, 0.3, 0.3]])
pi = np.array([0.6, 0.4])

observations_data, states_data = simulate(100,A,B,pi)
newA, newB, newpi = baum_weich(observations_data, A, B, pi)
print("newA: ", newA)
print("newB: ", newB)
print("newpi: ", newpi)



newA:  [[0.49998064 0.50001936]
 [0.49998347 0.50001653]]
newB:  [[0.27888263 0.36255398 0.35856339]
 [0.28112184 0.35743579 0.36144237]]
newpi:  [0.60170547 0.39829453]


NameError: name 'observations' is not defined