In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (11, 5)  #set default figure size
import quantecon as qe
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
from sklearn.preprocessing import normalize

In [185]:
def hmc_sample_path(A, B, ψ_0=None, sample_size=1_000):

    # set up
    A = np.asarray(A)
    B = np.asarray(B)
    X = np.empty(sample_size, dtype=int)
    Y = np.empty(sample_size, dtype=int)
    # Convert each row of P into a cdf
    m = len(A)
    n = len(B)
    A_dist = [np.cumsum(A[i, :]) for i in range(m)]
    B_dist = [np.cumsum(B[i, :]) for i in range(n)]

    # draw initial state, defaulting to 0
    if ψ_0 is not None:
        ψ_0 = np.asarray(ψ_0)
        X_0 = qe.random.draw(np.cumsum(ψ_0))
    else:
        X_0 = 0

    # simulate
    X[0] = X_0
    Y[0] = qe.random.draw(B_dist[X_0])
    for t in range(sample_size - 1):
        X[t+1] = qe.random.draw(A_dist[X[t]])
        Y[t+1] = qe.random.draw(B_dist[X[t+1]])

    return X, Y

In [235]:
A = [[0.5, 0.5],
    [0.5, 0.5]]
B = [[0.4, 0.6],
     [0.5, 0.5]]
ψ_0 = np.array([0.5, 0.5])

In [269]:
obs = hmc_sample_path(A, B, sample_size=10)[1]

In [270]:
obs

array([1, 0, 1, 1, 0, 0, 1, 1, 1, 0])

## Forward algorithm

In [271]:
def forward(obs, A, B, ψ_0):
    A = np.asarray(A)
    B = np.asarray(B)
    ψ_0 = np.asarray(ψ_0)
    num_states = len(A)
    α = np.empty((num_states, len(obs)))
    α[:,0] = ψ_0 * B[:, obs[0]]
    for t in range(len(obs)-1):
        for s in range(num_states):
            α[s, t+1] = α[:, t].dot(A[:, s]) * B[s, obs[t+1]]
    return α

In [272]:
a = forward(obs, A, B, ψ_0)

In [273]:
a


array([[0.40331794, 0.11265126, 0.08410719, 0.06094838, 0.01686923,
        0.00484123, 0.00374088, 0.00276586, 0.00199246, 0.00054941],
       [0.23923633, 0.10959504, 0.05734704, 0.03089605, 0.01441591,
        0.00638394, 0.00327972, 0.00172921, 0.00093973, 0.00044258]])

In [274]:
a.sum(axis=0)

array([0.64255426, 0.2222463 , 0.14145423, 0.09184443, 0.03128514,
       0.01122517, 0.0070206 , 0.00449507, 0.00293219, 0.00099199])

## Viterbi Algorithm

In [275]:
def viterbi(obs, A, B, ψ_0):
    A = np.asarray(A)
    B = np.asarray(B)
    ψ_0 = np.asarray(ψ_0)
    num_per = len(obs)
    num_states = len(A)
    v = np.empty((num_states, len(obs)))
    bt = np.empty((num_states, len(obs)))
    v[:,0] = ψ_0 * B[:, obs[0]]
    bt[:,0] = 0
    for t in range(num_per-1):
        for s in range(num_states):
            rhs = np.multiply(A[:,s], v[:, t]) * B[s, obs[t+1]]
            v[s, t+1] = np.max(rhs)
            bt[s, t+1] = np.argmax(rhs)
    bestpath = np.zeros(num_per, dtype=int)
    
    bestpath[0] = int(np.argmax(v[:, num_per-1]))
    
    for t in range(num_per-1):
        bestpath[t+1] = bt[bestpath[t], num_per-t-1]
    
    bestpath = np.flip(bestpath, axis=0)
    return v, bestpath, bt

In [276]:
v, b, bt = viterbi(obs, A, B, ψ_0)

In [277]:
v

array([[4.03317937e-01, 1.05627206e-01, 7.11075463e-02, 4.78691365e-02,
        1.25367177e-02, 3.28331161e-03, 2.21030397e-03, 1.48796222e-03,
        1.00168647e-03, 2.62337309e-04],
       [2.39236327e-01, 9.76569308e-02, 4.75557283e-02, 2.31580829e-02,
        9.45319355e-03, 3.85881977e-03, 1.87911891e-03, 9.15069392e-04,
        4.45608838e-04, 1.81898761e-04]])

In [278]:
b

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [279]:
bt

