In [1]:
# Viterbi algorithm example: a two-state health model observed on three days.

# Feelings reported by the patient, one per visit, in visit order.
obs = ("normal", "cold", "dizzy")

# Hidden health conditions of the model.
states = ("Healthy", "Fever")

# start_p[s]: probability that the patient is in state s on the first visit.
start_p = {
    "Healthy": 0.6,
    "Fever": 0.4,
}

# trans_p[a][b]: probability of moving from state a to state b between visits.
trans_p = {
    "Healthy": {"Healthy": 0.7, "Fever": 0.3},
    "Fever": {"Healthy": 0.4, "Fever": 0.6},
}

# emit_p[s][o]: probability of observing feeling o while in state s.
emit_p = {
    "Healthy": {"normal": 0.5, "cold": 0.4, "dizzy": 0.1},
    "Fever": {"normal": 0.1, "cold": 0.3, "dizzy": 0.6},
}

In [3]:
'''
start_probability = the doctor's belief about which state the HMM is in
                    when the patient first visits (all he knows is that
                    the patient tends to be healthy)
The particular probability distribution used here isn't the equilibrium one
transition_probability = change of the health condition in the underlying Markov chain
emission_probability = how likely the patient is to report each feeling on each day, given the health state
'''

"\nstart_probability = the doctor's belief about which state the HMM is in\n                    when the patient first visits (all he knows is that\n                    the patient tends to be healthy)\nThe particular probability distribution used here isn't the equilibrium one\ntransition_probability = change of the health condition in the underlying Markov chain\nemission_probability = how likely the patient is to report each feeling on each day, given the health state\n"

In [4]:
'''
The patient visits three days in a row and the doctor discovers that
on the 1st day he feels normal,
on the 2nd day he feels cold,
on the 3rd day he feels dizzy.
The doctor has a question: what is the most likely sequence of health
conditions of the patient that would explain these observations?'''

'\nThe patient visits three days in a row and the doctor discovers that\non the 1st day he feels normal,\non the 2nd day he feels cold,\non the 3rd day he feels dizzy.\nThe doctor has a question: what is the most likely sequence of health\nconditions of the patient that would explain these observations?'

In [10]:
def viterbi(obs, states, start_p, trans_p, emit_p):
    """Print the Viterbi table and the most probable hidden-state path.

    Args:
        obs: Non-empty sequence of observations (indexed and measured with
            ``len``).
        states: Iterable of hidden-state names; it is iterated several times,
            so it must be re-iterable (e.g. a tuple or list).
        start_p: ``start_p[s]`` is the probability of starting in state ``s``.
        trans_p: ``trans_p[a][b]`` is the probability of moving from ``a``
            to ``b``.
        emit_p: ``emit_p[s][o]`` is the probability of observing ``o`` while
            in state ``s``.

    Returns:
        None. The table rows produced by ``dptable`` and a one-line summary
        of the best path are written with ``print``.

    Raises:
        ValueError: If ``obs`` is empty.
    """
    if len(obs) == 0:
        raise ValueError("obs must contain at least one observation")

    # V[t][state] holds the probability of the best path that ends in `state`
    # at step t ("prob") and the state that path came from ("prev").
    V = [{st: {"prob": start_p[st] * emit_p[st][obs[0]], "prev": None}
          for st in states}]

    # Run viterbi when t>0
    for t in range(1, len(obs)):
        prev_column = V[t - 1]
        column = {}
        for st in states:
            # max() with a key returns the first maximal predecessor, so ties
            # resolve in `states` order and no float equality test is needed.
            best_prev = max(
                states,
                key=lambda prev_st: prev_column[prev_st]["prob"] * trans_p[prev_st][st],
            )
            max_tr_prob = prev_column[best_prev]["prob"] * trans_p[best_prev][st]
            column[st] = {
                "prob": max_tr_prob * emit_p[st][obs[t]],
                "prev": best_prev,
            }
        V.append(column)

    for line in dptable(V):
        print(line)

    # Most probable final state (first one wins on ties).
    final_column = V[-1]
    last_state = max(final_column, key=lambda st: final_column[st]["prob"])
    max_prob = final_column[last_state]["prob"]

    # Follow the "prev" links back to the first observation.
    opt = [last_state]
    for t in range(len(V) - 1, 0, -1):
        opt.append(V[t][opt[-1]]["prev"])
    opt.reverse()

    print("The steps of states are " + ' '.join(opt) + ' with highest probability of %s' % max_prob)

def dptable(V):
    """Yield the rows of a text table for the Viterbi matrix ``V``.

    The first row is a header of step indices, each right-aligned in a
    12-character field. Every following row is one state of ``V[0]``: its
    name cut to 7 characters, then that state's ``"prob"`` at each step,
    formatted with ``%f`` and cut to 7 characters.
    """
    step_labels = ["%12d" % step for step in range(len(V))]
    yield " ".join(step_labels)

    for state in V[0]:
        cells = []
        for column in V:
            as_text = "%f" % column[state]["prob"]
            cells.append("%.7s" % as_text)
        yield "%.7s: " % state + " ".join(cells)

In [11]:
# Decode the observation sequence with the model tables defined for the Viterbi example.
viterbi(obs, states, start_p, trans_p, emit_p)

           0            1            2
Healthy: 0.30000 0.08400 0.00588
Fever: 0.04000 0.02700 0.01512
The steps of states are Healthy Healthy Fever with highest probability of 0.01512


