In [77]:
import numpy as np

### Task 0: Markov Chain

In [78]:
"""Task 0: Markov Chain"""


def markov_chain(P, s, t=1):
    """Determines the probability of a markov chain being in a particular state
        after a specified number of steps
    Args:
        P: is a square 2D numpy.ndarray of shape (n, n) representing the
            transition matrix
            P[i, j] is the probability of transitioning from state i to state j
            n is the number of states in the markov chain
        s: is a numpy.ndarray of shape (1, n) representing the probability of
            starting in each state
        t: is the number of steps that the markov chain has been through
    Returns:
        a numpy.ndarray of shape (1, n) representing the probability of being
        in a specific state after t steps, or None on failure
    """
    for i in range(t):
        s = s @ P
    return s

### Task 1: Regular Chains

In [79]:
"""Task 1: Regular Chains"""


def regular(P):
    """Determines the steady state probabilities of a regular markov chain
    Args:
        P: is a square 2D numpy.ndarray of shape (n, n) representing the
            transition matrix
            P[i, j] is the probability of transitioning from state i to state j
            n is the number of states in the markov chain
    Returns:
        a numpy.ndarray of shape (1, n) representing the steady state
        probabilities, or None on failure
    """

    if not isinstance(P, np.ndarray) or len(P.shape) != 2:
        return None
    if P.shape[0] != P.shape[1]:
        return None
    if np.sum(P, axis=1).all() != 1:
        return None
    if np.any(P <= 0):
        return None

    n = P.shape[0]
    s = np.ones((1, n)) / n
    t = 1000
    for i in range(t):
        s = s @ P
    return s

### Task 2

In [100]:
"""Task 2: Absorbing Chains"""


def absorbing(P):
    """Determines if a makov chain is absorbing
    Args:
        P: is a square 2D numpy.ndarray of shape (n, n) representing the
            transition matrix
            P[i, j] is the probability of transitioning from state i to state j
            n is the number of states in the markov chain
    Returns:
        True if it is absorbing, or False on failure
    """
    
    if not isinstance(P, np.ndarray) or len(P.shape) != 2:
        return False
    if P.shape[0] != P.shape[1]:
        return False
    if np.sum(P, axis=1).all() != 1:
        return False
    if not np.any(np.diag(P) == 1):
        return False
    if (np.diag(P) == 1).all():
        return True

    n = P.shape[0]
    Q = P[np.diag(P) != 1, :][:, np.diag(P) != 1]
    print(Q)
    if np.any(np.sum(Q, axis=1) == 0):
        return False
    return True

In [101]:
a = np.eye(2)
b = np.array([[0.6, 0.4],
                [0.3, 0.7]])
c = np.array([[0.25, 0.2, 0.25, 0.3],
                [0.2, 0.3, 0.2, 0.3],
                [0.25, 0.25, 0.4, 0.1],
                [0.3, 0.3, 0.1, 0.3]])
d = np.array([[1, 0, 0, 0, 0],
                [0.25, 0.75, 0, 0, 0],
                [0, 0, 0.5, 0.2, 0.3],
                [0, 0, 0.3, 0.5, .2],
                [0, 0, 0.2, 0.3, 0.5]])
e = np.array([[1, 0, 0, 0, 0],
                [0.25, 0.75, 0, 0, 0],
                [0, 0.1, 0.5, 0.2, 0.2],
                [0, 0.1, 0.2, 0.5, .2],
                [0, 0.1, 0.2, 0.2, 0.5]])
f = np.array([[1, 0, 0, 0],
                [0, 1, 0, 0],
                [0, 0, 0.5, 0.5],
                [0, 0.5, 0.5, 0]])
# print(absorbing(a))
# print(absorbing(b))
# print(absorbing(c))
print(absorbing(d))
print(absorbing(e))
print(absorbing(f))

LinAlgError: Singular matrix

### Task 3: The Forward Algorithm

In [82]:
"""Task 3: The Forward Algorithm"""


