# HMM示例

In [1]:
import numpy as np
class HMM(object):
    """Discrete hidden Markov model estimated by counting on one labelled sequence.

    Hidden states and observed symbols are non-negative integer ids. The
    number of states is ``max(label) + 1`` and the number of symbols is
    ``max(train) + 1``, so ids that never occur in the training data simply
    get zero counts. All parameters are stored as natural logs; ``_EPS`` is
    added before taking the log so unseen events get a small finite score
    instead of ``-inf``.
    """

    _EPS = 1e-12    # smoothing term added to every probability before log()

    def __init__(self):
        self.train = None    # observed-symbol sequence of the training set
        self.label = None    # hidden-state sequence of the training set
        self.prob = None    # log marginal distribution of the hidden states
        self.trans = None    # log transition matrix (row: state at T, col: state at T+1)
        self.emit = None    # log emission (confusion) matrix (row: hidden state, col: symbol)

    def fit(self, train, label):
        """Store the training sequences and estimate all model parameters.

        Args:
            train: list of observed symbol ids.
            label: list of hidden state ids, aligned element-wise with ``train``.

        Raises:
            AssertionError: if an argument is not a list or the lengths differ.
            ValueError: if the sequences are empty or contain a negative id.
        """
        assert isinstance(train, list)
        assert isinstance(label, list)
        assert len(train) == len(label)
        if not train:
            raise ValueError("train and label must be non-empty")
        # Negative ids would silently wrap around when used as array indices.
        if min(train) < 0 or min(label) < 0:
            raise ValueError("state and symbol ids must be non-negative")
        self.train = train
        self.label = label
        self.cal_prob()
        self.cal_trans()
        self.cal_emit()

    def _n_states(self):
        """Return the number of hidden states (largest label id + 1)."""
        return max(self.label) + 1

    def _n_symbols(self):
        """Return the number of observed symbols (largest training id + 1)."""
        return max(self.train) + 1

    def _log_row_normalise(self, counts):
        """Convert a count matrix into smoothed log row-probabilities.

        Rows whose total is zero are kept as all-zero probabilities rather
        than divided (0/0 would give NaN, and ``argmax`` treats NaN as the
        maximum, which corrupts decoding).
        """
        totals = counts.sum(axis=1, keepdims=True)
        probs = np.divide(counts, totals,
                          out=np.zeros_like(counts), where=totals > 0)
        return np.log(probs + self._EPS)

    def cal_prob(self):
        """Estimate the log marginal distribution of the hidden states.

        This is the relative frequency of each state over the whole label
        sequence; ``viterbi_predict`` uses it as the score of the first state.
        """
        counts = np.zeros(self._n_states())
        for y in self.label:
            counts[y] += 1
        self.prob = np.log(counts / len(self.label) + self._EPS)

    def cal_trans(self):
        """Estimate the log transition matrix from consecutive label pairs."""
        n_states = self._n_states()
        counts = np.zeros((n_states, n_states))
        for prev, nxt in zip(self.label, self.label[1:]):
            counts[prev, nxt] += 1
        self.trans = self._log_row_normalise(counts)

    def cal_emit(self):
        """Estimate the log emission (confusion) matrix from (symbol, state) pairs."""
        counts = np.zeros((self._n_states(), self._n_symbols()))
        for x, y in zip(self.train, self.label):
            counts[y, x] += 1
        self.emit = self._log_row_normalise(counts)

    def viterbi_predict(self, test):
        """Decode the highest-scoring hidden-state path with the Viterbi algorithm.

        Args:
            test: list of observed symbol ids, each within the symbol range
                seen by ``fit``.

        Returns:
            A list of ``int`` state ids with the same length as ``test``
            (an empty list for an empty ``test``).

        Raises:
            AssertionError: if ``test`` is not a list or contains a symbol id
                outside the columns of the emission matrix.
        """
        assert isinstance(test, list)
        n_states, n_symbols = self.emit.shape
        assert len(set(test) - set(range(n_symbols))) == 0
        if not test:
            return []
        # back[y, t] is the best predecessor of state y at time t.
        back = np.zeros((n_states, len(test)), dtype=int)
        score = self.prob + self.emit[:, test[0]]
        for t in range(1, len(test)):
            # cand[i, j]: score of being in state i at t-1 and moving to j.
            cand = score[:, np.newaxis] + self.trans
            back[:, t] = cand.argmax(axis=0)
            score = cand.max(axis=0) + self.emit[:, test[t]]
        # Trace the best path backwards from the best final state.
        state = int(score.argmax())
        path = [state]
        for t in range(len(test) - 1, 0, -1):
            state = int(back[state, t])
            path.append(state)
        path.reverse()
        return path

In [2]:
# Toy training data: observed symbol ids and their hidden-state labels (30 items each).
train = [0,5,2,6,4,3,2,0,5,1,5,1,2,5,2,0,2,0,2,3,6,4,6,4,2,5,1,0,2,5]
label = [1,0,2,1,2,1,2,2,0,2,0,1,0,0,2,1,1,2,2,0,1,2,0,2,1,2,0,0,0,1]
# Observed sequence whose hidden states are to be decoded.
test = [0,2,3,6,3,1,1,2,2,5,1,5,4,2,3,6,2,5,0,5,2]
model = HMM()
model.fit(train, label)    # train the model
# Bare expression: presumably left last so the notebook cell displays the decoded path.
model.viterbi_predict(test)

[1, 2, 0, 1, 0, 1, 2, 1, 2, 0, 2, 0, 2, 2, 0, 1, 2, 0, 2, 0, 2]