# **Problem #2**

Write a program to determine learning parameters of Hidden Markov Model using unsupervised learning (Baum Welch algorithm) considering the following dataset.

States: H, L

Observations: R, D

Timestamps: 1, 2, 3, 4, 5, 6

Observation Sequence 1: R, R, D, D, R, D, R

Observation Sequence 2: D, R, D, D, R, R, D

Observation Sequence 3: R, D, D, D, R, D, R

Observation Sequence 4: R, R, R, D, D, R, D

Observation Sequence 5: D, R, D, D, D, D, R

# **Solution**

In [None]:
import numpy as np

In [None]:
# Define states and observations
states = ['H', 'L']
observations = ['R', 'D']
num_states = len(states)
num_observations = len(observations)

# Observation Sequences
observation_sequences = [
    ['R', 'R', 'D', 'D', 'R', 'D'],
    ['D', 'R', 'D', 'D', 'R', 'R'],
    ['R', 'D', 'D', 'D', 'R', 'D'],
    ['R', 'R', 'R', 'D', 'D', 'R'],
    ['D', 'R', 'D', 'D', 'D', 'D']
]

In [None]:
# Initialize parameters randomly
np.random.seed(42)  # Set seed for reproducibility
pi = np.random.rand(num_states)
pi = pi / pi.sum()  # Normalize

A = np.random.rand(num_states, num_states)
A = A / A.sum(axis=1, keepdims=True)  # Normalize each row

B = np.random.rand(num_states, num_observations)
B = B / B.sum(axis=1, keepdims=True)  # Normalize each row

# Observation to index mapping
obs_map = {obs: i for i, obs in enumerate(observations)}

In [None]:
# Function to calculate forward probabilities (alpha)
def forward(observation_seq, A, B, pi):
    T = len(observation_seq)
    alpha = np.zeros((T, num_states))

    # Initial step
    alpha[0] = pi * B[:, obs_map[observation_seq[0]]]

    # Recursive step
    for t in range(1, T):
        for j in range(num_states):
            alpha[t, j] = np.sum(alpha[t-1] * A[:, j]) * B[j, obs_map[observation_seq[t]]]

    return alpha

In [None]:
# Function to calculate backward probabilities (beta)
def backward(observation_seq, A, B):
    T = len(observation_seq)
    beta = np.zeros((T, num_states))

    # Initial step
    beta[-1] = 1  # All probabilities are 1 at the final step

    # Recursive step
    for t in range(T-2, -1, -1):
        for i in range(num_states):
            beta[t, i] = np.sum(A[i, :] * B[:, obs_map[observation_seq[t+1]]] * beta[t+1])

    return beta

In [None]:
# Function to re-estimate parameters (M-step)
def re_estimate_params(observation_sequences, A, B, pi):
    num_sequences = len(observation_sequences)
    T = len(observation_sequences[0])

    # Initialize updated parameters
    pi_new = np.zeros(num_states)
    A_new = np.zeros_like(A)
    B_new = np.zeros_like(B)

    # Accumulators for pi, A, and B
    gamma_sum = np.zeros(num_states)
    xi_sum = np.zeros((num_states, num_states))
    gamma_obs_sum = np.zeros((num_states, num_observations))

    for observation_seq in observation_sequences:
        alpha = forward(observation_seq, A, B, pi)
        beta = backward(observation_seq, A, B)

        # Gamma: Probability of being in state i at time t
        gamma = (alpha * beta) / np.sum(alpha * beta, axis=1, keepdims=True)

        # Xi: Probability of transitioning from state i to j at time t
        xi = np.zeros((T-1, num_states, num_states))
        for t in range(T-1):
            denom = np.sum(alpha[t, :] @ A * B[:, obs_map[observation_seq[t+1]]] * beta[t+1, :])
            for i in range(num_states):
                xi[t, i, :] = alpha[t, i] * A[i, :] * B[:, obs_map[observation_seq[t+1]]] * beta[t+1, :] / denom

        # Accumulate pi_new
        pi_new += gamma[0]

        # Accumulate A_new
        xi_sum += np.sum(xi, axis=0)

        # Accumulate B_new
        for t in range(T):
            gamma_obs_sum[:, obs_map[observation_seq[t]]] += gamma[t]
        gamma_sum += np.sum(gamma, axis=0)

    # Normalize new estimates
    pi_new /= num_sequences
    A_new = xi_sum / np.sum(xi_sum, axis=1, keepdims=True)
    B_new = gamma_obs_sum / gamma_sum[:, None]

    return pi_new, A_new, B_new

In [None]:
# Baum-Welch Algorithm
def baum_welch(observation_sequences, A, B, pi, max_iter=100, tol=1e-4):
    for iteration in range(max_iter):
        pi_new, A_new, B_new = re_estimate_params(observation_sequences, A, B, pi)

        # Check for convergence
        if np.max(np.abs(A_new - A)) < tol and np.max(np.abs(B_new - B)) < tol and np.max(np.abs(pi_new - pi)) < tol:
            print(f"Converged after {iteration + 1} iterations.")
            break

        A, B, pi = A_new, B_new, pi_new

    return pi, A, B

In [None]:
pi_final, A_final, B_final = baum_welch(observation_sequences, A, B, pi)

# Output the results
print("Final Initial State Probabilities (π):")
for i, state in enumerate(states):
    print(f"π({state}) = {pi_final[i]:.3f}")

print("\nFinal Transition Probabilities (A):")
for i, state_from in enumerate(states):
    for j, state_to in enumerate(states):
        print(f"A({state_from} -> {state_to}) = {A_final[i, j]:.3f}")

print("\nFinal Emission Probabilities (B):")
for i, state in enumerate(states):
    for j, obs in enumerate(observations):
        print(f"B({state} -> {obs}) = {B_final[i, j]:.3f}")

Final Initial State Probabilities (π):
π(H) = 0.000
π(L) = 1.000

Final Transition Probabilities (A):
A(H -> H) = 0.985
A(H -> L) = 0.015
A(L -> H) = 0.593
A(L -> L) = 0.407

Final Emission Probabilities (B):
B(H -> R) = 0.341
B(H -> D) = 0.659
B(L -> R) = 0.658
B(L -> D) = 0.342
