# 		Lab HMM Report
谭树杰  11849060

In [23]:
import numpy as np

In [24]:
class HMM(object):
    # Implements discrete 1-st order Hidden Markov Model

    def __init__(self, tolerance=1e-6, max_iterations=10000, scaling=True):
        self.tolerance = tolerance
        self.max_iter = max_iterations
        self.scaling = scaling

    def HMMfwd(self, a, b, o, pi):
        # Implements HMM Forward algorithm

        N = np.shape(b)[0]  
        T = np.shape(o)[0]      # o = [o1, o2, o3,...,oT]
        c = np.ones((T))
        alpha = np.zeros((N, T))
        # initialise first column with observation values
        alpha[:, 0] = pi * b[:, o[0]]   # pi = [pi_1, pi_2,...,pi_T]

        for t in range(1, T):
            for i in range(N):
                alpha[i, t] = b[i, o[t]] * np.sum(alpha[:, t - 1] * a[:, i])

        return alpha, c

    def HMMbwd(self, a, b, o):  # delete c
        # Implements HMM Backward algorithm

        N = np.shape(b)[0]
        T = np.shape(o)[0]

        beta = np.zeros((N, T))
        # initialise last row with 1
        beta[:, T - 1] = 1

        for t in range(T - 2, -1, -1):
            for i in range(N):
                beta[i, t] = np.sum(b[:, o[t + 1]] * beta[:, t + 1] * a[i, :])
            # scale beta by the same value as a
            # beta[:, t] = beta[:, t] * c[t]

        return beta

    def HMMViterbi(self, a, b, o, pi):
        # Implements HMM Viterbi algorithm

        N = np.shape(b)[0]
        T = np.shape(o)[0]

        path = np.zeros(T)
        delta = np.zeros((N, T))
        phi = np.zeros((N, T))

        # initialise first column of delta and phi
        delta[:, 0] = pi * b[:, o[0]]
        phi[:, 0] = 0

        for t in range(1, T):
            for i in range(N):
                delta[i, t] = np.max(delta[:, t - 1] * a[:, i]) * b[i, o[t]]
                phi[i, t] = np.argmax(delta[:, t - 1] * a[:, i])

        path[T - 1] = np.argmax(delta[:, T - 1])
        for t in range(T - 2, -1, -1):
            path[t] = phi[int(path[t + 1]), t + 1]

        return path, delta, phi

    def HMMBaumWelch(self, o, N, dirichlet=False, verbose=False, rand_seed=1):
        # Implements HMM Baum-Welch algorithm

        T = np.shape(o)[0]
        M = int(max(o)) + 1  # now all hist time-series will contain all observation vals, but we have to provide for all

        digamma = np.zeros((N, N, T))

        # Initialise A, B and pi randomly, but so that they sum to one
        np.random.seed(rand_seed)

        # Initialisation can be done either using dirichlet distribution (all randoms sum to one)
        # or using approximates uniforms from matrix sizes
        if dirichlet:
            pi = np.ndarray.flatten(np.random.dirichlet(np.ones(N), size=1))

            a = np.random.dirichlet(np.ones(N), size=N)

            b = np.random.dirichlet(np.ones(M), size=N)
        else:

            pi_randomizer = np.ndarray.flatten(np.random.dirichlet(np.ones(N), size=1)) / 100
            pi = 1.0 / N * np.ones(N) - pi_randomizer

            a_randomizer = np.random.dirichlet(np.ones(N), size=N) / 100
            a = 1.0 / N * np.ones([N, N]) - a_randomizer

            b_randomizer = np.random.dirichlet(np.ones(M), size=N) / 100
            b = 1.0 / M * np.ones([N, M]) - b_randomizer

        error = self.tolerance + 10
        itter = 0
        while (error > self.tolerance) & (itter < self.max_iter):

            prev_a = a.copy()
            prev_b = b.copy()

            # Estimate model parameters
            alpha, c = self.HMMfwd(a, b, o, pi)
            beta = self.HMMbwd(a, b, o, c)

            for t in range(T - 1):
                for i in range(N):
                    for j in range(N):
                        digamma[i, j, t] = alpha[i, t] * a[i, j] * b[j, o[t + 1]] * beta[j, t + 1]
                digamma[:, :, t] /= np.sum(digamma[:, :, t])

            for i in range(N):
                for j in range(N):
                    digamma[i, j, T - 1] = alpha[i, T - 1] * a[i, j]
            digamma[:, :, T - 1] /= np.sum(digamma[:, :, T - 1])

            # Maximize parameter expectation
            for i in range(N):
                pi[i] = np.sum(digamma[i, :, 0])
                for j in range(N):
                    a[i, j] = np.sum(digamma[i, j, :T - 1]) / np.sum(digamma[i, :, :T - 1])

                for k in range(M):
                    filter_vals = (o == k).nonzero()
                    b[i, k] = np.sum(digamma[i, :, filter_vals]) / np.sum(digamma[i, :, :])

            error = (np.abs(a - prev_a)).max() + (np.abs(b - prev_b)).max()
            itter += 1

            if verbose:
                print("Iteration: ", itter, " error: ", error, "P(O|lambda): ", np.sum(alpha[:, T - 1]))

        return a, b, pi, alpha

