In [1]:
def zero_matrix(nrow, ncol):
    M = []
    while len(M) < nrow:
        M.append([]) # create a new row
        while len(M[-1]) < ncol:
            M[-1].append(0.0)
    return M

In [2]:
def zero_array_3d(time, row, col):
    M = zero_matrix(row, col)
    M_3d = [M]
    while len(M_3d) < time:
        M_3d.append(M)
    return M_3d

In [3]:
# HMM2 - Decoding
from math import log

In [4]:
with open('hmm4_01.in') as inp:
    A_list = list(map(float, inp.readline().split()))
    B_list = list(map(float, inp.readline().split()))
    pi_list = list(map(float, inp.readline().split()))
    obs_list = list(map(int, inp.readline().split()))
    print(A_list, type(A_list))
    print(B_list)
    print(pi_list)
    print(obs_list[0])

[4.0, 4.0, 0.4, 0.2, 0.2, 0.2, 0.2, 0.4, 0.2, 0.2, 0.2, 0.2, 0.4, 0.2, 0.2, 0.2, 0.2, 0.4] <class 'list'>
[4.0, 4.0, 0.4, 0.2, 0.2, 0.2, 0.2, 0.4, 0.2, 0.2, 0.2, 0.2, 0.4, 0.2, 0.2, 0.2, 0.2, 0.4]
[1.0, 4.0, 0.241896, 0.266086, 0.249153, 0.242864]
1000


In [5]:
with open('hmm4_01.ans') as ans:
    ans_list = ans.readline().split()
print(ans_list)

['4', '4', '0.545455', '0.454545', '0.0', '0.0', '0.0', '0.506173', '0.493827', '0.0', '0.0', '0.0', '0.504132', '0.495868', '0.478088', '0.0', '0.0', '0.521912']


In [6]:
N = int(A_list[0])
K = int(B_list[1])
T = obs_list[0]
print(T, type(T))
print(K)

1000 <class 'int'>
4


In [21]:
# initial lambda

A = zero_matrix(N, N)
B = zero_matrix(N, K)
pi = pi_list[2:]
# pi = [pi_list[2: ]]

obs = obs_list[1:]

In [22]:
print(len(pi))

4


In [8]:
for i in range(N):
    for j in range(N):
        A[i][j] = A_list[2 + N*i + j]
    for j in range(K):
        B[i][j] = B_list[2 + K*i + j]

In [9]:
print(B)
print(A)

[[0.4, 0.2, 0.2, 0.2], [0.2, 0.4, 0.2, 0.2], [0.2, 0.2, 0.4, 0.2], [0.2, 0.2, 0.2, 0.4]]
[[0.4, 0.2, 0.2, 0.2], [0.2, 0.4, 0.2, 0.2], [0.2, 0.2, 0.4, 0.2], [0.2, 0.2, 0.2, 0.4]]


In [10]:
# alpha-pass: ouput alphas as a T * N matrix, and coefficients c_t
def alpha_pass_scaled(A, B, pi, obs):
    # by log
    T = len(obs)
    N = len(A)
    alphas = zero_matrix(T, N)
    c = [0.0] * T
    alpha_tilde = [pi[i] * B[i][obs[0]] for i in range(N)]
    c[0] = 1.0 / sum(alpha_tilde)
    alphas[0] = [c[0] * alpha for alpha in alpha_tilde]
    for t in range(1, T):
        alpha_tilde = [sum(alphas[t-1][j] * A[j][i] * B[i][obs[t]] for j in range(N)) for i in range(N)]
        c[t] = 1.0 / sum(alpha_tilde)
        alphas[t] = [c[t] * alpha for alpha in alpha_tilde]
    return alphas, c

In [11]:
alphas, c = alpha_pass_scaled(A, B, pi, obs)
print(alphas[T-1])

[0.43334919514301057, 0.18667402956642004, 0.18668889861131918, 0.19328787667925015]


In [12]:
# beta-pass
def beta_pass_scaled(A, B, obs, c):
    T = len(obs)
    N = len(A)
    betas = zero_matrix(T, N)
    betas[T-1] = [c[T-1]] * N
    for t in range(T-2, -1, -1):
        betas[t] = [sum(betas[t+1][j] * B[j][obs[t+1]] * A[i][j] for j in range(N)) * c[t] for i in range(N) ]
    return betas

In [13]:
betas = beta_pass_scaled(A, B, obs, c)
print(betas[T-1])

[3.916627012142473, 3.916627012142473, 3.916627012142473, 3.916627012142473]


