# 隐马尔科夫模型（HMM）及其Python实现
上一篇总结了Logistic Regression和Max Entropy，这一篇继续下图的思路，整理HMM。

代码主要参考[Hankcs](http://www.hankcs.com/ml/hidden-markov-model.html)大神的博客。
![](https://raw.githubusercontent.com/applenob/machine_learning_basic/master/res/crf.png)

首先总结一下图模型，所谓图模型，其实就是在统计建模的时候，结合图论的思想。


让我们先从朴素贝叶斯开始思考，随机变量y和所有的观测变量X有关，但每个观测变量对于y来说，又是独立的，也就是我们说的“naive”。这基本上是最简单的随机变量的关系了:$P(X|y)=p(x_1|y)\cdot p(x_2|y)...\cdot p(x_n|y)$。那我们可以从这里引申出什么呢？如果把所有的随机变量，都用图论中的节点表示，变量间的关系，由边表示，暂时先不考虑边的方向的问题，那么朴素贝叶斯就可以很直观地画成上面的第一幅图。


$$P_w(y|x)=\frac{1}{Z_w(x)}exp\bigl(\begin{smallmatrix}
\sum_{i=1}^{n} w_i\cdot f_i(x,y)
\end{smallmatrix}\bigr)$$

In [1]:
import numpy as np

In [2]:
# 对应状态集合
states = ('Healthy', 'Fever')
# 对应观测集合
observations = ('normal', 'cold', 'dizzy')
# 初始状态概率向量π
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
# 状态转移矩阵A
transition_probability = {
    'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
# 观测概率矩阵B
emission_probability = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}

In [3]:
# 随机生成观测序列和状态序列    
def simulate(T):

    def draw_from(probs):
        """
        1.np.random.multinomial:
        按照多项式分布，生成数据
        >>> np.random.multinomial(20, [1/6.]*6, size=2)
                array([[3, 4, 3, 3, 4, 3],
                       [2, 4, 3, 4, 0, 7]])
         For the first run, we threw 3 times 1, 4 times 2, etc.  
         For the second, we threw 2 times 1, 4 times 2, etc.
        2.np.where:
        >>> x = np.arange(9.).reshape(3, 3)
        >>> np.where( x > 5 )
        (array([2, 2, 2]), array([0, 1, 2]))
        """
        return np.where(np.random.multinomial(1,probs) == 1)[0][0]

    observations = np.zeros(T, dtype=int)
    states = np.zeros(T, dtype=int)
    states[0] = draw_from(pi)
    observations[0] = draw_from(B[states[0],:])
    for t in range(1, T):
        states[t] = draw_from(A[states[t-1],:])
        observations[t] = draw_from(B[states[t],:])
    return observations,states

In [4]:
def generate_index_map(lables):
    """为label生成index""" 
    index_label = {}
    label_index = {}
    i = 0
    for l in lables:
        index_label[i] = l
        label_index[l] = i
        i += 1
    return label_index, index_label
 
 
states_label_index, states_index_label = generate_index_map(states)
observations_label_index, observations_index_label = generate_index_map(observations)
print states_label_index, states_index_label
print observations_label_index, observations_index_label

{'Healthy': 0, 'Fever': 1} {0: 'Healthy', 1: 'Fever'}
{'cold': 1, 'dizzy': 2, 'normal': 0} {0: 'normal', 1: 'cold', 2: 'dizzy'}


In [5]:
def convert_observations_to_index(observations, label_index):
    list = []
    for o in observations:
        list.append(label_index[o])
    return list
 
 
def convert_map_to_vector(map, label_index):
    v = np.empty(len(map), dtype=float)
    for e in map:
        v[label_index[e]] = map[e]
    return v
 
 
def convert_map_to_matrix(map, label_index1, label_index2):
    m = np.empty((len(label_index1), len(label_index2)), dtype=float)
    for line in map:
        for col in map[line]:
            m[label_index1[line]][label_index2[col]] = map[line][col]
    return m

In [6]:
A = convert_map_to_matrix(transition_probability, states_label_index, states_label_index)
print A
B = convert_map_to_matrix(emission_probability, states_label_index, observations_label_index)
print B
observations_index = convert_observations_to_index(observations, observations_label_index)
pi = convert_map_to_vector(start_probability, states_label_index)
print pi

[[ 0.7  0.3]
 [ 0.4  0.6]]
[[ 0.5  0.4  0.1]
 [ 0.1  0.3  0.6]]
[ 0.6  0.4]


In [7]:
# 生成模拟数据
observations_data, states_data = simulate(10)
print observations_data
print states_data
# 相应的label
print [states_index_label[index] for index in states_data]
print [observations_index_label[index] for index in observations_data]

[1 1 2 2 2 1 0 2 2 1]
[0 0 1 1 1 0 0 1 1 1]
['Healthy', 'Healthy', 'Fever', 'Fever', 'Fever', 'Healthy', 'Healthy', 'Fever', 'Fever', 'Fever']
['cold', 'cold', 'dizzy', 'dizzy', 'dizzy', 'cold', 'normal', 'dizzy', 'dizzy', 'cold']


前向算法/后向算法python实现：

In [8]:
def forward(obs_seq):
    """前向算法"""
    N = A.shape[0]
    T = len(obs_seq)

    F = np.zeros((N,T))
    F[:,0] = pi * B[:, obs_seq[0]]

    for t in range(1, T):
        for n in range(N):
            F[n,t] = np.dot(F[:,t-1], (A[:,n])) * B[n, obs_seq[t]]

    return F

def backward(obs_seq):
    """后向算法"""
    N = A.shape[0]
    T = len(obs_seq)

    X = np.zeros((N,T))
    X[:,-1:] = 1

    for t in reversed(range(T-1)):
        for n in range(N):
            X[n,t] = np.sum(X[:,t+1] * A[n,:] * B[:, obs_seq[t+1]])

    return X

In [9]:
def baum_welch_train(observations, criterion=0.05):
    """无监督学习算法——Baum-Weich算法"""
    n_states = A.shape[0]
    n_samples = len(observations)

    done = False
    while not done:
        # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
        # Initialize alpha
        alpha = forward(observations)

        # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm)
        # Initialize beta
        beta = backward(observations)

        xi = np.zeros((n_states,n_states,n_samples-1))
        for t in range(n_samples-1):
            denom = np.dot(np.dot(alpha[:,t].T, A) * B[:,observations[t+1]].T, beta[:,t+1])
            for i in range(n_states):
                numer = alpha[i,t] * A[i,:] * B[:,observations[t+1]].T * beta[:,t+1].T
                xi[i,:,t] = numer / denom

        # gamma_t(i) = P(q_t = S_i | O, hmm)
        gamma = np.sum(xi,axis=1)
        # Need final gamma element for new B
        prod =  (alpha[:,n_samples-1] * beta[:,n_samples-1]).reshape((-1,1))
        gamma = np.hstack((gamma,  prod / np.sum(prod))) #append one more to gamma!!!

        newpi = gamma[:,0]
        newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
        newB = np.copy(B)

        num_levels = B.shape[1]
        sumgamma = np.sum(gamma,axis=1)
        for lev in range(num_levels):
            mask = observations == lev
            newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma

        if np.max(abs(pi - newpi)) < criterion and \
                        np.max(abs(A - newA)) < criterion and \
                        np.max(abs(B - newB)) < criterion:
            done = 1

        A[:], B[:], pi[:] = newA, newB, newpi


In [10]:
def observation_prob(obs_seq):
    """ P( entire observation sequence | A, B, pi ) """
    return np.sum(forward(obs_seq)[:,-1])

def state_path(obs_seq):
    """
    Returns
    -------
    V[last_state, -1] : float
        Probability of the optimal state path
    path : list(int)
        Optimal state path for the observation sequence
    """
    V, prev = viterbi(obs_seq)

    # Build state path with greatest probability
    last_state = np.argmax(V[:,-1])
    path = list(build_viterbi_path(prev, last_state))

    return V[last_state,-1], reversed(path)


In [11]:
def viterbi(obs_seq):
    """
    Returns
    -------
    V : numpy.ndarray
        V [s][t] = Maximum probability of an observation sequence ending
                   at time 't' with final state 's'
    prev : numpy.ndarray
        Contains a pointer to the previous state at t-1 that maximizes
        V[state][t]
    """
    N = A.shape[0]
    T = len(obs_seq)
    prev = np.zeros((T - 1, N), dtype=int)

    # DP matrix containing max likelihood of state at a given time
    V = np.zeros((N, T))
    V[:,0] = pi * B[:,obs_seq[0]]

    for t in range(1, T):
        for n in range(N):
            seq_probs = V[:,t-1] * A[:,n] * B[n, obs_seq[t]]
            prev[t-1,n] = np.argmax(seq_probs)
            V[n,t] = np.max(seq_probs)

    return V, prev

def build_viterbi_path(prev, last_state):
    """Returns a state path ending in last_state in reverse order."""
    T = len(prev)
    yield(last_state)
    for i in range(T-1, -1, -1):
        yield(prev[i, last_state])
        last_state = prev[i, last_state]

In [13]:
A = np.array([[0.5, 0.5],[0.5, 0.5]])
B = np.array([[0.3, 0.3, 0.3],[0.3, 0.3, 0.3]])
pi = np.array([0.5, 0.5])

observations_data, states_data = simulate(100)
# print observations_data
# print states_data
baum_welch_train(observations_data)
states_out = state_path(observations_data)[1]
p = 0.0
for s in states_data:
    if next(states_out) == s: p += 1
 
print p / len(states_data)

0.56


In [15]:
A = convert_map_to_matrix(transition_probability, states_label_index, states_label_index)
B = convert_map_to_matrix(emission_probability, states_label_index, observations_label_index)
observations_index = convert_observations_to_index(observations, observations_label_index)
pi = convert_map_to_vector(start_probability, states_label_index)
V, p = viterbi(observations_index)
print " " * 7, " ".join(("%10s" % observations_index_label[i]) for i in observations_index)
for s in range(0, 2):
    print "%7s: " % states_index_label[s] + " ".join("%10s" % ("%f" % v) for v in V[s])
print '\nThe most possible states and probability are:'
p, ss = state_path(observations_index)
for s in ss:
    print states_index_label[s],
print p

            normal       cold      dizzy
Healthy:   0.300000   0.084000   0.005880
  Fever:   0.040000   0.027000   0.015120

The most possible states and probability are:
Healthy Healthy Fever 0.01512


隐马尔科夫用类包装的代码：

In [None]:
class HMM:
    """
    Order 1 Hidden Markov Model
 
    Attributes
    ----------
    A : numpy.ndarray
        State transition probability matrix
    B: numpy.ndarray
        Output emission probability matrix with shape(N, number of output types)
    pi: numpy.ndarray
        Initial state probablity vector
    """
    def __init__(self, A, B, pi):
        self.A = A
        self.B = B
        self.pi = pi
    
    def simulate(self, T):
 
        def draw_from(probs):
        """
        1.np.random.multinomial:
        按照多项式分布，生成数据
        >>> np.random.multinomial(20, [1/6.]*6, size=2)
                array([[3, 4, 3, 3, 4, 3],
                       [2, 4, 3, 4, 0, 7]])
         For the first run, we threw 3 times 1, 4 times 2, etc.  
         For the second, we threw 2 times 1, 4 times 2, etc.
        2.np.where:
        >>> x = np.arange(9.).reshape(3, 3)
        >>> np.where( x > 5 )
        (array([2, 2, 2]), array([0, 1, 2]))
        """
            return np.where(np.random.multinomial(1,probs) == 1)[0][0]

        observations = np.zeros(T, dtype=int)
        states = np.zeros(T, dtype=int)
        states[0] = draw_from(self.pi)
        observations[0] = draw_from(self.B[states[0],:])
        for t in range(1, T):
            states[t] = draw_from(self.A[states[t-1],:])
            observations[t] = draw_from(self.B[states[t],:])
        return observations,states
    
    def _forward(self, obs_seq):
        """前向算法"""
        N = self.A.shape[0]
        T = len(obs_seq)

        F = np.zeros((N,T))
        F[:,0] = self.pi * self.B[:, obs_seq[0]]

        for t in range(1, T):
            for n in range(N):
                F[n,t] = np.dot(F[:,t-1], (self.A[:,n])) * self.B[n, obs_seq[t]]

        return F
    
    def _backward(self, obs_seq):
        """后向算法"""
        N = self.A.shape[0]
        T = len(obs_seq)

        X = np.zeros((N,T))
        X[:,-1:] = 1

        for t in reversed(range(T-1)):
            for n in range(N):
                X[n,t] = np.sum(X[:,t+1] * self.A[n,:] * self.B[:, obs_seq[t+1]])

        return X
    
    def baum_welch_train(self, observations, criterion=0.05):
        """无监督学习算法——Baum-Weich算法"""
        n_states = self.A.shape[0]
        n_samples = len(observations)

        done = False
        while not done:
            # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
            # Initialize alpha
            alpha = self._forward(observations)

            # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm)
            # Initialize beta
            beta = self._backward(observations)

            xi = np.zeros((n_states,n_states,n_samples-1))
            for t in range(n_samples-1):
                denom = np.dot(np.dot(alpha[:,t].T, self.A) * self.B[:,observations[t+1]].T, beta[:,t+1])
                for i in range(n_states):
                    numer = alpha[i,t] * self.A[i,:] * self.B[:,observations[t+1]].T * beta[:,t+1].T
                    xi[i,:,t] = numer / denom

            # gamma_t(i) = P(q_t = S_i | O, hmm)
            gamma = np.sum(xi,axis=1)
            # Need final gamma element for new B
            prod =  (alpha[:,n_samples-1] * beta[:,n_samples-1]).reshape((-1,1))
            gamma = np.hstack((gamma,  prod / np.sum(prod))) #append one more to gamma!!!

            newpi = gamma[:,0]
            newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
            newB = np.copy(self.B)

            num_levels = self.B.shape[1]
            sumgamma = np.sum(gamma,axis=1)
            for lev in range(num_levels):
                mask = observations == lev
                newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma

            if np.max(abs(self.pi - newpi)) < criterion and \
                            np.max(abs(self.A - newA)) < criterion and \
                            np.max(abs(self.B - newB)) < criterion:
                done = 1

            self.A[:],self.B[:],self.pi[:] = newA,newB,newpi
    
    def observation_prob(self, obs_seq):
        """ P( entire observation sequence | A, B, pi ) """
        return np.sum(self._forward(obs_seq)[:,-1])

    def state_path(self, obs_seq):
        """
        Returns
        -------
        V[last_state, -1] : float
            Probability of the optimal state path
        path : list(int)
            Optimal state path for the observation sequence
        """
        V, prev = self.viterbi(obs_seq)

        # Build state path with greatest probability
        last_state = np.argmax(V[:,-1])
        path = list(self.build_viterbi_path(prev, last_state))

        return V[last_state,-1], reversed(path)

    def viterbi(self, obs_seq):
        """
        Returns
        -------
        V : numpy.ndarray
            V [s][t] = Maximum probability of an observation sequence ending
                       at time 't' with final state 's'
        prev : numpy.ndarray
            Contains a pointer to the previous state at t-1 that maximizes
            V[state][t]
        """
        N = self.A.shape[0]
        T = len(obs_seq)
        prev = np.zeros((T - 1, N), dtype=int)

        # DP matrix containing max likelihood of state at a given time
        V = np.zeros((N, T))
        V[:,0] = self.pi * self.B[:,obs_seq[0]]

        for t in range(1, T):
            for n in range(N):
                seq_probs = V[:,t-1] * self.A[:,n] * self.B[n, obs_seq[t]]
                prev[t-1,n] = np.argmax(seq_probs)
                V[n,t] = np.max(seq_probs)

        return V, prev

    def build_viterbi_path(self, prev, last_state):
        """Returns a state path ending in last_state in reverse order."""
        T = len(prev)
        yield(last_state)
        for i in range(T-1, -1, -1):
            yield(prev[i, last_state])
            last_state = prev[i, last_state]