In [1]:
import pandas as pd
import numpy as np

# A C G T
# 0 1 2 3

# CGTCAG
# 123102


V = np.array([1,2,3,1,0,2])

# Transition Probabilities
a = np.array(((0.8, 0.2), (0.2, 0.8)))

# Emission Probabilities
b = np.array(((0.4, 0.1, 0.4, 0.1), (0.1, 0.4, 0.1, 0.4)))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))


def forward(V, a, b, initial_distribution):
    alpha = np.zeros((V.shape[0], a.shape[0]))
    c = np.zeros((V.shape[0], a.shape[0]))
    
    alpha[0, :] = initial_distribution * b[:, V[0]]
    c[0]=np.sum(alpha[0])
#     alpha[0]=alpha[0]/c[0]
    for t in range(1, V.shape[0]):
        for j in range(a.shape[0]):
            # Matrix Computation Steps
            #                  ((1x2) . (1x2))      *     (1)
            #                        (1)            *     (1)
            alpha[t, j] = alpha[t - 1].dot(a[:, j]) * b[j, V[t]]
        c[t]=np.sum(alpha[t])
        alpha[t]=alpha[t]/c[t]
    return alpha,c
alpha,ca=forward(V, a, b, initial_distribution)
p_obs=np.prod(ca[1:])

print('forward probabilities:\n',alpha,'\n')
print('P(o1,o2,..o6|π0)=',p_obs)


forward probabilities:
 [[0.05       0.2       ]
 [0.65306122 0.34693878]
 [0.26605505 0.73394495]
 [0.12311558 0.87688442]
 [0.60137931 0.39862069]
 [0.83628137 0.16371863]] 

P(o1,o2,..o6|π0)= 2.4206381056000022e-08


In [2]:
V = np.array([1,2,3,1,0,2])

# V = np.array([1,0,0,0,0,0,1])

# Transition Probabilities
a = np.array(((0.8, 0.2), (0.2, 0.8)))

# Emission Probabilities
b = np.array(((0.4, 0.1, 0.4, 0.1), (0.1, 0.4, 0.1, 0.4)))
# b = np.array(((0.8, 0.2), (0.1, 0.9)))
# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))

def backward(V, a, b, ca):
    beta = np.zeros((V.shape[0], a.shape[0]))
    
    # setting beta(T) = 1
    beta[V.shape[0] - 1] = np.ones((a.shape[0]))
    
    # Loop in backward way from T-1 to
    # Due to python indexing the actual loop will be T-2 to 0
    for t in range(V.shape[0] - 2, -1, -1):
        for j in range(a.shape[0]):
            beta[t, j] = (beta[t + 1] * b[:, V[t + 1]]).dot(a[j, :])
        beta[t]=beta[t]/ca[t+1]
    return beta
 
 
beta = backward(V, a, b,ca)
prob_S1 = beta[0,0]*alpha[0,0]
prob_S2 = beta[0,1]*alpha[0,1]
print('backward probabilities:\n',beta,'\n')
print('P(S1|o1,o2,..o6,π0)=',prob_S1)
print('P(S2|o1,o2,..o6,π0)=',prob_S2)

backward probabilities:
 [[5.12546277 3.71863431]
 [0.68531469 1.59234883]
 [0.85191279 1.05368161]
 [2.29206088 0.81859317]
 [1.26748252 0.59646236]
 [1.         1.        ]] 

P(S1|o1,o2,..o6,π0)= 0.25627313862607987
P(S2|o1,o2,..o6,π0)= 0.7437268613739204


In [3]:
print(alpha)
print()
print(beta)
print()
gama=np.multiply(alpha, beta)
print(gama)


[[0.05       0.2       ]
 [0.65306122 0.34693878]
 [0.26605505 0.73394495]
 [0.12311558 0.87688442]
 [0.60137931 0.39862069]
 [0.83628137 0.16371863]]

[[5.12546277 3.71863431]
 [0.68531469 1.59234883]
 [0.85191279 1.05368161]
 [2.29206088 0.81859317]
 [1.26748252 0.59646236]
 [1.         1.        ]]

[[0.25627314 0.74372686]
 [0.44755245 0.55244755]
 [0.2266557  0.7733443 ]
 [0.2821884  0.7178116 ]
 [0.76223776 0.23776224]
 [0.83628137 0.16371863]]


In [38]:
states = ('S1', 'S2')
end_state = 'E'
 
observations = ('C','G','T','C','A','G')
 
start_probability = {'S1': 0.5, 'S2': 0.5}
 
transition_probability = {
   'S1' : {'S1': 0.8, 'S2': 0.2, 'E': 0.01},
   'S2' : {'S1': 0.2, 'S2': 0.8, 'E': 0.01},
   }
 