def forward(Observation, Emission, Transition, Initial):
    """Performs the forward algorithm for a hidden markov model
    Args:
        Observation: is a numpy.ndarray of shape (T,) that contains the index
            of the observation
            T is the number of observations
        Emission: is a numpy.ndarray of shape (N, M) containing the emission
            probability of a specific observation given a hidden state
            Emission[i, j] is the probability of observing j given the hidden
            state i
            N is the number of hidden states
            M is the number of all possible observations
        Transition: is a 2D numpy.ndarray of shape (N, N) containing the
            transition probabilities
            Transition[i, j] is the probability of transitioning from the
            hidden state i to j
        Initial: numpy.ndarray of shape (N, 1) containing the probability of
            starting in a particular hidden state
        Returns:
            P, F, or None, None on failure
            P is the likelihood of the observations given the model
            F is a numpy.ndarray of shape (N, T) containing the forward path
                probabilities
                F[i, j] is the probability of being in hidden state i at time
                j given the previous observations"""

    if not isinstance(Observation, np.ndarray) or len(Observation.shape) != 1:
        return None, None
    if not isinstance(Emission, np.ndarray) or len(Emission.shape) != 2:
        return None, None
    if not isinstance(Transition, np.ndarray) or len(Transition.shape) != 2:
        return None, None
    if not isinstance(Initial, np.ndarray) or len(Initial.shape) != 2:
        return None, None

    T = Observation.shape[0]
    N, M = Emission.shape
    if Transition.shape[0] != N or Transition.shape[1] != N:
        return None, None
    if Initial.shape[0] != N or Initial.shape[1] != 1:
        return None, None

    F = np.zeros((N, T))
    F[:, 0] = Initial.T * Emission[:, Observation[0]]

    for t in range(1, T):
        for s in range(N):
            F[s, t] = np.sum(F[:, t - 1] * Transition[:, s] *
                             Emission[s, Observation[t]])

    P = np.sum(F[:, T - 1])

    return P, F


In [83]:
np.random.seed(1)
Emission = np.array([[0.90, 0.10, 0.00, 0.00, 0.00, 0.00],
                        [0.40, 0.50, 0.10, 0.00, 0.00, 0.00],
                        [0.00, 0.25, 0.50, 0.25, 0.00, 0.00],
                        [0.00, 0.00, 0.05, 0.70, 0.15, 0.10],
                        [0.00, 0.00, 0.00, 0.20, 0.50, 0.30]])
Transition = np.array([[0.60, 0.39, 0.01, 0.00, 0.00],
                        [0.20, 0.50, 0.30, 0.00, 0.00],
                        [0.01, 0.24, 0.50, 0.24, 0.01],
                        [0.00, 0.00, 0.15, 0.70, 0.15],
                        [0.00, 0.00, 0.01, 0.39, 0.60]])
Initial = np.array([0.05, 0.20, 0.50, 0.20, 0.05])
Hidden = [np.random.choice(5, p=Initial)]
for _ in range(364):
    Hidden.append(np.random.choice(5, p=Transition[Hidden[-1]]))
Hidden = np.array(Hidden)
Observations = []
for s in Hidden:
    Observations.append(np.random.choice(6, p=Emission[s]))
Observations = np.array(Observations)
P, F = forward(Observations, Emission, Transition, Initial.reshape((-1, 1)))
print(P)
print(F)

1.7080966131859584e-214
[[0.00000000e+000 0.00000000e+000 2.98125000e-004 ... 0.00000000e+000
  0.00000000e+000 0.00000000e+000]
 [2.00000000e-002 0.00000000e+000 3.18000000e-003 ... 0.00000000e+000
  0.00000000e+000 0.00000000e+000]
 [2.50000000e-001 3.31250000e-002 0.00000000e+000 ... 2.13885975e-214
  1.17844112e-214 0.00000000e+000]
 [1.00000000e-002 4.69000000e-002 0.00000000e+000 ... 2.41642482e-213
  1.27375484e-213 9.57568349e-215]
 [0.00000000e+000 8.00000000e-004 0.00000000e+000 ... 1.96973759e-214
  9.65573676e-215 7.50528264e-215]]


### Task 4: The Viretbi Algorithm

In [84]:
"""Task 4: The Viterbi Algorithm"""


