In [1]:
import numpy as np
from tqdm import tqdm

In [16]:
#隐马尔可夫模型
class HMM:
    #param:
    # status 状态集合
    # observe 观测集合
    # (A,B,PI) 状态转移矩阵 发射矩阵 初始状态矩阵
    def __init__(self,status,observe,A=None,B=None,PI=None):
        self.print=True
        
        #初始化 状态集合status 观测集合observe
        self.status= status # 状态集合 N个 {盒子1，盒子2，盒子3}
        self.observe=observe # 观测集合 M个 {红，白}
        self.N = len(self.status)  # 状态集合有多少元素
        self.M = len(self.observe)  # 观测集合有多少元素
        
        #数字化 状态集合Q 观测集合V 
        self.status_dict={} # {'盒子1':0，'盒子2':1，'盒子3':2}
        self.observe_dict={}# {'红':0，'白':1}
        for i in range(len(status)):
            self.status_dict[status[i]]=i
        for i in range(len(observe)):
            self.observe_dict[observe[i]]=i
        self.Q=np.arange(0,self.N) #状态集合 [0,1,2]
        self.V=np.arange(0,self.M) #观测集合 [0,1]
        
        #初始化 (A,B,PI)
        self.A=A # 状态转移矩阵
        self.B=B # 发射矩阵
        self.PI=PI # 初始状态概率
        
        
        if self.print:
            print('状态集合status',self.status_dict)
            print('观测集合observe',self.observe_dict)
            print('状态集合Q',self.Q)
            print('观测集合V',self.V)
            print()
        
    
    #计算前向概率
    #param
    # o 观测序列
    def calc_foward(self,o):
        print('calc_foward')
        #数字化 观测序列O
        O=np.array([self.observe_dict[x] for x in o])
        
        A=self.A
        B=self.B
        
        if self.print:
            print('观测序列O',O)
        
        #初始化alpha
        alpha=self.PI*(B.T[O[0]])
        if self.print:
            print('alpha0',alpha)
            
        #循环计算alpha
        for i in range(1,len(O)):
            alpha=alpha@A*(B.T[O[i]])
            if self.print:
                print(f'alpha{i}',alpha)
        ret=np.sum(alpha)
        print(f'前向概率:{ret}\n')
        return ret
    
    #计算后向概率
    def calc_backward(self,o):
        print('calc_backward')
        #数字化 观测序列O
        O=np.array([self.observe_dict[x] for x in o])
        
        A=self.A
        B=self.B
        
        if self.print:
            print('观测序列O',O)
        
        #初始化beta
        beta=np.ones(self.N)
        
        #循环计算beta
        for i in range(len(O)-1,0,-1):
            beta=A@(beta*(B.T[O[i]]))
            if self.print:
                print(f'beta{i}',beta)
        
        beta=self.PI*(beta*(B.T[O[0]]))
        if self.print:
            print('beta0',beta)
            
        ret=np.sum(beta)
        print(f'后向概率:{ret}\n')
        return ret
    
    #训练模型(A,B,PI)
    def train(self,data):
        self.calc_Param(data)
        pass
    
    #利用数据data(状态序列+观测序列) 计算参数(A,B,PI) 直接使用统计方法 统计A,B,PI
    #param
    # data: (N,(2,n)) N:数据总条数 
    def calc_Param(self,data):
        self.N = len(self.status)  # 状态集合有多少元素
        self.M = len(self.observe)  # 观测集合有多少元素
        
        A=np.zeros((self.N,self.N)) #(N,N)
        B=np.zeros((self.N,self.M)) #(N,M)
        PI=np.zeros(self.N) #(N,)
        
        for i in range(data.shape[0]):
            #d (2,n)状态和观测序列
            #row0 状态序列
            #row1 观测序列
            d=data[i]
            n=d.shape[1]

            PI[d[0,0]]+=1
            B[d[0,0],d[1,0]]+=1
            
            for j in range(1,n):
                #d[0,i] 当前状态
                #d[0,i-1] 上一次状态
                #[1,i] 当前观测
                A[d[0,j-1],d[0,j]]+=1
                B[d[0,j],d[1,j]]+=1
        A=A/np.sum(A,axis=1,keepdims=True)
        B=B/np.sum(B,axis=1,keepdims=True)
        PI=PI/np.sum(PI)
        print('训练参数结果(A,B,PI)')
        print(f'A:{A}')
        print(f'B:{B}')
        print(f'PI:{PI}')
        self.A=A
        self.B=A
        self.PI=PI
 
    #维特比算法(动态规划)
    #已知(A,B,PI) 和一观测序列o 求解其状态序列的概率最大解
    #param:
    # o 观测序列 [0,1,1,1,1,0] shape(n,)
    #return:
    # ret 状态序列 shape(n,)
    #solition:
    # delta计算从前一个的所有状态 到当前状态的概率最大值 psi记录下来概率最大值的前一个状态
    # 从delta的最后取最大值，利用psi向前回溯即可找到最大概率序列
    def viterbi_t(self,o):
        n=o.shape[0]
        
        delta=np.zeros((self.N,n)) #delta[:,i] 为到达该观测序列的 每个状态的最大概率值 shape(状态个数,序列长度)
        psi=np.zeros((self.N,n),dtype=np.int32) #psi[:,i] 为到达该观测序列的 最大概率值的 上一个状态为哪个
        A=self.A
        B=self.B
        PI=self.PI
        
        delta[:,0]=PI*B[:,o[0]]
        #psi[:,0]=np.argmax(delta[:,0])
        for i in range(1,n):
            temp=delta[:,i-1]*A.T
            psi[:,i]=np.argmax(temp,axis=1)
            delta[:,i]=np.max(temp,axis=1)*B[:,o[i]]
            #check(delta)
            #check(psi)
        
        ret=np.zeros(n,dtype=np.int32)
        ret[-1]=np.argmax(delta[:,-1])

        for i in range(n-2,-1,-1):
            ret[i]=psi[ret[i+1]][i+1]
        return ret
    
    #计算a,b阵列的准确率
    #param:
    # a 源标签
    # b 目的标签
    def calc_acc(self,label,predict):
        return np.mean(np.equal(label,predict))
    
    #在已有的参数(A,B,PI)下随机生成一组长度为n的 状态和观测序列
    #return:
    # np.ndarray[] shape=(2,n)
    # row0 状态序列
    # row1 观测序列
    def generate_Sequence(self,n):
        PI=self.PI
        A=self.A
        B=self.B
        
        ret=np.zeros((2,n),dtype=np.int32)
        
        #从状态集中以概率P随机选择一个状态
        ret[0,0]=np.random.choice(self.Q,p=PI)
        #从该状态生成一个观测值
        ret[1,0]=np.random.choice(self.V,p=B[ret[0,0]])
        
        
        for i in range(1,n):
            ret[0,i]=np.random.choice(self.Q,p=A[ret[0,i-1]])
            ret[1,i]=np.random.choice(self.V,p=B[ret[0,i]])
        return ret
    
    #生成 N组序列 序列中状态长度为n shape=(N,2,n)
    def generate_Data(self,N,n):
        ret=np.array([])
        for i in range(N):
            a=self.generate_Sequence(n)
            a=a.reshape(1,a.shape[0],a.shape[1])
            if i==0:
                ret=a
            else:
                ret=np.r_[ret,a]
        return ret
    
    #数字化观测序列o 将['red','white','red']转换为[0,1,0]
    def observe_transform(self,o):
        return np.array([self.observe_dict[x] for x in o])

