In [37]:
from typing import Dict, Optional, List
import numpy as np

class HMMState:
    def __init__(self, mean: List[np.ndarray], covariance: List[np.ndarray], transition: Dict["HMMState", float], label: Optional[int] = None, parent: Optional["HMMState"] = None):
        self.mean = mean
        """n_gaussians of mean vectors."""
        self.covariance = covariance
        """n_gaussians of diagonal of covariance matrix."""
        self.transition = transition
        """Transition probability to other HMMState instances."""
        self.label = label
        """The digit associated with the state. `None` if the state is the first state."""
        self.parent = parent
        """The state is the first state if the `parent` is `None`."""

    @classmethod
    def root(cls):
        """Creates a root HMMState with default parameters."""
        return cls(mean=[], covariance=[], transition={}, label=None)

    def __hash__(self) -> int:
        """Enables HMMState instances to be used as dictionary keys or in sets."""
        return id(self)

    def add_transition(self, state: "HMMState", probability: float):
        """Adds or updates a transition probability to another state."""
        self.transition[state] = probability

    def get_transition_prob(self, state: "HMMState") -> float:
        """Retrieves the transition probability to another state, if defined."""
        return self.transition.get(state, 0.0)
    
    def get_emission_prob(self, observation: np.ndarray) -> float:
        """Calculate the emission probability of an observation for this state,
        considering the state might represent multiple Gaussians."""
        total_prob = 0.0
        n_gaussians = len(self.mean)
        
        for i in range(n_gaussians):
            mean = self.mean[i]
            covariance = self.covariance[i]
            k = mean.shape[0]
            covariance_det = np.prod(covariance)
            covariance_inv = 1 / covariance
            diff = observation - mean
            exponent = -0.5 * np.sum((diff ** 2) * covariance_inv)
            coefficient = 1 / np.sqrt((2 * np.pi) ** k * covariance_det)
            total_prob += coefficient * np.exp(exponent)
        
        # Assuming equal weight for each Gaussian component
        return total_prob / n_gaussians if n_gaussians > 0 else 0

# Example of usage
if __name__ == "__main__":
    # Creating root state
    root_state = HMMState.root()
    
    # Creating another state with example data
    mean_example = [np.array([0.0, 1.0])]
    covariance_example = [np.array([1.0, 1.0])]
    state_example = HMMState(mean=mean_example, covariance=covariance_example, transition={}, label=1)
    
    # Adding a transition from root to example state
    root_state.add_transition(state_example, 0.5)

    print(root_state.get_transition_prob(state_example))  # Example of getting a transition probability


0.5


In [38]:
from typing import List, Tuple, Dict
import numpy as np
from scipy.stats import multivariate_normal