def viterbi(Observation, Emission, Transition, Initial):
    """Calculates the most likely sequence of hidden states for a hidden
    markov model
    Args:
        Observation: is a numpy.ndarray of shape (T,) that contains the index
            of the observation
            T is the number of observations
        Emission: is a numpy.ndarray of shape (N, M) containing the emission
            probability of a specific observation given a hidden state
            Emission[i, j] is the probability of observing j given the hidden
            state i
            N is the number of hidden states
            M is the number of all possible observations
        Transition: is a 2D numpy.ndarray of shape (N, N) containing the
            transition probabilities
            Transition[i, j] is the probability of transitioning from the
            hidden state i to j
        Initial: numpy.ndarray of shape (N, 1) containing the probability of
            starting in a particular hidden state
        Returns:
            path, P, or None, None on failure
            path is the a list of length T containing the most likely sequence
                of hidden states
            P is the probability of obtaining the path sequence"""

    if not isinstance(Observation, np.ndarray) or len(Observation.shape) != 1:
        return None, None
    if not isinstance(Emission, np.ndarray) or len(Emission.shape) != 2:
        return None, None
    if not isinstance(Transition, np.ndarray) or len(Transition.shape) != 2:
        return None, None
    if not isinstance(Initial, np.ndarray) or len(Initial.shape) != 2:
        return None, None

    T = Observation.shape[0]
    N, M = Emission.shape
    if Transition.shape[0] != N or Transition.shape[1] != N:
        return None, None
    if Initial.shape[0] != N or Initial.shape[1] != 1:
        return None, None

    V = np.zeros((N, T))
    B = np.zeros((N, T))
    V[:, 0] = Initial.T * Emission[:, Observation[0]]

    for t in range(1, T):
        for s in range(N):
            V[s, t] = np.max(V[:, t - 1] * Transition[:, s] *
                             Emission[s, Observation[t]])
            B[s, t] = np.argmax(V[:, t - 1] * Transition[:, s])

    path = [np.argmax(V[:, T - 1])]
    for t in range(T - 1, 0, -1):
        path.append(int(B[path[-1], t]))
    path.reverse()

    P = np.max(V[:, T - 1])

    return path, P

In [85]:
np.random.seed(1)
Emission = np.array([[0.90, 0.10, 0.00, 0.00, 0.00, 0.00],
                        [0.40, 0.50, 0.10, 0.00, 0.00, 0.00],
                        [0.00, 0.25, 0.50, 0.25, 0.00, 0.00],
                        [0.00, 0.00, 0.05, 0.70, 0.15, 0.10],
                        [0.00, 0.00, 0.00, 0.20, 0.50, 0.30]])
Transition = np.array([[0.60, 0.39, 0.01, 0.00, 0.00],
                        [0.20, 0.50, 0.30, 0.00, 0.00],
                        [0.01, 0.24, 0.50, 0.24, 0.01],
                        [0.00, 0.00, 0.15, 0.70, 0.15],
                        [0.00, 0.00, 0.01, 0.39, 0.60]])
Initial = np.array([0.05, 0.20, 0.50, 0.20, 0.05])
Hidden = [np.random.choice(5, p=Initial)]
for _ in range(364):
    Hidden.append(np.random.choice(5, p=Transition[Hidden[-1]]))
Hidden = np.array(Hidden)
Observations = []
for s in Hidden:
    Observations.append(np.random.choice(6, p=Emission[s]))
Observations = np.array(Observations)
path, P = viterbi(Observations, Emission, Transition, Initial.reshape((-1, 1)))
print(P)
print(path)