In [12]:
# Forward-backward algorithm example.
# NOTE: this cell rebinds obs, states, start_p, trans_p and emit_p from the
# Viterbi example; re-running the Viterbi call after this cell would use
# these transition probabilities instead.

# Feelings reported by the patient, one per visit, in visit order.
obs = ("normal", "cold", "dizzy")

# Hidden health conditions, plus the name of the end state.
states = ("Healthy", "Fever")
end_state = "E"

# start_p[s]: probability that the patient is in state s on the first visit.
start_p = {
    "Healthy": 0.6,
    "Fever": 0.4,
}

# trans_p[a][b]: probability of moving from state a to b; every state has a
# 0.01 chance of moving to the end state "E".
trans_p = {
    "Healthy": {"Healthy": 0.69, "Fever": 0.3, "E": 0.01},
    "Fever": {"Healthy": 0.4, "Fever": 0.59, "E": 0.01},
}

# emit_p[s][o]: probability of observing feeling o while in state s.
emit_p = {
    "Healthy": {"normal": 0.5, "cold": 0.4, "dizzy": 0.1},
    "Fever": {"normal": 0.1, "cold": 0.3, "dizzy": 0.6},
}

In [22]:
def fwd_bkw(obs, states, start_p, trans_p, emit_p, end_st):
    """Run the forward-backward algorithm on an HMM with an explicit end state.

    Args:
        obs: Non-empty sequence of observations (tuple or list).
        states: Iterable of hidden-state names; it is iterated several times,
            so it must be re-iterable (e.g. a tuple or list).
        start_p: ``start_p[s]`` is the probability of starting in state ``s``.
        trans_p: ``trans_p[a][b]`` is the probability of moving from ``a`` to
            ``b``; it must also contain ``trans_p[s][end_st]`` for every state.
        emit_p: ``emit_p[s][o]`` is the probability of observing ``o`` while
            in state ``s``.
        end_st: Key of the end state inside ``trans_p``.

    Returns:
        A tuple ``(fwd, bkw, posterior)``. Each element is a list with one
        dict per observation, mapping state to the forward value, the
        backward value, and ``fwd * bkw / p_fwd`` respectively.

    Raises:
        ValueError: If ``obs`` is empty.
        AssertionError: If the total probability from the forward pass and
            from the backward pass differ by more than rounding error.
    """
    if len(obs) == 0:
        raise ValueError("obs must contain at least one observation")

    # Forward part of the algorithm.
    fwd = []
    f_prev = {}
    for i, obs_i in enumerate(obs):
        f_curr = {}
        for st in states:
            if i == 0:
                # Base case for the forward part.
                prev_f_sum = start_p[st]
            else:
                prev_f_sum = sum(f_prev[k] * trans_p[k][st] for k in states)
            f_curr[st] = emit_p[st][obs_i] * prev_f_sum
        fwd.append(f_curr)
        f_prev = f_curr
    p_fwd = sum(f_curr[k] * trans_p[k][end_st] for k in states)

    # Backward part of the algorithm. Each step is paired with the
    # observation that follows it; the last step has none (None) and uses
    # the transition into the end state. tuple() lets obs be a list as well.
    following_obs = tuple(obs[1:]) + (None,)
    bkw = []
    b_prev = {}
    for i, obs_i_plus in enumerate(reversed(following_obs)):
        b_curr = {}
        for st in states:
            if i == 0:
                # Base case for the backward part.
                b_curr[st] = trans_p[st][end_st]
            else:
                b_curr[st] = sum(
                    trans_p[st][nxt] * emit_p[nxt][obs_i_plus] * b_prev[nxt]
                    for nxt in states
                )
        bkw.insert(0, b_curr)
        b_prev = b_curr

    p_bkw = sum(start_p[nxt] * emit_p[nxt][obs[0]] * b_curr[nxt] for nxt in states)

    # Merging the two parts.
    posterior = []
    for i in range(len(obs)):
        posterior.append({st: fwd[i][st] * bkw[i][st] / p_fwd for st in states})

    # Both passes compute the same total probability, but the sums are
    # accumulated in a different order, so compare with a relative tolerance
    # instead of exact float equality.
    tolerance = 1e-9 * max(abs(p_fwd), abs(p_bkw))
    assert abs(p_fwd - p_bkw) <= tolerance, (
        "forward and backward passes disagree: %r != %r" % (p_fwd, p_bkw)
    )

    return fwd, bkw, posterior

In [23]:
def example():
    """Run fwd_bkw on the notebook-level model tables and return its result."""
    model = (obs, states, start_p, trans_p, emit_p, end_state)
    return fwd_bkw(*model)

In [24]:
# One printed line per returned table: forward, backward, then posterior,
# each showing its per-observation dicts separated by a space.
for line in example():
    print(" ".join(str(step) for step in line))

{'Healthy': 0.3, 'Fever': 0.04000000000000001} {'Healthy': 0.0892, 'Fever': 0.03408} {'Healthy': 0.007518, 'Fever': 0.028120319999999997}
{'Healthy': 0.0010418399999999998, 'Fever': 0.00109578} {'Healthy': 0.00249, 'Fever': 0.00394} {'Healthy': 0.01, 'Fever': 0.01}
{'Healthy': 0.8770110375573259, 'Fever': 0.1229889624426741} {'Healthy': 0.623228030950954, 'Fever': 0.3767719690490461} {'Healthy': 0.2109527048413057, 'Fever': 0.7890472951586943}