array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 1., 1., 1., 1., 1., 1., 1., 1., 1.]])

## backwards algorithm

In [280]:
def backward(obs, A, B, ψ_0):
    A = np.asarray(A)
    B = np.asarray(B)
    ψ_0 = np.asarray(ψ_0)
    num_states = len(A)
    β = np.empty((num_states, len(obs)))
    β[:,0] = 1
    for t in range(len(obs)-1):
        for s in range(num_states):
            β[s, t+1] = β[:, t].dot(A[:, s]) * B[s, obs[t+1]]
    return np.flip( β, axis=1) 

In [281]:
b= backward(obs, A, B, ψ_0)

## Making a HMM class

In [292]:
class HiddenMarkovModel:
    
    def __init__(self, obs):
        
        self.obs = obs
        
    def forward(self, A, B, ψ_0):
        
        obs = self.obs
        
        A = np.asarray(A)
        B = np.asarray(B)
        ψ_0 = np.asarray(ψ_0)
        num_states = len(A)
        α = np.empty((num_states, len(obs)))
        α[:,0] = ψ_0 * B[:, obs[0]]
        for t in range(len(obs)-1):
            for s in range(num_states):
                α[s, t+1] = α[:, t].dot(A[:, s]) * B[s, obs[t+1]]
        return α
    def backward(self, A, B, ψ_0):
        
        obs = self.obs

        A = np.asarray(A)
        B = np.asarray(B)
        ψ_0 = np.asarray(ψ_0)
        num_states = len(A)
        β = np.empty((num_states, len(obs)))
        β[:,0] = 1
        for t in range(len(obs)-1):
            for s in range(num_states):
                β[s, t+1] = β[:, t].dot(A[:, s]) * B[s, obs[t+1]]
        return β
    def ϵ(self, α, β, A, B):
        obs = self.obs
        c = α
        num_states = len(A)
        ϵ = np.empty((len(obs), num_states, num_states))
        for t in range(len(obs)-1):
            denominator = c.sum(axis=0)[t]
            #print(denominator)
            for i in range(num_states):
                for j in range(num_states):
                    numerator = α[i, t]*A[i,j]*B[j, obs[t+1]]*β[j, t+1]
                    #print(numerator)
                    ϵ[t, i, j] = numerator/denominator
        return ϵ
    
    def γ(self, α, β):
        c = α
        return (α*β)/c.sum(axis=0)
    
    def A_new(self, α, β, A, B):
        ϵ = self.ϵ(α, β, A, B)
        z = ϵ.sum(axis=0)
        
        return z/z.sum(axis=1)
        
    
    def B_new(self, α, β, obs, B):
        
        γ = self.γ(α, β)
        b_demons = γ.sum(axis=1)
        
        for emission in range(B.shape[1]):
    
            γ_state = γ[:, np.where(obs==emission)[0]].sum(axis=1)
            B[:,emission] = γ_state/b_demons
        return B

    def forwardsbackwards(self, 
                          num_states, 
                          num_obs_types,
                          tol=1e-4, 
                          max_iter=1000
                         ):
        obs = self.obs
        
        A = np.random.rand(num_states, num_states)
        A = normalize(A, axis=1, norm='l1')

        B = np.random.rand(num_states, num_obs_types)
        B = normalize(B, axis=1, norm='l1')
        
        ψ_0 = np.random.rand(num_states)
        ψ_0 = ψ_0/ψ_0.sum()
        print(ψ_0)
        
        i = 0
        error = tol + 1
        
        while i < max_iter and error > tol:

            α = self.forward(A, B, ψ_0)
            β = self.backward(A, B, ψ_0)

            A_new = self.A_new(α, β, A, B)
            
            error = np.max(np.abs( A_new  -  A ))
            i += 1
            
            B_new = self.B_new(α, β, obs, B)
            
            A = A_new
            B = B_new
        return A, B

    def viterbi(self, A, B, ψ_0):
        
        obs = self.obs
        A = np.asarray(A)
        B = np.asarray(B)
        ψ_0 = np.asarray(ψ_0)
        num_per = len(obs)
        num_states = len(A)
        v = np.empty((num_states, len(obs)))
        bt = np.empty((num_states, len(obs)))
        v[:,0] = ψ_0 * B[:, obs[0]]
        bt[:,0] = 0
        for t in range(num_per-1):
            for s in range(num_states):
                rhs = np.multiply(A[:,s], v[:, t]) * B[s, obs[t+1]]
                v[s, t+1] = np.max(rhs)
                bt[s, t+1] = np.argmax(rhs)
        bestpath = np.zeros(num_per, dtype=int)

        bestpath[0] = int(np.argmax(v[:, num_per-1]))

        for t in range(num_per-1):
            bestpath[t+1] = bt[bestpath[t], num_per-t-1]

        bestpath = np.flip(bestpath, axis=0)
        return v, bestpath, bt