In [28]:

#使用测试样例
def useTestCase():
    status=['盒子1','盒子2','盒子3']
    observe=['red','white']
    A=np.array([[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]])
    B=np.array([[0.5,0.5],[0.4,0.6],[0.7,0.3]])
    PI=np.array([0.2,0.4,0.4])
    o=['red','white','red']
    hmm=HMM(status,observe,A,B,PI)
    O=hmm.observe_transform(o)
    #hmm.calc_foward(O)
    #hmm.calc_backward(O)
    
    #训练参数(A,B,PI)
    def train():
        #生成m组数据(状态+观测序列)，每组序列的长度为n
        #根据生成的数据来计算(A,B,PI)
        hmm.train(hmm.generate_Data(10000,10)) #(m,2,n)
    
    #维特比函数预测观测序列的状态 求准确度
    def eval1():
        acc=0
        #epoch执行次数
        epoch=100
        for i in tqdm(range(epoch)):
            #生成1组数据(状态+观测序列)，每组序列的长度为n
            data=hmm.generate_Data(1,10) #(1,2,n)
            #check(data)
            predict=hmm.viterbi_t(data[0][1])
            label=data[0][0]
            #print(predict)
            #print(label)
            acc+=hmm.calc_acc(label,predict)
        print(acc/epoch)
    train()
    #eval1()
    
    
    
useTestCase()
    #维特比预测状态准确率
    #1    -> 0.52
    #10   -> 0.42
    #100  -> 0.41
    #1000 -> 0.377

状态集合status {'盒子1': 0, '盒子2': 1, '盒子3': 2}
观测集合observe {'red': 0, 'white': 1}
状态集合Q [0 1 2]
观测集合V [0 1]

训练参数结果(A,B,PI)
A:[[0.50086028 0.19804066 0.30109906]
 [0.30540541 0.49404103 0.20055357]
 [0.20018824 0.29865957 0.50115219]]
B:[[0.49803527 0.50196473]
 [0.39815931 0.60184069]
 [0.69982446 0.30017554]]
PI:[0.2    0.3995 0.4005]


In [3]:
#查看数组a1的属性
def check(a1):
    print(a1)  
    print("数据类型",type(a1))           #打印数组数据类型  
    print("数组元素数据类型：",a1.dtype) #打印数组元素数据类型  
    print("数组元素总数：",a1.size)      #打印数组尺寸，即数组元素总数  
    print("数组形状：",a1.shape)         #打印数组形状  
    print("数组的维度数目",a1.ndim)      #打印数组的维度数目
    print()