In [None]:
def learning_iter(A, B, pi, obs):
    N = len(A)
    K = len(B[0])
    T = len(obs)
    # calculate alpha, beta, di_gamma, gamma
    alphas, c = alpha_pass_scaled(A, B, pi, obs)
    betas = beta_pass_scaled(A, B, obs, c)
    # di_gamma and gamma are computed at each time
    # sum of di_gamma and gamma over time 0:T-1
    sum_di_gamma = zero_matrix(N, N)
    sum_gamma = [0.0] * N
    sum_O_k_count_times_gamma = zero_matrix(N, K)
    new_pi = None
    for t in range(T-1):
        di_gamma_t = zero_matrix(N, N)
        gamma_t = [0.0] * N
        O_t = obs[t]
        O_t_plus_1 = obs[t+1]
        for i in range(N):
            for j in range(N):
                di_gamma_t[i][j] = betas[t+1][j] * A[i][j] * B[j][O_t_plus_1] * alphas[t][i]
                gamma_t[i] += di_gamma_t[i][j]
                sum_di_gamma[i][j] += di_gamma_t[i][j]
            sum_gamma[i] += gamma_t[i]
            sum_O_k_count_times_gamma[i][O_t] += gamma_t[i]
        if t == 0:
            new_pi = gamma_t

    new_A = create_zero_mat(self.N, self.N)
    new_B = create_zero_mat(self.N, self.K)
    for i in range(self.N):
        for j in range(self.N):
            new_A[i][j] = sum_di_gamma[i][j] / sum_gamma[i]
        for k in range(self.K):
            new_B[i][k] = sum_O_k_count_times_gamma[i][k] / sum_gamma[i]
    return new_A, new_B, new_pi, c

In [14]:
def gamma_func(A, B, pi, obs):
    T = len(obs)
    N = len(A)
    alphas, c = alpha_pass_scaled(A, B, pi, obs)
    betas = beta_pass_scaled(A, B, obs, c)
    di_gammas = zero_array_3d(T-1, N, N)
    gammas = zero_matrix(T, N)
    for t in range(T-2, -1, -1):
        for i in range(N):
            gammas[t][i] = 0
            for j in range(N):
                di_gammas[t][i][j] = alphas[t][i] * A[i][j] * B[j][obs[t+1]] * betas[t+1][j] # / sum(alphas[T-1][k] for k in range(N))
                gammas[t][i] += di_gammas[t][i][j]
    gammas[T-1] = [alphas[T-1][i] for i in range(N)]
    return gammas, di_gammas, c

In [19]:
gammas, di_gammas, c= gamma_func(A, B, pi, obs)
print(gammas[T-1])

[0.5331035643930517, 0.3478984383414458, 0.06723797492664782, 0.05176002233885484]


In [16]:
def baum_welch(A, B, pi, obs):
    # given a starting guess
    N = len(A)
    K = len(B[0])
    T = len(obs)
    # Compute di-gamma and gamma functions
    gammas,di_gammas, c = gamma_func(A, B, pi, obs)
    # re-estimate lambda
    for i in range(N):
        for j in range(N):
            numr = 0
            dem = 0
            for t in range(T-1):
                numr += di_gammas[t][i][j]
                dem += gammas[t][i]
            A[i][j] = numr / dem
        for k in range(K):
            numr = 0
            dem = 0
            for t in range(T):
                numr += (obs[t] == k) * gammas[t][i]
                dem += gammas[t][i]
            B[i][k] = numr / dem
        pi[i] = gammas[0][i]
    return A, B, pi, c

In [17]:
def learning(A, B, pi, obs):
    N = len(A)
    T = len(obs)
    maxIters = 1000
    loss = 1e-3
    oldLogProb = 0
    for iters in range(maxIters):
        A, B, pi, c = baum_welch(A, B, pi, obs)
        # Compute log P(O|lambda)
        logProb = -sum(log(ct) for ct in c)
        if iters != 0 and (logProb <= oldLogProb or abs(logProb - oldLogProb) <= loss):
        # if logProb <= oldLogProb or abs(logProb - oldLogProb) <= loss:
            break
        oldLogProb = logProb
    return A, B

In [18]:
A, B = learning(A, B, pi, obs)
print(A)
print(B)

[[0.7607367830329481, 0.9999221551466781, 0.2551860381549284, 0.17277815552981357], [0.053800755696747735, 0.2828656049485824, 0.03609448629855387, 0.024438401146672815], [0.08764205267339552, 0.23039566942267628, 0.11759667047160739, 0.03981043784259935], [0.10377785445485417, 0.27281387780221106, 0.06962371247771523, 0.0942798964239097]]
[[0.4500848081056146, 0.1568244272679548, 0.1689192583220022, 0.2241715063044277], [0.1894307560526987, 0.39628133132835514, 0.2184186934539254, 0.19586921916502076], [0.1523363162013573, 0.15482168573381982, 0.4736303478636909, 0.21921165020113195], [0.1733741504686374, 0.12902023466861307, 0.19603981753657504, 0.5015657973261737]]