emission_probability = {
      
   'S1' : {'A': 0.4, 'C': 0.1, 'G': 0.4, 'T': 0.1},
   'S2' : {'A': 0.1, 'C': 0.4, 'G': 0.1, 'T': 0.4},
   }

def fwd_bkw(observations, states, start_prob, trans_prob, emm_prob, end_st):
    """Forward–backward algorithm."""
    # Forward part of the algorithm
    fwd = []
    f_prev = {}
    for i, observation_i in enumerate(observations):
        f_curr = {}
        for st in states:
            if i == 0:
                # base case for the forward part
                prev_f_sum = start_prob[st]
            else:
                prev_f_sum = sum(f_prev[k]*trans_prob[k][st] for k in states)

            f_curr[st] = emm_prob[st][observation_i] * prev_f_sum

        fwd.append(f_curr)
        f_prev = f_curr

    p_fwd = sum(f_curr[k] * trans_prob[k][end_st] for k in states)

    # Backward part of the algorithm
    bkw = []
    b_prev = {}
    for i, observation_i_plus in enumerate(reversed(observations[1:]+(None,))):
        b_curr = {}
        for st in states:
            if i == 0:
                # base case for backward part
                b_curr[st] = trans_prob[st][end_st]
            else:
                b_curr[st] = sum(trans_prob[st][l] * emm_prob[l][observation_i_plus] * b_prev[l] for l in states)

        bkw.insert(0,b_curr)
        b_prev = b_curr

    p_bkw = sum(start_prob[l] * emm_prob[l][observations[0]] * b_curr[l] for l in states)

    # Merging the two parts
    posterior = []
    for i in range(len(observations)):
        posterior.append({st: fwd[i][st] * bkw[i][st] / p_fwd for st in states})

    assert p_fwd == p_bkw
    return fwd, bkw, posterior

def example():
    return fwd_bkw(observations,
                   states,
                   start_probability,
                   transition_probability,
                   emission_probability,
                   end_state)

for line in example():
    print(*line)


{'S2': 0.2, 'S1': 0.05} {'S2': 0.017000000000000005, 'S1': 0.03200000000000001} {'S2': 0.008000000000000004, 'S1': 0.002900000000000001} {'S2': 0.0027920000000000015, 'S1': 0.0003920000000000002} {'S2': 0.00023120000000000017, 'S1': 0.00034880000000000024} {'S2': 2.5472000000000023e-05, 'S1': 0.0001301120000000001}
{'S2': 5.785600000000004e-06, 'S1': 7.974400000000008e-06} {'S2': 5.056000000000003e-05, 'S1': 2.1760000000000015e-05} {'S2': 0.00015040000000000008, 'S1': 0.00012160000000000007} {'S2': 0.0004000000000000002, 'S1': 0.0011200000000000003} {'S2': 0.0016000000000000003, 'S1': 0.0034000000000000007} {'S2': 0.01, 'S1': 0.01}
{'S2': 0.7437268613739201, 'S1': 0.25627313862607987} {'S2': 0.5524475524475525, 'S1': 0.4475524475524476} {'S2': 0.7733443027560676, 'S1': 0.22665569724393256} {'S2': 0.7178116001645415, 'S1': 0.2821883998354587} {'S2': 0.2377622377622378, 'S1': 0.7622377622377623} {'S2': 0.16371863430686964, 'S1': 0.8362813656931304}


In [41]:
import pandas as pd
import numpy as np
 
 
def forward(V, a, b, initial_distribution):
    alpha = np.zeros((V.shape[0], a.shape[0]))
    alpha[0, :] = initial_distribution * b[:, V[0]]
 
    for t in range(1, V.shape[0]):
        for j in range(a.shape[0]):
            # Matrix Computation Steps
            #                  ((1x2) . (1x2))      *     (1)
            #                        (1)            *     (1)
            alpha[t, j] = alpha[t - 1].dot(a[:, j]) * b[j, V[t]]
 
    return alpha
 
 
def backward(V, a, b):
    beta = np.zeros((V.shape[0], a.shape[0]))
 
    # setting beta(T) = 1
    beta[V.shape[0] - 1] = np.ones((a.shape[0]))
 
    # Loop in backward way from T-1 to
    # Due to python indexing the actual loop will be T-2 to 0
    for t in range(V.shape[0] - 2, -1, -1):
        for j in range(a.shape[0]):
            beta[t, j] = (beta[t + 1] * b[:, V[t + 1]]).dot(a[j, :])
 
    return beta
 
 
 