class HMM:
    def __init__(self):
        self.states: List[HMMState] = []
        self.observations: List[np.ndarray] = []
        self.state_index: Dict[HMMState, int] = {}

    def add_state(self, state: HMMState):
        """Adds a state to the HMM."""
        self.states.append(state)
        self.state_index[state] = len(self.states) - 1

    def set_observations(self, observations: List[np.ndarray]):
        """Sets the sequence of observations for the HMM."""
        self.observations = observations

    def viterbi(self, observations):
        """
        Viterbi Algorithm to find the most probable state sequence.
        Args:
            observations (List[np.ndarray]): A list of observation vectors.
        Returns:
            Tuple[List[int], float]: The most likely state sequence and its probability.
        """
        n_states = len(self.states)
        n_observations = len(observations)
        
        # Initialize matrices
        dp = np.zeros((n_states, n_observations))  # Probability matrix
        backpointer = np.zeros((n_states, n_observations), dtype=int)  # Backpointer matrix
        
        # Initial probabilities
        for s in range(n_states):
            dp[s, 0] = self.states[s].get_emission_prob(observations[0]) * (1.0 / n_states)
        
        # Viterbi algorithm
        for t in range(1, n_observations):
            for s in range(n_states):
                (prob, state) = max(
                    (dp[k, t - 1] * self.states[k].get_transition_prob(self.states[s]) * self.states[s].get_emission_prob(observations[t]), k)
                    for k in range(n_states)
                )
                dp[s, t] = prob
                backpointer[s, t] = state
        
        # Reconstruct the state sequence
        last_state = np.argmax(dp[:, -1])
        path = [last_state]
        for t in range(n_observations - 1, 0, -1):
            last_state = backpointer[last_state, t]
            path.insert(0, last_state)
        
        # Convert state indexes to labels
        best_path = [self.states[i].label for i in path]
        max_prob = np.max(dp[:, -1])
        
        return best_path, max_prob
    
    def train(self, sequences, n_iterations=100, convergence_tol=1e-6):
        """
        Train the HMM using the Expectation-Maximization algorithm.
        
        Args:
            sequences (List[List[np.ndarray]]): List of all observation sequences.
            n_iterations (int): Number of iterations to run the EM algorithm.
            convergence_tol (float): The convergence tolerance. Training stops when the change in log-likelihood is less than this value.
        """
        previous_log_likelihood = None
        
        for iteration in range(n_iterations):
            # E Step
            all_alphas = [self._forward(seq) for seq in sequences]
            all_betas = [self._backward(seq) for seq in sequences]
            
            # Compute log-likelihood
            log_likelihood = sum(
                np.log(np.sum(alphas[-1])) for alphas in all_alphas
            )
            
            # Check for convergence
            if previous_log_likelihood is not None and abs(log_likelihood - previous_log_likelihood) < convergence_tol:
                print(f"Model converged after {iteration} iterations.")
                break
            previous_log_likelihood = log_likelihood
            
            # M Step
            self._update_states(sequences, all_alphas, all_betas)
            self._update_transitions(sequences, all_alphas, all_betas)
            
            # Optionally print the log-likelihood every few iterations to monitor the training progress
            if iteration % 10 == 0:
                print(f"Iteration {iteration}: Log Likelihood = {log_likelihood}")
        
        print("Training complete.")

    def _forward(self, obs_seq):
        """
        Forward algorithm for calculating the probability of the observation sequence.

        Args:
            obs_seq (List[np.ndarray]): Observation sequence.

        Returns:
            np.ndarray: Matrix of probabilities (states x observations).
        """
        n_states = len(self.states)
        n_observations = len(obs_seq)
        alpha = np.zeros((n_states, n_observations))

        # Initialization
        for s in range(n_states):
            alpha[s, 0] = self.states[s].get_emission_prob(obs_seq[0]) * (1.0 / n_states)

        # Induction
        for t in range(1, n_observations):
            for s in range(n_states):
                alpha[s, t] = self.states[s].get_emission_prob(obs_seq[t]) * sum(
                    alpha[prev_s, t - 1] * self.states[prev_s].get_transition_prob(self.states[s])
                    for prev_s in range(n_states)
                )
        
        return alpha

    def _backward(self, obs_seq):
        """
        Backward algorithm for calculating the probability of the future observations given current state.

        Args:
            obs_seq (List[np.ndarray]): Observation sequence.

        Returns:
            np.ndarray: Matrix of probabilities (states x observations).
        """
        n_states = len(self.states)
        n_observations = len(obs_seq)
        beta = np.zeros((n_states, n_observations))

        # Initialization
        beta[:, n_observations - 1] = 1  # Set all to 1 for the final probabilities

        # Induction
        for t in range(n_observations - 2, -1, -1):
            for s in range(n_states):
                beta[s, t] = sum(
                    self.states[s].get_transition_prob(self.states[next_s]) *
                    self.states[next_s].get_emission_prob(obs_seq[t + 1]) * beta[next_s, t + 1]
                    for next_s in range(n_states)
                )
        
        return beta

    def _update_states(self, sequences, all_alphas, all_betas):
        """
        Update the emission probabilities (means and covariances) for each state.
        
        Args:
            sequences (List[List[np.ndarray]]): List of all observation sequences.
            all_alphas (List[np.ndarray]): List of alpha matrices from the forward algorithm.
            all_betas (List[np.ndarray]): List of beta matrices from the backward algorithm.
        """
        for s in range(len(self.states)):
            # Accumulators for means and variances
            weighted_sum = 0
            weighted_square_sum = 0
            normalizer = 0

            for seq_idx, obs_seq in enumerate(sequences):
                alphas = all_alphas[seq_idx]
                betas = all_betas[seq_idx]
                for t in range(len(obs_seq)):
                    # Calculate the weight for this observation at time t
                    weight = alphas[s, t] * betas[s, t]
                    # Update the accumulators
                    weighted_sum += weight * obs_seq[t]
                    weighted_square_sum += weight * np.outer(obs_seq[t], obs_seq[t])
                    normalizer += weight

            # Update the means and variances for the state
            if normalizer > 0:
                new_mean = weighted_sum / normalizer
                new_covariance = weighted_square_sum / normalizer - np.outer(new_mean, new_mean)
                self.states[s].mean = [new_mean]  # Assuming a single Gaussian for simplicity
                self.states[s].covariance = [np.diag(new_covariance)]  # Assuming diagonal covariance for simplicity

    def _update_transitions(self, sequences, all_alphas, all_betas):
        """
        Update the transition probabilities between states.
        
        Args:
            sequences (List[List[np.ndarray]]): List of all observation sequences.
            all_alphas (List[np.ndarray]): List of alpha matrices from the forward algorithm.
            all_betas (List[np.ndarray]): List of beta matrices from the backward algorithm.
        """
        for s in range(len(self.states)):
            for next_s in range(len(self.states)):
                numerator = 0
                denominator = 0

                for seq_idx, obs_seq in enumerate(sequences):
                    alphas = all_alphas[seq_idx]
                    betas = all_betas[seq_idx]
                    for t in range(len(obs_seq) - 1):
                        numerator += (alphas[s, t] *
                                    self.states[s].get_transition_prob(self.states[next_s]) *
                                    self.states[next_s].get_emission_prob(obs_seq[t + 1]) *
                                    betas[next_s, t + 1])

                        denominator += alphas[s, t] * betas[s, t]

                # Update the transition probability from state s to state next_s
                if denominator > 0:
                    self.states[s].transition[self.states[next_s]] = numerator / denominator

    
    def decode(self, sequence):
        """
        Decode a sequence of observations and return the most probable state sequence using the Viterbi algorithm.
        Args:
            sequence (List[np.ndarray]): An observation sequence.
        Returns:
            List[int]: The most likely state sequence.
        """
        return self.viterbi(sequence)[0]

    def evaluate(self, sequences, labels):
        """
        Evaluate the HMM on a test set.
        Args:
            sequences (List[List[np.ndarray]]): A list of observation sequences.
            labels (List[List[int]]): The true state sequences for each observation sequence.
        Returns:
            float, float: The sentence accuracy and the word accuracy.
        """
        correct_sentences = 0
        correct_words = 0
        total_words = 0

        for obs_seq, true_states in zip(sequences, labels):
            predicted_states = self.decode(obs_seq)

            if predicted_states == true_states:
                correct_sentences += 1

            correct_words += sum(p == t for p, t in zip(predicted_states, true_states))
            total_words += len(true_states)

        sentence_accuracy = correct_sentences / len(sequences)
        word_accuracy = correct_words / total_words

        return sentence_accuracy, word_accuracy




In [39]:
digit_states = {i: HMMState.root() for i in range(10)}

for i in range(10):
    for j in range(10):
        if i != j: 
            digit_states[i].add_transition(digit_states[j], 0.1)

silence_state = HMMState.root()
for state in digit_states.values():
    state.add_transition(silence_state, 0.05) 
    silence_state.add_transition(state, 0.05)

hmm_model = HMM()
for state in digit_states.values():
    hmm_model.add_state(state)
hmm_model.add_state(silence_state)