In [293]:
hmm = HiddenMarkovModel(obs)

In [297]:
hmm.forwardsbackwards(2, 2)

[0.61314267 0.38685733]


(array([[1.00000000e+00, 3.35010419e-13],
        [7.89015360e-17, 1.30607888e-37]]),
 array([[1.15069507e-07, 9.99999885e-01],
        [6.37778920e-84, 1.00000000e+00]]))

In [289]:
num_states = 2
num_obs_types = 2
A = np.random.rand(num_states, num_states)
A = normalize(A, axis=1, norm='l1')

B = np.random.rand(num_states, num_obs_types)
B = normalize(B, axis=1, norm='l1')

ψ_0 = np.random.rand(num_states)
ψ_0 = ψ_0/ψ_0.sum()

In [290]:
α = hmm.forward(A, B, ψ_0)
β = hmm.backward(A, B, ψ_0)

In [291]:
α

array([[7.63996369e-02, 2.33391856e-01, 3.59668897e-02, 8.40341371e-03,
        1.21960264e-02, 9.64758205e-03, 1.38786241e-03, 3.12113840e-04,
        8.49948127e-05, 1.28509141e-04],
       [3.30279626e-01, 4.72327747e-02, 3.49672229e-02, 1.15722598e-02,
        2.00107936e-03, 9.95396767e-04, 1.23168531e-03, 4.20277249e-04,
        1.26515284e-04, 2.14614013e-05]])

In [254]:
γ = hmm.γ(α, β)

  return c/c.sum(axis=0)


In [229]:
b_demons = γ.sum(axis=1)
        
for emission in range(num_obs_types):

    γ_state = γ[:, np.where(obs==emission)[0]].sum(axis=1)
    B[:,emission] = γ_state/b_demons

In [230]:

B.shape[1]

2

In [231]:
tol=1e-4
max_iter=1000
i = 0
error = tol + 1

In [232]:
while i < max_iter and error > tol:


    α = hmm.forward(A, B, ψ_0)
    β = hmm.backward(A, B, ψ_0)

    A_new = hmm.A_new(α, β, A, B)

    error = np.max(np.abs( A_new  -  A ))
    i += 1

    B_new = hmm.B_new(α, β, obs, B)

    A = A_new
    B = B_new
    print(A)
    print(B)

[[0.82718037 0.3900808 ]
 [0.35336539 0.2023993 ]]
[[0.25939413 0.74060587]
 [0.89540226 0.10459774]]
[[0.85177504 0.90580099]
 [0.13185531 0.19423375]]
[[0.21085434 0.78914566]
 [0.98833529 0.01166471]]
[[0.54350349 9.51010653]
 [0.01516482 0.68407404]]
[[1.27846260e-01 8.72153740e-01]
 [9.99464324e-01 5.35676270e-04]]
[[5.78846901e-03 5.46888621e+01]
 [5.84673502e-06 9.99678387e-01]]
[[1.01142587e-01 8.98857413e-01]
 [9.99483005e-01 5.16994748e-04]]
[[5.01373704e-10 7.08211775e+01]
 [3.58253889e-12 1.00000000e+00]]
[[0.91341316 0.08658684]
 [0.36251778 0.63748222]]
[[3.15991610e-22 2.22960318e+02]
 [8.89398329e-26 1.00000000e+00]]
[[1.00000000e+00 1.45544747e-24]
 [3.35097387e-01 6.64902613e-01]]
[[1.78128164e-47 1.62243742e+03]
 [1.06430811e-52 1.00000000e+00]]
[[1.00000000e+00 2.32112932e-97]
 [3.34828926e-01 6.65171074e-01]]
[[1.07452947e-099 4.41098997e+004]
 [1.32575317e-106 1.00000000e+000]]
[[1.00000000e+000 8.44674520e-297]
 [3.34827755e-001 6.65172245e-001]]
[[5.29312110e-20

  c = α*β
  numerator = α[i, t]*A[i,j]*B[j, obs[t+1]]*β[j, t+1]
  c = α*β
  return c/c.sum(axis=0)
