Vierbi Algorithm

M. Amintoosi

In [1]:
import numpy as np

def viterbi(obs, states, start_p, trans_p, emit_p):
    """
    :param obs: observation sequence
    :param states: hidden state sequence
    :param start_p: initial probabilities of hidden states
    :param trans_p: transition probabilities between hidden states
    :param emit_p: emission probabilities of observed symbols given hidden states
    :return: most probable sequence of hidden states
    """
    # Initialize trellis table and backpointer table
    T = len(obs)
    N = len(states)
    trellis = np.zeros((N, T))
    backpointers = np.zeros((N, T), dtype=int)

    # Fill in first column of trellis table using initial probabilities and emission probabilities
    for i in range(N):
        trellis[i][0] = start_p[i] * emit_p[i][obs[0]]

    # Fill in remaining columns of trellis table using dynamic programming
    for t in range(1, T):
        for j in range(N):
            prob = [(trellis[i][t - 1] * trans_p[i][j] * emit_p[j][obs[t]], i) for i in range(N)]
            max_prob, max_index = max(prob)
            trellis[j][t] = max_prob
            backpointers[j][t] = max_index

    # Find the path with maximum probability using backpointers
    path = [0] * T
    path[T - 1] = np.argmax(trellis[:, T - 1])
    for t in range(T - 2, -1, -1):
        path[t] = backpointers[path[t + 1]][t + 1]

    # Map hidden states to their corresponding labels and return the most probable sequence of labels
    return [states[i] for i in path]


In [2]:
# Define observation sequence, hidden states, and model parameters
obs = [0, 1, 0, 2, 1]
states = ['A', 'B', 'C']
start_p = [0.6, 0.3, 0.1]
trans_p = [[0.7, 0.2, 0.1],
           [0.3, 0.5, 0.2],
           [0.2, 0.4, 0.4]]
emit_p = [[0.4, 0.6],
          [0.3, 0.7],
          [0.5, 0.5]]

# Apply the Viterbi algorithm to find the most probable sequence of hidden states
hidden_seq = viterbi(obs, states, start_p, trans_p, emit_p)

# Print the result
print(hidden_seq)


IndexError: list index out of range

In [None]:
T = len(obs)
N = len(states)
trellis = np.zeros((N, T))
backpointers = np.zeros((N, T), dtype=int)

# Fill in first column of trellis table using initial probabilities and emission probabilities
for i in range(N):
    trellis[i][0] = start_p[i] * emit_p[i][obs[0]]

# Fill in remaining columns of trellis table using dynamic programming
for t in range(1, T):
    for j in range(N):
        prob = [(trellis[i][t - 1] * trans_p[i][j] * emit_p[j][obs[t]], i) for i in range(N)]
        max_prob, max_index = max(prob)
        trellis[j][t] = max_prob
        backpointers[j][t] = max_index

# Find the path with maximum probability using backpointers
path = [0] * T
path[T - 1] = np.argmax(trellis[:, T - 1])
for t in range(T - 2, -1, -1):
    path[t] = backpointers[path[t + 1]][t + 1]

# Map hidden states to their corresponding labels and return the most probable sequence of labels
sol = [states[i] for i in path]


IndexError: list index out of range

In [3]:
import numpy as np


def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False):
    '''Finds the most-probable (Viterbi) path through the HMM state trellis
    Notation:
        Z[t] := Observation at time t
        Q[t] := Hidden state at time t
    Inputs:
        prior: np.array(num_hid)
            prior[i] := Pr(Q[0] == i)
        transmat: np.ndarray((num_hid,num_hid))
            transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i)
        obslik: np.ndarray((num_hid,num_obs))
            obslik[i,t] := Pr(Z[t] | Q[t] == i)
        scaled: bool
            whether or not to normalize the probability trellis along the way
            doing so prevents underflow by repeated multiplications of probabilities
        ret_loglik: bool
            whether or not to return the log-likelihood of the best path
    Outputs:
        path: np.array(num_obs)
            path[t] := Q[t]
    '''
    num_hid = obslik.shape[0] # number of hidden states
    num_obs = obslik.shape[1] # number of observations (not observation *states*)

    # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)])
    trellis_prob = np.zeros((num_hid,num_obs))
    # trellis_state[i,t] := best predecessor state given that we ended up in state i at t
    trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies
    path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies

    trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult
    if scaled:
        scale = np.ones(num_obs) # only instantiated if necessary to save memory
        scale[0] = 1.0 / np.sum(trellis_prob[:,0])
        trellis_prob[:,0] *= scale[0]

    trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor
    for t in np.arange(1, num_obs):
        for j in np.arange(num_hid):
            trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult
            trellis_state[j,t] = trans_probs.argmax()
            trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs
            trellis_prob[j,t] *= obslik[j,t]
        if scaled:
            scale[t] = 1.0 / np.sum(trellis_prob[:,t])
            trellis_prob[:,t] *= scale[t]

    path[-1] = trellis_prob[:,-1].argmax()
    for t in range(num_obs-2, -1, -1):
        path[t] = trellis_state[(path[t+1]), t+1]

    if not ret_loglik:
        return path
    else:
        if scaled:
            loglik = -np.sum(np.log(scale))
        else:
            p = trellis_prob[path[-1],-1]
            loglik = np.log(p)
        return path, loglik


