In [56]:
import numpy as np
import warnings
warnings.filterwarnings("ignore")
np.set_printoptions(suppress=True, precision=8)

In [57]:
# Example values
Pi = np.array([0.99, 0.01])
Emission = np.array([[0.8, 0.2], [0.1, 0.9]])
Transition = np.array([[0.99, 0.01], [0.01, 0.99]])
# Example sequence of observations
observations = [0, 1, 0]

In [58]:
def forward_algorithm(observations, P, E, Pi):
    # Number of hidden states
    num_states = len(Pi)
    # Number of observations
    num_observations = len(observations)
    # Initialize the forward matrix
    forward_matrix = np.zeros((num_states, num_observations))
    # Init the first value of each state
    forward_matrix[:, 0] = Pi * E[:, observations[0]]
    # Print the initial forward variables
    print(f"Alpha at time 1:\n{forward_matrix[:, 0]}\n")
    # Fill in the rest of the forward matrix
    for t in range(1, num_observations):
        for j in range(num_states):
            forward_matrix[j, t] = np.sum(
                forward_matrix[i, t - 1] * P[i, j] * E[j, observations[t]]
                for i in range(num_states)
            )

        # Print the forward variables at each time step
        print(f"Alpha at time {t+1}:\n{forward_matrix[:, t]}\n")
    # Termination step: Compute the likelihood of the sequence by summation of last 2 times
    likelihood = np.sum(forward_matrix[:, num_observations - 1])
    # Print the final forward matrix and likelihood
    print(
        f"Final Forward Matrix: \n{forward_matrix}\n",
        f"Likelihood of the sequence: {likelihood}",
        sep="\n",
    )
    return forward_matrix, likelihood

In [59]:
def backward_algorithm(observations, P, E):
    # Number of hidden states
    num_states = len(P)
    # Number of observations
    num_observations = len(observations)
    # Initialize the backward matrix
    backward_matrix = np.zeros((num_states, num_observations))
    # Initialize the last column of the backward matrix to 1
    backward_matrix[:, num_observations - 1] = 1
    # Print the initial backward variables
    print(
        f"Beta at time {num_observations}:\n{backward_matrix[:, num_observations - 1]}\n"
    )
    # Fill in the rest of the backward matrix
    for t in range(num_observations - 2, -1, -1):
        for i in range(num_states):
            backward_matrix[i, t] = np.sum(
                backward_matrix[j, t + 1] * P[i, j] * E[j, observations[t + 1]]
                for j in range(num_states)
            )

        # Print the backward variables at each time step
        print(f"Beta at time {t+1}:\n{backward_matrix[:, t]}\n")

    # Termination step: Compute the likelihood of the sequence using the initial probabilities and the first observation
    likelihood = np.sum(
        backward_matrix[i, 0] * P[0, j] * E[j, observations[0]]
        for j in range(num_states)
    )

    # Print the final backward matrix and likelihood
    print(
        f"Final Backward Matrix: \n{backward_matrix}\n",
        f"Likelihood of the sequence: {likelihood}",
        sep="\n",
    )

    return backward_matrix, likelihood

In [60]:
def viterbi_algorithm(observations, P, E, Pi):
    # Number of hidden states
    num_states = len(Pi)
    # Number of observations
    num_observations = len(observations)
    # Initialize the Viterbi matrix
    viterbi_matrix = np.zeros((num_states, num_observations))
    # Initialize the backpointer matrix
    path = np.zeros((num_states, num_observations), dtype=int)
    # Initialization step
    viterbi_matrix[:, 0] = Pi * E[:, observations[0]]
    # Recursion step
    for t in range(1, num_observations):
        for j in range(num_states):
            prev_probs = viterbi_matrix[:, t - 1] * P[:, j] * E[j, observations[t]]
            viterbi_matrix[j, t] = round(max(prev_probs),5)
            path[j, t] = np.argmax(prev_probs)

    # Termination step
    best_path_prob = max(viterbi_matrix[:, -1])
    best_last_state = np.argmax(viterbi_matrix[:, -1])

    # Backtrace to find the best path
    best_path = [best_last_state]
    for t in range(num_observations - 1, 0, -1):
        best_last_state = path[best_last_state, t]
        best_path.insert(0, best_last_state)

    return best_path, best_path_prob, viterbi_matrix

In [61]:
# Forward algorithm
forward_matrix, likelihood_forward = forward_algorithm(
    observations, Transition, Emission, Pi
)

# Backward algorithm
backward_matrix, likelihood_backward = backward_algorithm(
    observations, Transition, Emission
)

# Viterbi algorithm
best_path, best_path_prob, viterbi_matrix = viterbi_algorithm(
    observations, Transition, Emission, Pi
)

# Print results
print("\nViterbi Algorithm:")
print("Best Path:", best_path)
print("Best Path Probability:", best_path_prob)
print("Viterbi Matrix:\n", viterbi_matrix)

Alpha at time 1:
[0.792 0.001]

Alpha at time 2:
[0.156818 0.008019]

Alpha at time 3:
[0.12426401 0.0009507 ]

