In [20]:
import numpy as np
from typing import List,Tuple


In [21]:
class HiddenMarkovModel:
    """实现隐马尔可夫模型"""
    def __init__(self,states:List[str],observations:List[str]):
        """statesList:状态集合 observationsList:观测集合"""
        self.states = states
        self.observations = observations
        self.n_states=len(states)
        self.n_observations=len(observations)
        #index to state
        self.state_to_index={state:index for index,state in enumerate(states)}
        self.obs_to_index={obs:index for index,obs in enumerate(observations)}
        #init the matrix
        self.A=np.random.rand(self.n_states,self.n_states) #转移矩阵
        self.A=self.A/self.A.sum(axis=1,keepdims=True) #归一化

        self.B=np.random.rand(self.n_states,self.n_observations) #观测矩阵
        self.B=self.B/self.B.sum(axis=1,keepdims=True) #归一化

        self.pi=np.random.rand(self.n_states)
        self.pi=self.pi/self.pi.sum()

    def forward_algorithm(self, obs_sequence: List[str]) -> Tuple[np.ndarray, float]:
        """
        Args:
            obs_sequence (List[str]): 观测序列

        Returns:
            Tuple[np.ndarray, float]: 
                alpha (np.ndarray): 前向概率矩阵
                prob (float): 观测序列的概率
        """
        T=len(obs_sequence)

        #init the probability matrix
        alpha=np.zeros((T, self.n_states))
        obs_idx=[self.obs_to_index[obs] for obs in obs_sequence]
        alpha[0,:]=self.pi*self.B[:,obs_idx[0]]

        #calculate the probability matrix
        for t in range(1,T):
            obs_idx=self.obs_to_index[obs_sequence[t]]
            for j in range(self.n_states):
                alpha[t,j]=np.sum(alpha[t-1,:]*self.A[:,j])*self.B[j,obs_idx]

        #calculate the probability
        prob=np.sum(alpha[T-1,:])
        return alpha,prob
    
    def backward_algorithm(self,obs_sequence:List[str])->np.ndarray:
        """算法介绍
        obs_sequence:观测序列
        返回：
        beta:后向概率矩阵
        """
        T=len(obs_sequence)
        beta=np.zeros((T,self.n_states))

        beta[T-1,:]=self.pi
        for t in range(T-2,-1,-1):
            obs_idx=self.obs_to_index[obs_sequence[t]]
            for i in range(self.n_states):
                beta[t, i] = np.sum(self.A[i, :] * self.B[:, obs_idx] * beta[t+1, :])
        return beta
    
    def viterbi_algorithm(self,obs_sequence:List[str])->Tuple[List[str],float]:
        """维比特算法 为解决观测序列的解码问题
        obs_sequence:观测序列
        返回：
        1. 最可能的隐藏状态序列
        2. 最可能的隐藏状态序列的概率
        """
        T=len(obs_sequence)

        delta=np.zeros((T,self.n_states))
        psi=np.zeros((T,self.n_states),dtype=int)
        obs_idx=self.obs_to_index[obs_sequence[0]]
        delta[0,:]=self.pi[:]*self.B[:,obs_idx]
        for t in range(1,T):
            obs_idx=self.obs_to_index[obs_sequence[t]]
            for j in range(self.n_states):
                delta[t,j]=np.max(delta[t-1,:]*self.A[:,j])*self.B[j,obs_idx]
                psi[t,j]=np.argmax(delta[t-1,:]*self.A[:,j])
        # backward pass
        best_path_idx=[int(np.argmax(delta[T-1,:]))]
        for t in range(T-1,-1,-1):
            prev_state=int(psi[t,best_path_idx[-1]])
            best_path_idx.append(prev_state)
        best_path_idx.reverse()
        best_path=[self.states[i] for i in best_path_idx]
        best_prob=np.max(delta[T-1,:])
        return best_path, best_prob
    def baum_welch_algorithm(self,obs_sequences:List[List[str]],max_iter:int=1000):
        """
        使用Baum-Welch算法对模型进行训练 解决学习问题
        参数:
            obs_sequences:观测序列
            max_iter:最大迭代次数
        """
        for iteration in range(max_iter):
            xi_sum=np.zeros((self.n_states,self.n_states))
            gamma_sum=np.zeros((self.n_states,self.n_observations))
            alpha_sum=np.zeros(self.n_states)

            for obs_sequence in obs_sequences:
                T=len(obs_sequence)
                alpha,_=self.forward_algorithm(obs_sequence)
                beta=self.backward_algorithm(obs_sequence)

                xi=np.zeros((T-1,self.n_states,self.n_states))
                gamma=np.zeros((T,self.n_states))
                for t in range(T-1):
                    obs_index=self.obs_to_index[obs_sequence[t+1]]
                    denominator = np.sum(alpha[t, :] * np.dot(self.A, self.B[:, obs_index]) * beta[t+1, :])
                    for i in range(self.n_states):
                        for j in range(self.n_states):
                            xi[t, i, j] = alpha[t, i] * self.A[i, j] * self.B[j, obs_index] * beta[t+1, j] / denominator
                    
                for t in range(T):
                    gamma[t, :] = alpha[t, :] * beta[t, :]
                    gamma[t, :] = gamma[t, :] / np.sum(gamma[t, :])
                
                # 累积统计量
                for t in range(T-1):
                    xi_sum += xi[t, :, :]
                
                for t in range(T):
                    obs_index = self.obs_to_index[obs_sequence[t]]
                    gamma_sum[:, obs_index] += gamma[t, :]
                
                alpha_sum += gamma[0, :]
            
            # 更新参数
            old_A, old_B, old_pi = self.A.copy(), self.B.copy(), self.pi.copy()
            
            # 更新转移概率矩阵
            self.A = xi_sum / np.sum(xi_sum, axis=1, keepdims=True)
            
            # 更新观测概率矩阵
            self.B = gamma_sum / np.sum(gamma_sum, axis=1, keepdims=True)
            
            # 更新初始状态概率
            self.pi = alpha_sum / np.sum(alpha_sum)
            
            # 检查收敛性
            if (np.allclose(self.A, old_A) and 
                np.allclose(self.B, old_B) and 
                np.allclose(self.pi, old_pi)):
                print(f"算法在第 {iteration+1} 次迭代后收敛")
                break
    
    def print_model(self):
        """
        打印模型参数
        """
        print("隐藏状态:", self.states)
        print("观测状态:", self.observations)
        print("\n初始状态概率 π:")
        for i, state in enumerate(self.states):
            print(f"  {state}: {self.pi[i]:.4f}")
        
        print("\n状态转移概率矩阵 A:")
        print("      ", end="")
        for state in self.states:
            print(f"{state:>8}", end="")
        print()
        for i, state1 in enumerate(self.states):
            print(f"{state1:>6} ", end="")
            for j in range(self.n_states):
                print(f"{self.A[i, j]:>8.4f}", end="")
            print()
        
        print("\n观测概率矩阵 B:")
        print("      ", end="")
        for obs in self.observations:
            print(f"{obs:>8}", end="")
        print()
        for i, state in enumerate(self.states):
            print(f"{state:>6} ", end="")
            for j in range(self.n_observations):
                print(f"{self.B[i, j]:>8.4f}", end="")
            print()
