In [2]:
import numpy as np

## HMM模型算法

In [3]:
class HMM(object):
    # HMM模型
    def __init__(self, N, M, pi=None, A=None, B=None):
        """
        N: 状态数量集
        M: 观测数量集
        pi: 初始概率分布
        A: 状态转移概率矩阵
        B: 观测概率矩阵
        """
        self.N = N
        self.M = M
        self.pi = pi
        self.A = A
        self.B = B
    
    # 根据给定的概率分布随机返回数据（索引）
    def get_data_with_distribute(self, dist):
        r = np.random.rand()
        for i, p in enumerate(dist):
            if r < p:
                return i
            r -= p
    
    # 随机生成观测序列
    def generate(self, T: int):
        '''
        根据给定的参数生成观测序列
        T: 指定要生成数据的数量
        '''
        z = self.get_data_with_distribute(self.pi)
        x = self.get_data_with_distribute(self.B[z])
        result = [x]
        for _ in range(T - 1):
            z = self.get_data_with_distribute(self.A[z])
            x = self.get_data_with_distribute(self.B[z])
            result.append(x)
        return result
    
    # 前向算法（概率计算）
    def evaluate(self, X):
        '''
        根据给定的参数计算条件概率
        X: 观测数据
        '''
        alpha = self.pi * self.B[:, X[0]]
        for x in X[1:]:
            alpha = np.sum(self.A * alpha.reshape(-1, 1) * self.B[:, x].reshape(1, -1), axis=0)
        return alpha.sum()
    
    # 前向后向算法（概率计算）
    def evaluate_backward(self, X):
        beta = np.ones(self.N)
        for x in X[:0:-1]:
            beta_next = np.empty(self.N)
            for i in range(self.N):
                beta_next[i] = np.sum(self.A[i,:] * self.B[:,x] * beta)
            beta = beta_next
        return np.sum(beta * self.pi * self.B[:,X[0]])
    
    # Baum-Welch算法（参数估计）
    def fit(self, X):
        '''
        根据给定观测序列反推参数
        '''
        # 初始化参数 pi, A, B
        self.pi = np.random.sample(self.N)
        self.A = np.ones((self.N,self.N)) / self.N
        self.B = np.ones((self.N,self.M)) / self.M
        self.pi = self.pi / self.pi.sum()
        T = len(X)
        for _ in range(50):
            # 按公式计算下一时刻的参数
            alpha, beta = self.get_something(X)
            gamma = alpha * beta

            for i in range(self.N):
                for j in range(self.N):
                    self.A[i,j] = np.sum(alpha[:-1,i]*beta[1:,j]*self.A[i,j]*self.B[j,X[1:]]) / gamma[:-1,i].sum()

            for j in range(self.N):
                for k in range(self.M):
                    self.B[j,k] = np.sum(gamma[:,j]*(X == k)) / gamma[:,j].sum()
            
            self.pi = gamma[0] / gamma[-1].sum()

    def get_something(self, X):
        '''
        根据给定数据与参数，计算所有时刻的前向概率和后向概率
        '''
        T = len(X)
        alpha = np.zeros((T,self.N))
        alpha[0,:] = self.pi * self.B[:,X[0]]
        for i in range(T-1):
            x = X[i+1]
            alpha[i+1,:] = np.sum(self.A * alpha[i].reshape(-1,1) * self.B[:,x].reshape(1,-1), axis=0)

        beta = np.ones((T,self.N))
        for j in range(T-1,0,-1):
            for i in range(self.N):
                beta[j-1,i] = np.sum(self.A[i,:] * self.B[:,X[j]] * beta[j])

        return alpha, beta
    
    # 维特比算法（解码算法）
    def decode(self, X):
        T = len(X)
        x = X[0]
        delta = self.pi * self.B[:,x]
        varphi = np.zeros((T, self.N), dtype=int)
        path = [0] * T
        for i in range(1, T):
            delta = delta.reshape(-1,1)     # 转成一列方便广播
            tmp = delta * self.A
            varphi[i,:] = np.argmax(tmp, axis=0)
            delta = np.max(tmp, axis=0) * self.B[:,X[i]]
        path[-1] = np.argmax(delta)
        # 回溯最优路径
        for i in range(T-1,0,-1):
            path[i-1] = varphi[i,path[i]]
        return path

In [4]:
pi = np.array([0.25, 0.25, 0.25, 0.25])
A = np.array([
    [0, 1, 0, 0],
    [0.4, 0, 0.6, 0],
    [0, 0.4, 0, 0.6],
    [0, 0, 0.5, 0.5]])
B = np.array([
    [0.5, 0.5],
    [0.3, 0.7],
    [0.6, 0.4],
    [0.8, 0.2]])

In [5]:
# pi = np.array([0.33, 0.33, 0.34])
# A = np.array([
#     [0.33, 0.33, 0.34],
#     [0.33, 0.33, 0.34],
#     [0.33, 0.33, 0.34]])
# B = np.array([
#     [0.16, 0.16, 0.16, 0.16, 0.16, 0.2, 0, 0],
#     [0.25, 0.25, 0.25, 0.25, 0, 0, 0, 0],
#     [0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125]])

## 创建模型

In [6]:
hmm = HMM(4, 2, pi, A, B)

In [19]:
# 随机生成一组观测序列
temp = hmm.generate(10)
temp

[1, 1, 1, 1, 1, 0, 0, 0, 1, 1]

In [20]:
hmm.evaluate(temp)

0.00061630632671457277

In [21]:
hmm.evaluate_backward(temp)

0.00061630632671457288

## 编码问题，维特比算法

In [12]:
hmm.decode(temp)

[1, 0, 1, 2, 3]

## 学习问题

In [61]:
data = np.array([1, 0, 1, 1, 0, 0, 1, 0, 0, 1])
hmm = HMM(4, 2)
hmm.fit(data)               # 先根据给定数据反推参数
gen_obs = hmm.generate(10)  # 再根据学习的参数生成数据
print(gen_obs)

[1, 0, 0, 1, 0, 0, 1, 0, 1, 1]


In [62]:
hmm.pi

array([ 1.,  0.,  0.,  0.])

In [63]:
hmm.B

array([[ 0.        ,  1.        ],
       [ 0.59835734,  0.40164266],
       [ 0.8781513 ,  0.1218487 ],
       [ 1.        ,  0.        ]])