Final Forward Matrix: 
[[0.792      0.156818   0.12426401]
 [0.001      0.008019   0.0009507 ]]

Likelihood of the sequence: 0.125214707
Beta at time 3:
[1. 1.]

Beta at time 2:
[0.793 0.107]

Beta at time 1:
[0.157977 0.096923]

Final Backward Matrix: 
[[0.157977 0.793    1.      ]
 [0.096923 0.107    1.      ]]

Likelihood of the sequence: 0.07685993900000002

Viterbi Algorithm:
Best Path: [0, 0, 0]
Best Path Probability: 0.1242
Viterbi Matrix:
 [[0.792   0.15682 0.1242 ]
 [0.001   0.00713 0.00071]]


In [62]:
forward_probabilities, _ = forward_algorithm(
    observations=observations, P=Transition, Pi=Pi, E=Emission
)
backward_probabilites,_=backward_algorithm(observations,Transition,Emission)
print(forward_probabilities, backward_probabilites,sep='\n\n')

Alpha at time 1:
[0.792 0.001]

Alpha at time 2:
[0.156818 0.008019]

Alpha at time 3:
[0.12426401 0.0009507 ]

Final Forward Matrix: 
[[0.792      0.156818   0.12426401]
 [0.001      0.008019   0.0009507 ]]

Likelihood of the sequence: 0.125214707
Beta at time 3:
[1. 1.]

Beta at time 2:
[0.793 0.107]

Beta at time 1:
[0.157977 0.096923]

Final Backward Matrix: 
[[0.157977 0.793    1.      ]
 [0.096923 0.107    1.      ]]

Likelihood of the sequence: 0.07685993900000002
[[0.792      0.156818   0.12426401]
 [0.001      0.008019   0.0009507 ]]

[[0.157977 0.793    1.      ]
 [0.096923 0.107    1.      ]]


In [63]:
import numpy as np
def baum_welch(
    Pi,
    Emission,
    Transition,
    observations,
    num_iterations=100,
):
    state_numbers = len(Pi)
    # Iterate through the Baum-Welch algorithm
    for iteration in range(num_iterations):
        # E-step: Use forward and backward algorithms to calculate expected counts
        forward_matrix, forward_likelihood = forward_algorithm(
            observations, Transition, Emission, Pi
        )
        backward_matrix, backward_likelihood = backward_algorithm(
            observations, Transition, Emission
        )

        # M-step: Update parameters based on expected counts
        # Update initial state probabilities
        Pi = forward_matrix[:, 0] * backward_matrix[:, 0] / forward_likelihood

        # Update transition probabilities
        for i in range(state_numbers):
            for j in range(state_numbers):
                numer = sum(
                    forward_matrix[i, t]
                    * Transition[i, j]
                    * Emission[j, observations[t + 1]]
                    * backward_matrix[j, t + 1]
                    for t in range(len(observations) - 1)
                )
                denom = sum(
                    forward_matrix[i, t] * backward_matrix[i, t]
                    for t in range(len(observations) - 1)
                )
                Transition[i, j] = numer / denom

        # Update emission probabilities
        for j in range(state_numbers):
            for k in range(state_numbers):
                numer = sum(
                    forward_matrix[i, t] * backward_matrix[i, t]
                    if observations[t] == k
                    else 0
                    for t in range(len(observations) - 1)
                )
                denom = sum(
                    forward_matrix[i, t] * backward_matrix[i, t]
                    for t in range(len(observations) - 1)
                )
                Emission[j, k] = numer / denom

    return Pi, Transition, Emission


Pi, Transition, Emission = baum_welch(Pi, Emission, Transition, observations)

print("\nFinal Parameters:")
print(f"Initial State Probabilities: {Pi}")
print(f"Transition Probabilities:\n{Transition}")
print(f"Emission Probabilities:\n{Emission}")

Alpha at time 1:
[0.792 0.001]

Alpha at time 2:
[0.156818 0.008019]

Alpha at time 3:
[0.12426401 0.0009507 ]

Final Forward Matrix: 
[[0.792      0.156818   0.12426401]
 [0.001      0.008019   0.0009507 ]]

Likelihood of the sequence: 0.125214707
Beta at time 3:
[1. 1.]

Beta at time 2:
[0.793 0.107]

Beta at time 1:
[0.157977 0.096923]

Final Backward Matrix: 
[[0.157977 0.793    1.      ]
 [0.096923 0.107    1.      ]]

Likelihood of the sequence: 0.07685993900000002
Alpha at time 1:
[0.10141617 0.00007856]

Alpha at time 2:
[0.09079196 0.00040159]

Alpha at time 3:
[0.00918375 0.00007192]

Final Forward Matrix: 
[[0.10141617 0.09079196 0.00918375]
 [0.00007856 0.00040159 0.00007192]]

Likelihood of the sequence: 0.009255664463869802
Beta at time 3:
[1. 1.]

Beta at time 2:
[0.10149473 0.10149473]

Beta at time 1:
[0.09119355 0.09119355]

Final Backward Matrix: 
[[0.09119355 0.10149473 1.        ]
 [0.09119355 0.10149473 1.        ]]

Likelihood of the sequence: 0.00925566446386980