def example_usage():
    # 定义天气模型示例
    states = ['Sunny', 'Rainy']
    observations = ['walk', 'shop', 'clean']
    
    # 创建HMM模型
    hmm = HiddenMarkovModel(states, observations)
    
    # 设置示例参数
    hmm.pi = np.array([0.6, 0.4])  # 初始状态概率
    
    # 状态转移概率矩阵
    hmm.A = np.array([
        [0.7, 0.3],  # Sunny -> Sunny, Sunny -> Rainy
        [0.4, 0.6]   # Rainy -> Sunny, Rainy -> Rainy
    ])
    
    # 观测概率矩阵
    hmm.B = np.array([
        [0.6, 0.3, 0.1],  # Sunny时的观测概率: walk, shop, clean
        [0.1, 0.4, 0.5]   # Rainy时的观测概率: walk, shop, clean
    ])
    
    print("=== HMM模型参数 ===")
    hmm.print_model()
    
    # 观测序列示例
    obs_sequence = ['walk', 'shop', 'clean', 'walk']
    print(f"\n=== 观测序列: {obs_sequence} ===")
    
    # 1. 评估问题 - 使用前向算法计算观测序列概率
    alpha, prob = hmm.forward_algorithm(obs_sequence)
    print(f"\n观测序列概率 (前向算法): {prob:.6f}")
    
    # 2. 解码问题 - 使用维特比算法找出最优状态序列
    best_path, best_prob = hmm.viterbi_algorithm(obs_sequence)
    print(f"\n最优状态序列 (维特比算法): {best_path}")
    print(f"最优路径概率: {best_prob:.6f}")
    
    # 3. 学习问题示例 - 使用Baum-Welch算法
    print("\n=== Baum-Welch算法示例 ===")
    training_sequences = [
        ['walk', 'shop', 'clean'],
        ['walk', 'walk', 'clean'],
        ['shop', 'clean', 'clean'],
        ['walk', 'shop', 'walk']
    ]
    
    # 创建新的随机初始化模型进行训练
    hmm_train = HiddenMarkovModel(states, observations)
    print("\n训练前的模型参数:")
    hmm_train.print_model()
    
    # 训练模型
    hmm_train.baum_welch_algorithm(training_sequences, max_iter=50)
    
    print("\n训练后的模型参数:")
    hmm_train.print_model()

if __name__ == "__main__":
    example_usage()

=== HMM模型参数 ===
隐藏状态: ['Sunny', 'Rainy']
观测状态: ['walk', 'shop', 'clean']

初始状态概率 π:
  Sunny: 0.6000
  Rainy: 0.4000

状态转移概率矩阵 A:
         Sunny   Rainy
 Sunny   0.7000  0.3000
 Rainy   0.4000  0.6000

观测概率矩阵 B:
          walk    shop   clean
 Sunny   0.6000  0.3000  0.1000
 Rainy   0.1000  0.4000  0.5000

=== 观测序列: ['walk', 'shop', 'clean', 'walk'] ===

观测序列概率 (前向算法): 0.011853

最优状态序列 (维特比算法): ['Sunny', 'Sunny', 'Rainy', 'Rainy', 'Sunny']
最优路径概率: 0.003110

=== Baum-Welch算法示例 ===

训练前的模型参数:
隐藏状态: ['Sunny', 'Rainy']
观测状态: ['walk', 'shop', 'clean']

初始状态概率 π:
  Sunny: 0.8100
  Rainy: 0.1900

状态转移概率矩阵 A:
         Sunny   Rainy
 Sunny   0.1139  0.8861
 Rainy   0.9210  0.0790

观测概率矩阵 B:
          walk    shop   clean
 Sunny   0.1537  0.1843  0.6620
 Rainy   0.5330  0.1500  0.3169
算法在第 7 次迭代后收敛

训练后的模型参数:
隐藏状态: ['Sunny', 'Rainy']
观测状态: ['walk', 'shop', 'clean']

初始状态概率 π:
  Sunny: 1.0000
  Rainy: 0.0000

状态转移概率矩阵 A:
         Sunny   Rainy
 Sunny   0.0000  1.0000
 Rainy   1.0000  0.0000

观测概率矩