def viterbi(V, a, b, initial_distribution):
    T = V.shape[0]
    M = a.shape[0]
 
    omega = np.zeros((T, M))
    omega[0, :] = np.log(initial_distribution * b[:, V[0]])
 
    prev = np.zeros((T - 1, M))
 
    for t in range(1, T):
        for j in range(M):
            # Same as Forward Probability
            probability = omega[t - 1] + np.log(a[:, j]) + np.log(b[j, V[t]])
 
            # This is our most probable state given previous state at time t (1)
            prev[t - 1, j] = np.argmax(probability)
 
            # This is the probability of the most probable state (2)
            omega[t, j] = np.max(probability)
 
    # Path Array
    S = np.zeros(T)
 
    # Find the most probable last hidden state
    last_state = np.argmax(omega[T - 1, :])
 
    S[0] = last_state
 
    backtrack_index = 1
    for i in range(T - 2, -1, -1):
        S[backtrack_index] = prev[i, int(last_state)]
        last_state = prev[i, int(last_state)]
        backtrack_index += 1
 
    # Flip the path array since we were backtracking
    S = np.flip(S, axis=0)
 
    # Convert numeric values to actual hidden states
    result = []
    for s in S:
        if s == 0:
            result.append("S1")
        else:
            result.append("S2")
 
    return result, prev, omega
 
  
V = np.array([1,2,3,1,0,2])

# Transition Probabilities
a = np.array(((0.8, 0.2), (0.2, 0.8)))

# Emission Probabilities
b = np.array(((0.4, 0.1, 0.4, 0.1), (0.1, 0.4, 0.1, 0.4)))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))

print(viterbi(V, a, b, initial_distribution))

(['S2', 'S2', 'S2', 'S2', 'S1', 'S1'], array([[0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 1.],
       [0., 1.]]), array([[ -2.99573227,  -1.60943791],
       [ -4.13516656,  -4.13516656],
       [ -6.6608952 ,  -5.27460084],
       [ -9.18662385,  -6.41403512],
       [ -8.93976377,  -8.93976377],
       [-10.07919805, -11.46549241]]))


In [37]:
def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    for st in states:
        V[0][st] = {"prob": start_p[st] * emit_p[st][obs[0]], "prev": None}
    # Run Viterbi when t > 0
    for t in range(1, len(obs)):
        V.append({})
        for st in states:
            max_tr_prob = V[t-1][states[0]]["prob"]*trans_p[states[0]][st]
            prev_st_selected = states[0]
            for prev_st in states[1:]:
                tr_prob = V[t-1][prev_st]["prob"]*trans_p[prev_st][st]
                if tr_prob > max_tr_prob:
                    max_tr_prob = tr_prob
                    prev_st_selected = prev_st
                    
            max_prob = max_tr_prob * emit_p[st][obs[t]]
            V[t][st] = {"prob": max_prob, "prev": prev_st_selected}
                    
    for line in dptable(V):
        print(line)
    
    opt = []
    max_prob = 0.0
    previous = None
    # Get most probable state and its backtrack
    for st, data in V[-1].items():
        if data["prob"] > max_prob:
            max_prob = data["prob"]
            best_st = st
    opt.append(best_st)
    previous = best_st
    
    # Follow the backtrack till the first observation
    for t in range(len(V) - 2, -1, -1):
        opt.insert(0, V[t + 1][previous]["prev"])
        previous = V[t + 1][previous]["prev"]

    print ('The steps of states are ' + ' '.join(opt) + ' with highest probability of %s' % max_prob)

def dptable(V):
    # Print a table of steps from dictionary
    yield " ".join(("%12d" % i) for i in range(len(V)))
    for state in V[0]:
        yield "%.7s: " % state + " ".join("%.7s" % ("%f" % v[state]["prob"]) for v in V)
    
states = ('S1', 'S2')
 
observations = ('C','G','T','C','A','G')
 
start_probability = {'S1': 0.5, 'S2': 0.5}
 
transition_probability = {
   'S1' : {'S1': 0.8, 'S2': 0.2},
   'S2' : {'S1': 0.2, 'S2': 0.8},
   }
 
emission_probability = {
     
   'S1' : {'A': 0.4, 'C': 0.1, 'G': 0.4, 'T': 0.1},
   'S2' : {'A': 0.1, 'C': 0.4, 'G': 0.1, 'T': 0.4},
   }    
    
    
viterbi(observations, states, start_probability,
        transition_probability, emission_probability)

           0            1            2            3            4            5
S2: 0.20000 0.01600 0.00512 0.00163 0.00013 0.00001
S1: 0.05000 0.01600 0.00128 0.00010 0.00013 0.00004
The steps of states are S2 S2 S2 S2 S1 S1 with highest probability of 4.1943040000000025e-05
