## HMM

### Model Specification & Notation

State set (generic elements $s_i, s_j$):
$$ S = \{ s_1, s_2, \dots, s_N \}, \quad s_i, s_j$$

Observation set (generic element $o_k$):
$$ O = \{ o_1, o_2, \dots, o_M \}, \quad o_k $$

Model parameters
$$ \lambda  =  (\pi, A, B) $$   
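where $A$ is the state-transition matrix, $B$ the emission matrix, and $\pi$ the initial state distribution:
$$ A_{ij} = P(s^{(t)} = s_j | s^{(t-1)} = s_i)$$   
$$ B_{j}(k) = P(o^{(t)} = o_k | s^{(t)} = s_j)$$   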
$$ \pi_i = P(s^{(1)} = s_i)  $$
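
As a minimal sketch of how this notation maps onto arrays, the parameters can be stored as NumPy arrays and checked for being proper probability distributions. The numbers and the helper `is_valid_hmm` below are hypothetical and only for illustration.

```python
import numpy as np

# Hypothetical 2-state, 3-symbol model; the numbers are made up for illustration.
pi = np.array([0.6, 0.4])            # pi[i]   = P(s^(1) = s_i)
A = np.array([[0.7, 0.3],
              [0.4, 0.6]])           # A[i, j] = P(s^(t) = s_j | s^(t-1) = s_i)
B = np.array([[0.5, 0.4, 0.1],
              [0.1, 0.3, 0.6]])      # B[j, k] = P(o = o_k | s = s_j)


def is_valid_hmm(pi, A, B, tol=1e-9):
    """Return True if shapes are compatible and pi, rows of A, rows of B sum to 1."""
    N = pi.shape[0]
    shapes_ok = A.shape == (N, N) and B.ndim == 2 and B.shape[0] == N
    nonneg = (pi >= 0).all() and (A >= 0).all() and (B >= 0).all()
    sums_ok = (abs(pi.sum() - 1) < tol
               and np.allclose(A.sum(axis=1), 1, atol=tol)
               and np.allclose(B.sum(axis=1), 1, atol=tol))
    return bool(shapes_ok and nonneg and sums_ok)


print(is_valid_hmm(pi, A, B))
```

Each row of `A` and `B` is a conditional distribution, so the check is on row sums, not column sums.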

### Baum-Welch and its "greeks"

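#### Definitions
From here on, $o_t$ and $s_t$ denote the observation and the state at time $t$. $\alpha$ is a joint probability of the observations so far and the current state, while $\beta$ is a conditional probability of the remaining observations given the current state.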

$$ \alpha_t(i) \overset{def}{=} P(o_1, \dots, o_t, s_t = s_i | \lambda) $$   
$$ \beta_t(i) \overset{def}{=} P(o_{t+1}, \dots, o_T | s_t = s_i, \lambda) $$

Posterior probabilities, conditioned on the full observation sequence $O$:

$$ \gamma_t(i) \overset{def}{=} P(s_t = s_i | O, \lambda) $$   
$$ \xi_t(i, j) \overset{def}{=} P(s_t = s_i, s_{t+1} = s_j | O, \lambda) $$   

#### Recursions

Forward and backward recursions for $\alpha$ and $\beta$:

$$ \alpha_{t+1}(i) = \big( \sum_{j=1}^N \alpha_t(j) A_{ji} \big) B_i(o_{t+1}) $$
$$ \alpha_1(i) = \pi_i B_i(o_1) $$

$$ \beta_t(i) = \sum_{j=1}^N A_{ij} B_j(o_{t+1}) \beta_{t+1}(j) $$
$$ \beta_T(i) = 1 $$
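
The two recursions can be sketched in NumPy as follows. This is an illustrative sketch, not part of the `HMM` class further down: the function names and toy parameters are assumptions, arrays are laid out as `(T, N)` with 0-indexed time, and the backward step uses $\beta_t(i) = \sum_j A_{ij} B_j(o_{t+1}) \beta_{t+1}(j)$.

```python
import numpy as np


def forward(pi, A, B, o):
    """alpha[t, i] = P(o_0..o_t, s_t = i | lambda), time 0-indexed."""
    T, N = len(o), pi.shape[0]
    alpha = np.zeros((T, N))
    alpha[0] = pi * B[:, o[0]]
    for t in range(T - 1):
        # (alpha[t] @ A)[i] = sum_j alpha_t(j) A_ji, then weight by B_i(o_{t+1})
        alpha[t + 1] = (alpha[t] @ A) * B[:, o[t + 1]]
    return alpha


def backward(A, B, o):
    """beta[t, i] = P(o_{t+1}..o_{T-1} | s_t = i, lambda), time 0-indexed."""
    T, N = len(o), A.shape[0]
    beta = np.ones((T, N))          # beta at the last time step is 1
    for t in range(T - 2, -1, -1):
        # sum_j A_ij B_j(o_{t+1}) beta_{t+1}(j)
        beta[t] = A @ (B[:, o[t + 1]] * beta[t + 1])
    return beta


# Hypothetical toy model and observation sequence (symbol indices).
pi = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3], [0.4, 0.6]])
B = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])
o = [0, 2, 1, 1]

alpha, beta = forward(pi, A, B, o), backward(A, B, o)
# sum_i alpha_t(i) beta_t(i) should equal P(O | lambda) at every t
likelihood_per_t = (alpha * beta).sum(axis=1)
print(likelihood_per_t)
```

A useful sanity check is that `likelihood_per_t` is constant over `t` and equals `alpha[-1].sum()`. For long sequences these products underflow, so practical implementations usually rescale or work in log space.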

#### gamma, xi: definitions and formulas

$$\gamma_t(i) \overset{def}{=} P(s_t = s_i | O, \lambda) $$
$$\xi_t(i, j) \overset{def}{=} P(s_t = s_i, s_{t+1} = s_j | O, \lambda) $$

Formulas for $\gamma$ and $\xi$ in terms of $\alpha$ and $\beta$:

$$ \gamma_t(i) = \frac{\alpha_t(i) \beta_t(i)}{\sum_{s=1}^N \alpha_t(s) \beta_t(s)} $$

$$ \xi_t(i, j) = \frac{\alpha_t(i)\ A_{ij}\ \beta_{t+1}(j) \ B_j(o_{t+1})}{\sum_{s=1}^N \sum_{q=1}^N ...} $$
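
The denominator above is abbreviated. Assuming it is the usual normalizer, it is the numerator summed over all state pairs, which equals the likelihood of the whole sequence (the same value as the denominator of $\gamma_t(i)$):

$$ \sum_{s=1}^N \sum_{q=1}^N \alpha_t(s)\, A_{sq}\, B_q(o_{t+1})\, \beta_{t+1}(q) = \sum_{s=1}^N \alpha_t(s) \beta_t(s) = P(O | \lambda) $$

The middle equality follows from the backward recursion, and it also gives the consistency relation $\sum_{j=1}^N \xi_t(i, j) = \gamma_t(i)$ for $t < T$.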

![](./HMM_greeks.png)
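
A possible NumPy sketch of $\gamma$ and $\xi$, built on the forward-backward quantities. The function names and toy parameters are hypothetical, time is 0-indexed, and the normalizers are computed by summing the unnormalized values, under the assumption that the observation sequence has non-zero probability.

```python
import numpy as np


def forward_backward(pi, A, B, o):
    """Return alpha, beta as (T, N) arrays, time 0-indexed."""
    T, N = len(o), len(pi)
    alpha, beta = np.zeros((T, N)), np.ones((T, N))
    alpha[0] = pi * B[:, o[0]]
    for t in range(1, T):
        alpha[t] = (alpha[t - 1] @ A) * B[:, o[t]]
    for t in range(T - 2, -1, -1):
        beta[t] = A @ (B[:, o[t + 1]] * beta[t + 1])
    return alpha, beta


def posteriors(pi, A, B, o):
    """gamma[t, i] = P(s_t = i | O), xi[t, i, j] = P(s_t = i, s_{t+1} = j | O)."""
    alpha, beta = forward_backward(pi, A, B, o)
    gamma = alpha * beta
    gamma /= gamma.sum(axis=1, keepdims=True)
    # emit_beta[t, j] = B_j(o_{t+1}) * beta_{t+1}(j), for t = 0..T-2
    emit_beta = B[:, o[1:]].T * beta[1:]
    # xi is proportional to alpha_t(i) * A_ij * B_j(o_{t+1}) * beta_{t+1}(j)
    xi = alpha[:-1, :, None] * A[None, :, :] * emit_beta[:, None, :]
    xi /= xi.sum(axis=(1, 2), keepdims=True)
    return gamma, xi


# Hypothetical toy model and observation sequence (symbol indices).
pi = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3], [0.4, 0.6]])
B = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])
o = [0, 2, 1, 1]

gamma, xi = posteriors(pi, A, B, o)
print(gamma.shape, xi.shape)
```

Summing `xi` over its last axis should recover `gamma[:-1]`, and summing over the middle axis should recover `gamma[1:]`, which is a quick way to catch indexing mistakes.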

### Viterbi and its greeks

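Define two variables that describe **the most probable path among all paths that are in state $i$ at time $t$ and emit the observations $o_1, \dots, o_t$**:

- the probability of that path is $\delta_t(i)$
- the state of that path at time $t-1$ is $\psi_t(i)$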


$$ \begin{align}
\delta_t(i)      &=  \max_{s_1, s_2, \dots, s_{t-1}} P(s_1, \dots, s_{t-1}, s_t = s_i, o_1, \dots, o_t | \lambda) \\
\delta_{t+1}(i)  &=  \max_{j}[\delta_t(j) A_{ji}] B_i(o_{t+1}) \\
\psi_t(i)        &=  \argmax_j  \delta_{t-1}(j)A_{ji}
\end{align}$$

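The Viterbi algorithm:

1. Sweep forward in time and compute $\delta_t(i)$ and $\psi_t(i)$ for every node (every possible state at every time step)
2. Backtrack from the end to recover the whole path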

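In detail:

1. Initialization
$$ \delta_1(i) = \pi_i B_i(o_1) $$

2. For $t = 2, \dots, T$, compute $\delta_t(i)$ and $\psi_t(i)$ with the recursions above

3. Termination: the largest $\delta$ at time $T$ is the probability of the best path, and the corresponding $\argmax_j$ is the state of the final node
$$ P^* = \max_j \delta_T(j) $$
$$ s_T^* = \argmax_j \delta_T(j) $$

4. Backtrack, for $t = T-1, \dots, 1$
$$ s_t^* = \psi_{t+1}(s_{t+1}^*) $$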
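
To make the definition of $\delta$ as a maximum over all paths concrete, the sketch below compares a vectorized Viterbi pass against brute-force enumeration of every state sequence. It is only feasible for tiny $N$ and $T$. The function names and toy parameters are hypothetical, and this is separate from the `HMM.viterbi` method in the code section.

```python
import itertools
import numpy as np


def viterbi(pi, A, B, o):
    """Return (most probable state path, its joint probability)."""
    T, N = len(o), len(pi)
    delta = np.zeros((T, N))
    psi = np.zeros((T, N), dtype=int)
    delta[0] = pi * B[:, o[0]]
    for t in range(1, T):
        scores = delta[t - 1][:, None] * A   # scores[j, i] = delta_{t-1}(j) A_ji
        psi[t] = scores.argmax(axis=0)       # best predecessor j of each state i
        delta[t] = scores.max(axis=0) * B[:, o[t]]
    path = np.zeros(T, dtype=int)
    path[-1] = delta[-1].argmax()
    for t in range(T - 2, -1, -1):           # backtrack through psi
        path[t] = psi[t + 1, path[t + 1]]
    return path, delta[-1].max()


def path_probability(pi, A, B, o, s):
    """Joint probability P(s, o | lambda) of one state path s."""
    p = pi[s[0]] * B[s[0], o[0]]
    for t in range(1, len(o)):
        p *= A[s[t - 1], s[t]] * B[s[t], o[t]]
    return p


def brute_force_best(pi, A, B, o):
    """Maximum joint probability over all N**T state paths."""
    paths = itertools.product(range(len(pi)), repeat=len(o))
    return max(path_probability(pi, A, B, o, s) for s in paths)


# Hypothetical toy model and observation sequence (symbol indices).
pi = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3], [0.4, 0.6]])
B = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])
o = [0, 2, 1, 1]

path, best_prob = viterbi(pi, A, B, o)
print(path, best_prob, brute_force_best(pi, A, B, o))
```

The comparison is on probabilities and not on paths, because ties can make the maximizing path non-unique.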

![](HMM_greeks_2.png)

### Code

In [1]:
import numpy as np

In [166]:
# Data Generating Process

class HMM(object):
    """Hidden Markov Model
    """
    
    def __init__(self, pi, A, B, T=10, observation_set=None, state_set=None):
        
        self.N = pi.shape[0]
        self.M = B.shape[1]
        assert A.shape == (self.N, self.N)
        assert B.shape == (self.N, self.M)
        
        self.pi = pi
        self.A = A
        self.B = B
        
        assert T > 0 and type(T) is int
        self.T = T
        
        if observation_set is None:
            observation_set = list('Obs' + str(i) for i in range(self.M))
        if state_set is None:
            state_set = list('State' + str(i) for i in range(self.N))    
        
        assert len(observation_set) == self.M
        assert len(state_set) == self.N
        
        self.observation_set = observation_set
        self.state_set = state_set

    def generate_one_sample(self):
        pi, A, B, T, N, M = self.pi, self.A, self.B, self.T, self.N, self.M
        
        s = np.zeros(T).astype(int)
        o = np.zeros(T).astype(int)
        s[0] = np.random.choice(N, p=pi)
        o[0] = np.random.choice(M, p=B[s[0],:])
        
        for t in range(1, T):
            s[t] = np.random.choice(N, p=A[s[t-1], :])
            o[t] = np.random.choice(M, p=B[s[t], :])
            
        return s, o
    
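    def viterbi(self, o, lambda_=None):
        """Viterbi algorithm.

        Given the observations o and the parameters (pi, A, B), estimate the
        most probable hidden state sequence.
        1. Initialize delta_0(i) = pi(i) * B_i(o_0)
        2. Recursively compute delta_t(i), psi_t(i)
        3. Backtrack through psi_t
        """
        if lambda_ is None:
            pi, A, B = self.pi, self.A, self.B
        else:
            # Assumes lambda_ is a (pi, A, B) tuple, matching lambda = (pi, A, B)
            pi, A, B = lambda_
        N, T = pi.shape[0], len(o)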

        lattice_delta = np.full((N, T), np.nan)
        lattice_psi = np.full((N, T), np.nan)
        
        lattice_delta[:, 0] = pi * B[:, o[0]]
        
        for t in range(1, T):
            for i in range(N):
                j, prob = self._which_max(lattice_delta[:, t-1] * A[:, i] * B[i, o[t]])
                lattice_delta[i, t] = prob
                lattice_psi[i, t] = j
                
        self.lattice_delta = lattice_delta
        self.lattice_psi = lattice_psi
        
        j, prob = self._which_max(lattice_delta[:, T-1])
        
        s = np.zeros(T).astype(int)
        s[T-1] = j
        for t in range(T-1, 0, -1):
            s[t-1] = lattice_psi[s[t], t]

        return s
    
    def _which_max(self, iterable):
        idx, value = 0, -np.inf
        for i, it in enumerate(iterable):
            if it > value:
                value = it
                idx = i
        return idx, value

In [200]:
obs_set = ['red', 'white']
state_set = ['box1', 'box2', 'box3', 'box4']

pi = np.array([0.25, 0.25, 0.25, 0.25])
A = np.array([[0, 1, 0, 0],
              [.4, 0, .6, 0],
              [0, .4, 0, .6],
              [0, 0, .5, .5],
             ])
B = np.array([[.5, .5],
              [.3, .7],
              [.6, .4],
              [.8, .2],
             ])

# model ======================================================

m = HMM(pi, A, B, 10, obs_set, state_set)
s, o = m.generate_one_sample()
s_hat = m.viterbi(o)

In [201]:
m.lattice_delta
m.lattice_psi

array([[nan,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.],
       [nan,  0.,  2.,  0.,  0.,  2.,  2.,  2.,  2.,  0.],
       [nan,  3.,  3.,  1.,  3.,  3.,  3.,  3.,  1.,  3.],
       [nan,  3.,  3.,  2.,  3.,  3.,  3.,  3.,  2.,  3.]])

In [202]:
s_hat

array([3, 3, 2, 3, 3, 3, 3, 2, 3, 3])

In [203]:
s

array([3, 3, 2, 3, 2, 3, 2, 1, 2, 3])

In [204]:
from hmmlearn import hmm

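# Note: newer hmmlearn releases appear to provide this categorical-emission
# model as hmm.CategoricalHMM; check the installed version.
model = hmm.MultinomialHMM(n_components=4)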
model.startprob_ = pi
model.transmat_ = A
model.emissionprob_ = B
prob, state_seq = model.decode(o.reshape(-1,1))

state_seq

array([3, 3, 2, 3, 3, 3, 3, 2, 3, 3])

In [205]:
state_seq - s_hat   # check

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])