4.701733355108224e-252
[2, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 1, 1, 1, 1, 0, 0, 1, 2, 2, 2, 3, 3, 3, 2, 1, 2, 1, 1, 2, 2, 2, 3, 3, 2, 2, 3, 4, 4, 3, 3, 2, 2, 3, 3, 3, 2, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 2, 3, 3, 2, 1, 2, 1, 1, 1, 2, 2, 3, 4, 4, 4, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 3, 3, 3, 3, 2, 2, 2, 3, 3, 3, 4, 4, 4, 4, 4, 3, 2, 2, 3, 2, 2, 3, 4, 4, 4, 3, 2, 1, 0, 0, 0, 1, 2, 2, 1, 1, 2, 3, 3, 2, 1, 1, 1, 2, 3, 3, 3, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 2, 1, 2, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 1, 0, 0, 1, 2, 2, 1, 2, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 3, 3, 4, 4, 4, 4, 3, 3, 3, 2, 1, 1, 1, 1, 2, 1, 0, 0, 0, 0, 1, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 2, 3, 4, 4, 4, 3, 3, 3, 3, 2, 2, 3, 3, 3, 3, 4, 4, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 2, 1, 2, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 2, 1, 1, 2, 1, 1, 2, 2, 2, 1, 0, 0, 0

### Task 5: The Backward Algorithm

In [86]:
"""Task 5: The Backward Algorithm"""


def backward(Observation, Emission, Transition, Initial):
    """Performs the backward algorithm for a hidden markov model
    Args:
        Observation: is a numpy.ndarray of shape (T,) that contains the index
            of the observation
            T is the number of observations
        Emission: is a numpy.ndarray of shape (N, M) containing the emission
            probability of a specific observation given a hidden state
            Emission[i, j] is the probability of observing j given the hidden
            state i
            N is the number of hidden states
            M is the number of all possible observations
        Transition: is a 2D numpy.ndarray of shape (N, N) containing the
            transition probabilities
            Transition[i, j] is the probability of transitioning from the
            hidden state i to j
        Initial: numpy.ndarray of shape (N, 1) containing the probability of
            starting in a particular hidden state
        Returns: P, B, or None, None on failure
            Pis the likelihood of the observations given the model
            B is a numpy.ndarray of shape (N, T) containing the backward
                path probabilities
            B[i, j] is the probability of generating the future observations
                from hidden state i at time j"""

    if not isinstance(Observation, np.ndarray) or len(Observation.shape) != 1:
        return None, None
    if not isinstance(Emission, np.ndarray) or len(Emission.shape) != 2:
        return None, None
    if not isinstance(Transition, np.ndarray) or len(Transition.shape) != 2:
        return None, None
    if not isinstance(Initial, np.ndarray) or len(Initial.shape) != 2:
        return None, None

    T = Observation.shape[0]
    N, M = Emission.shape
    if Transition.shape[0] != N or Transition.shape[1] != N:
        return None, None
    if Initial.shape[0] != N or Initial.shape[1] != 1:
        return None, None

    B = np.zeros((N, T))
    B[:, T - 1] = np.ones(N)

    for t in range(T - 2, -1, -1):
        for s in range(N):
            B[s, t] = np.sum(B[:, t + 1] * Transition[s, :] *
                             Emission[:, Observation[t + 1]])

    P = np.sum(Initial.T * Emission[:, Observation[0]] * B[:, 0])

    return P, B

In [87]:
np.random.seed(1)
Emission = np.array([[0.90, 0.10, 0.00, 0.00, 0.00, 0.00],
                        [0.40, 0.50, 0.10, 0.00, 0.00, 0.00],
                        [0.00, 0.25, 0.50, 0.25, 0.00, 0.00],
                        [0.00, 0.00, 0.05, 0.70, 0.15, 0.10],
                        [0.00, 0.00, 0.00, 0.20, 0.50, 0.30]])
Transition = np.array([[0.60, 0.39, 0.01, 0.00, 0.00],
                        [0.20, 0.50, 0.30, 0.00, 0.00],
                        [0.01, 0.24, 0.50, 0.24, 0.01],
                        [0.00, 0.00, 0.15, 0.70, 0.15],
                        [0.00, 0.00, 0.01, 0.39, 0.60]])
Initial = np.array([0.05, 0.20, 0.50, 0.20, 0.05])
Hidden = [np.random.choice(5, p=Initial)]
for _ in range(364):
    Hidden.append(np.random.choice(5, p=Transition[Hidden[-1]]))
Hidden = np.array(Hidden)
Observations = []
for s in Hidden:
    Observations.append(np.random.choice(6, p=Emission[s]))
Observations = np.array(Observations)
P, B = backward(Observations, Emission, Transition, Initial.reshape((-1, 1)))
print(P)
print(B)

1.7080966131859631e-214
[[1.28912952e-215 6.12087935e-212 1.00555701e-211 ... 6.75000000e-005
  0.00000000e+000 1.00000000e+000]
 [3.86738856e-214 2.69573528e-212 4.42866330e-212 ... 2.02500000e-003
  0.00000000e+000 1.00000000e+000]
 [6.44564760e-214 5.15651808e-213 8.47145100e-213 ... 2.31330000e-002
  2.70000000e-002 1.00000000e+000]
 [1.93369428e-214 0.00000000e+000 0.00000000e+000 ... 6.39325000e-002
  1.15000000e-001 1.00000000e+000]
 [1.28912952e-215 0.00000000e+000 0.00000000e+000 ... 5.77425000e-002
  2.19000000e-001 1.00000000e+000]]


### Task 6: The Baum-Welch Algorithm

In [88]:
"""Task 6: The Baum-Welch Algorithm"""


def baum_welch(Observations, Transition, Emission, Initial, iterations=1000):
    """Performs the Baum-Welch algorithm for a hidden markov model
    Args:
        Observations: is a numpy.ndarray of shape (T,) that contains the index
            of the observation
            T is the number of observations
        Transition: is a numpy.ndarray of shape (M, M) that contains the
            initialized transition probabilities
            M is the number of hidden states
        Emission: is a numpy.ndarray of shape (M, N) that contains the
            initialized emission probabilities
            N is the number of output states
        Initial: is a numpy.ndarray of shape (M, 1) that contains the
            initialized starting probabilities
        iterations: is the number of times expectation-maximization should be
            performed
    Returns: the converged Transition, Emission, or None, None on failure"""

    if not isinstance(Observations, np.ndarray) or len(Observations.shape) != 1:
        return None, None
    if not isinstance(Emission, np.ndarray) or len(Emission.shape) != 2:
        return None, None
    if not isinstance(Transition, np.ndarray) or len(Transition.shape) != 2:
        return None, None
    if not isinstance(Initial, np.ndarray) or len(Initial.shape) != 2:
        return None, None

    T = Observations.shape[0]
    N, M = Emission.shape
    if Transition.shape[0] != M or Transition.shape[1] != M:
        return None, None
    if Initial.shape[0] != M or Initial.shape[1] != 1:
        return None, None

    for n in range(iterations):
        _, alpha = forward(Observations, Emission, Transition, Initial)
        _, beta = backward(Observations, Emission, Transition, Initial)

        xi = np.zeros((M, M, T - 1))
        for t in range(T - 1):
            denominator = np.dot(np.dot(alpha[:, t].T, Transition) *
                                 Emission[:, Observations[t + 1]].T,
                                 beta[:, t + 1])
            for i in range(M):
                numerator = alpha[i, t] * Transition[i] * \
                            Emission[:, Observations[t + 1]].T * \
                            beta[:, t + 1].T
                xi[i, :, t] = numerator / denominator

        gamma = np.sum(xi, axis=1)
        Transition = np.sum(xi, 2) / np.sum(gamma, axis=1).reshape((-1, 1))
        gamma = np.hstack((gamma,
                           np.sum(xi[:, :, T - 2], axis=0).reshape((-1, 1))))

        denominator = np.sum(gamma, axis=1)
        for i in range(N):
            Emission[:, i] = np.sum(gamma[:, Observations == i], axis=1)

        Emission = np.divide(Emission, denominator.reshape((-1, 1)))

    return Transition, Emission

In [89]:
np.random.seed(1)
Emission = np.array([[0.90, 0.10, 0.00],
                        [0.40, 0.50, 0.10]])
Transition = np.array([[0.60, 0.4],
                        [0.30, 0.70]])
Initial = np.array([0.5, 0.5])
Hidden = [np.random.choice(2, p=Initial)]
for _ in range(364):
    Hidden.append(np.random.choice(2, p=Transition[Hidden[-1]]))
Hidden = np.array(Hidden)
Observations = []
for s in Hidden:
    Observations.append(np.random.choice(3, p=Emission[s]))
Observations = np.array(Observations)
T_test = np.ones((2, 2)) / 2
E_test = np.abs(np.random.randn(2, 3))
E_test = E_test / np.sum(E_test, axis=1).reshape((-1, 1))
T, E = baum_welch(Observations, T_test, E_test, Initial.reshape((-1, 1)))
print(np.round(T, 2))
print(np.round(E, 2))

TypeError: unsupported operand type(s) for *: 'NoneType' and 'float'