# 1.	According to the following initial models (Tab-A1, B1 and C1), please use the HMM algorithm (Viterbi) to decode the order of the coin flipping (hidden state) for Seq, and give/compare the predictions under different priori distributions (Tab-A1).


In [26]:
# case 1
hmm = HMM()
# (a, b, pi_est, alpha_est) = hmm.HMMBaumWelch(hist_O, 2, False, True)
# (path, delta, phi) = hmm.HMMViterbi(a, b, hist_O, pi_est)

pi = np.array([0.6, 0.2, 0.2])
a = np.array([[0.6, 0.2, 0.2], [0.3, 0.5, 0.2], [0.5, 0.2, 0.3]])
b = np.array([[0.7, 0.3], [0.4, 0.6],[0.5,0.5]])
o = np.array([1, 1, 0, 1, 0, 1,  1, 0, 0, 0, 1, 1, 0, 1, 0])    # 1 for head, 0 for tail
(path, delta, phi) = hmm.HMMViterbi(a, b, o, pi)
print(path)


[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [27]:
# case 2
pi = np.array([0.3, 0.5, 0.2])
a = np.array([[0.6, 0.2, 0.2], [0.3, 0.5, 0.2], [0.5, 0.2, 0.3]])
b = np.array([[0.7, 0.3], [0.4, 0.6],[0.5,0.5]])
o = np.array([1, 1, 0, 1, 0, 1,  1, 0, 0, 0, 1, 1, 0, 1, 0])    # 1 for head, 0 for tail
(path, delta, phi) = hmm.HMMViterbi(a, b, o, pi)
print(path)

[1. 1. 1. 1. 1. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0.]


In [28]:
# case 3
pi = np.array([0.5, 0.2, 0.3])
a = np.array([[0.6, 0.2, 0.2], [0.3, 0.5, 0.2], [0.5, 0.2, 0.3]])
b = np.array([[0.7, 0.3], [0.4, 0.6],[0.5,0.5]])
o = np.array([1, 1, 0, 1, 0, 1,  1, 0, 0, 0, 1, 1, 0, 1, 0])    # 1 for head, 0 for tail
(path, delta, phi) = hmm.HMMViterbi(a, b, o, pi)
print(path)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


# 2.	Given a set of randomly initial transition probabilities (Tab-A2), initial observation probabilities (Tab-B2) and the probabilities of picking each coin (Tab-C2), please give the new model parameters after 2 iterations for Seq;

In [31]:
def HMMBaumWelch2(a, b, o, pi,  N):
        # Implements HMM Baum-Welch algorithm

        T = np.shape(o)[0]  
        M = int(max(o)) + 1  # now all hist time-series will contain all observation vals, but we have to provide for all

        digamma = np.zeros((N, N, T))  # N is number of hidden states

        itter = 0
        while itter < 2:
            # Estimate model parameters
            alpha, c = hmm.HMMfwd(a, b, o, pi)
            beta = hmm.HMMbwd(a, b, o)  # ,c

            for t in range(T - 1):
                for i in range(N):
                    for j in range(N):
                        digamma[i, j, t] = alpha[i, t] * a[i, j] * b[j, o[t + 1]] * beta[j, t + 1]
                digamma[:, :, t] /= np.sum(digamma[:, :, t])

            for i in range(N):
                for j in range(N):
                    digamma[i, j, T - 1] = alpha[i, T - 1] * a[i, j]
            digamma[:, :, T - 1] /= np.sum(digamma[:, :, T - 1])

            # Maximize parameter expectation
            for i in range(N):
                pi[i] = np.sum(digamma[i, :, 0])
                for j in range(N):
                    a[i, j] = np.sum(digamma[i, j, :T - 1]) / np.sum(digamma[i, :, :T - 1])

                for k in range(M):
                    filter_vals = (o == k).nonzero()
                    b[i, k] = np.sum(digamma[i, :, filter_vals]) / np.sum(digamma[i, :, :])

            itter += 1

        return a, b, pi


In [32]:
a = np.array([[0.1,0.3,0.6], [0.5,0.3,0.2], [0.4, 0.3, 0.3]])
b = np.array([[0.2,0.8],[0.4,0.6],[0.5,0.5]])
pi = np.array([0.34, 0.33, 0.33])
o = np.array([1, 1, 0, 1, 0, 1,  1, 0, 0, 0, 1, 1, 0, 1, 0]) 
a_, b_, pi_ = HMMBaumWelch2(a, b, o, pi, 3)

In [34]:
print("After 2 iterations, the new model parameters is")
print("transition matrix a is ")
print(a)
print("observation matrix b is")
print(b)

After 2 iterations, the new model parameters is
transition matrix a is 
[[0.08938234 0.27652377 0.63409388]
 [0.49157463 0.29418758 0.21423779]
 [0.39227178 0.29387318 0.31385505]]
observation matrix b is
[[0.23133301 0.76866699]
 [0.50460543 0.49539457]
 [0.63750276 0.36249724]]