if __name__=='__main__':
    # Assume there are 3 observation states, 2 hidden states, and 5 observations
    priors = np.array([0.5, 0.5])
    transmat = np.array([
        [0.75, 0.25],
        [0.32, 0.68]])
    emmat = np.array([
        [0.8, 0.1, 0.1],
        [0.1, 0.2, 0.7]])
    observations = np.array([0, 1, 2, 1, 0], dtype=int)
    obslik = np.array([emmat[:,z] for z in observations]).T
    print(viterbi_path(priors, transmat, obslik))                                #=> [0 1 1 1 0]
    # print viterbi_path(priors, transmat, obslik, scaled=False)                  #=> [0 1 1 1 0]
    # print viterbi_path(priors, transmat, obslik, ret_loglik=True)               #=> (array([0, 1, 1, 1, 0]), -7.776472586614755)
    # print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227)

[0 1 1 1 0]


In [4]:
import numpy as np

def viterbi(y, A, B, Pi=None):
    """
    Return the MAP estimate of state trajectory of Hidden Markov Model.

    Parameters
    ----------
    y : array (T,)
        Observation state sequence. int dtype.
    A : array (K, K)
        State transition matrix. See HiddenMarkovModel.state_transition  for
        details.
    B : array (K, M)
        Emission matrix. See HiddenMarkovModel.emission for details.
    Pi: optional, (K,)
        Initial state probabilities: Pi[i] is the probability x[0] == i. If
        None, uniform initial distribution is assumed (Pi[:] == 1/K).

    Returns
    -------
    x : array (T,)
        Maximum a posteriori probability estimate of hidden state trajectory,
        conditioned on observation sequence y under the model parameters A, B,
        Pi.
    T1: array (K, T)
        the probability of the most likely path so far
    T2: array (K, T)
        the x_j-1 of the most likely path so far
    """
    # Cardinality of the state space
    K = A.shape[0]
    # Initialize the priors with default (uniform dist) if not given by caller
    Pi = Pi if Pi is not None else np.full(K, 1 / K)
    T = len(y)
    T1 = np.empty((K, T), 'd')
    T2 = np.empty((K, T), 'B')

    # Initilaize the tracking tables from first observation
    T1[:, 0] = Pi * B[:, y[0]]
    T2[:, 0] = 0

    # Iterate throught the observations updating the tracking tables
    for i in range(1, T):
        T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
        T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    x = np.empty(T, 'B')
    x[-1] = np.argmax(T1[:, T - 1])
    for i in reversed(range(1, T)):
        x[i - 1] = T2[x[i], i]

    return x, T1, T2

In [9]:
Pi = np.array([1, 0])
A = np.array([
    [0.6, 0.4],
    [0.0, 1.0]])
emmat = np.array([
    [0.8, 0.2],
    [0.3, 0.7]])
y = np.array([0, 1, 1], dtype=int)
B = np.array([emmat[:,z] for z in y]).T
x, T1, T2 = viterbi(y, A, B, Pi)
print(x, '\n', T1, '\n', T2)                                #=> [0 1 1 1 0]


[0 1 1] 
 [[0.8     0.096   0.01152]
 [0.      0.224   0.1568 ]] 
 [[0 0 0]
 [0 0 1]]


In [10]:
B

array([[0.8, 0.2, 0.2],
       [0.3, 0.7, 0.7]])

In [13]:
# !pip install hidden_markov

In [14]:
# ====Initializing Parameters ====

# States
states = ('s', 't')

# list of possible observations
possible_observation = ('A','B' )

# The observations that we observe and feed to the model
obs1 = ('A', 'B','B','A')
obs2 = ('B', 'A','B')

# Number of observation sequece 1 and observation sequence 2
quantities_observations = [10, 20]

observation_tuple = []
observation_tuple.extend( [obs1,obs2] )

# Input parameters as Numpy matrices
start_probability = np.matrix( '0.5 0.5 ')
transition_probability = np.matrix('0.6 0.4 ;  0.3 0.7 ')
emission_probability = np.matrix( '0.3 0.7 ; 0.4 0.6 ' )

In [18]:
possible_observation

('A', 'B')

In [16]:
import numpy as np
from hidden_markov import hmm
test = hmm(states,possible_observation,start_probability,transition_probability,emission_probability)

In [17]:
#Forward Algorithm Results on 'obs1'
test.forward_algo(obs1)

0.051533999999999996

In [19]:
# States
states = ('s', 't')

# list of possible observations
possible_observation = ('A','B' )

# The observations that we observe and feed to the model
obs1 = ('A', 'A','B')
# obs2 = ('B', 'A','B')

# Number of observation sequece 1 and observation sequence 2
quantities_observations = [10, 20]

observation_tuple = []
observation_tuple.extend( [obs1,obs2] )

# Input parameters as Numpy matrices
start_probability = np.matrix( '1 0 ')
transition_probability = np.matrix('0.6 0.4 ;  0 1 ')
emission_probability = np.matrix( '0.8 0.2 ; 0.3 0.7 ' )

In [20]:
test = hmm(states,possible_observation,start_probability,transition_probability,emission_probability)
test.forward_algo(obs1)

0.22080000000000002

In [21]:
test.viterbi(obs1)

['s', 's', 't']