# Hidden Markov Model (HMM) with Neo4j Integration

This notebook demonstrates how to:

- **Initialize an HMM and store its structure in Neo4j**
- **Generate observations** from the model
- **Infer the most likely hidden state sequence using the Viterbi algorithm**
- **Train the model parameters using the Baum-Welch algorithm**


## Step 1: Import Libraries and Connect to Neo4j

In this step, we import the required libraries and establish a connection to the Neo4j graph database.

In [10]:
# Import required libraries
from py2neo import Graph   # For interacting with the Neo4j graph database
import numpy as np        # For numerical operations and matrix calculations
import random             # For generating random choices

# Connect to the Neo4j database with the provided credentials
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

## Step 2: Define the Hidden Markov Model (HMM) Class

The `HiddenMarkovModel` class initializes the HMM parameters and stores its structure (hidden states, observable states, transitions, and emissions) in Neo4j.

In [11]:
class HiddenMarkovModel:
    def __init__(self, graph, hidden_states, observable_states, transition_matrix, emission_matrix, initial_distribution):
        """
        Initializes a Hidden Markov Model (HMM) and stores it in a Neo4j database.
        
        Parameters:
          - graph: The Neo4j database connection.
          - hidden_states: List of hidden state labels.
          - observable_states: List of observable state labels.
          - transition_matrix: Matrix representing transition probabilities between hidden states.
          - emission_matrix: Matrix representing emission probabilities from hidden to observable states.
          - initial_distribution: Initial probability distribution for the hidden states.
        """
        self.hidden_states = hidden_states
        self.observable_states = observable_states
        self.transition_matrix = np.array(transition_matrix)
        self.emission_matrix = np.array(emission_matrix)
        self.initial_distribution = np.array(initial_distribution)
        self.graph = graph
        # Store the HMM in Neo4j (clearing any existing data)
        self.create_neo4j_hmm(clear=True, model_name="TrueModel")
    
    def create_neo4j_hmm(self, clear=False, model_name="OriginalModel"):
        """
        Creates the HMM structure in the Neo4j database.
        
        It does the following:
          1. Optionally clears the existing database nodes.
          2. Creates nodes for hidden states with their initial probabilities.
          3. Creates nodes for observable states.
          4. Creates relationships for state transitions and emissions.
        
        Parameters:
          - clear: If True, deletes existing nodes.
          - model_name: Tag to label this version of the model in Neo4j.
        """
        print(f"Storing HMM in Neo4j as {model_name}...")
        if clear:
            self.graph.run("MATCH (n) DETACH DELETE n")

        # Define node and edge labels based on the model name
        node_type = "HiddenState" if model_name == "TrueModel" else f"{model_name}Node"
        edge_type = "TRANSITION" if model_name == "TrueModel" else f"{model_name}Connection"

        # Create hidden state nodes with initial probabilities
        for state, prob in zip(self.hidden_states, self.initial_distribution):
            self.graph.run(f"""
                CREATE (:{node_type} {{name: $name, initial_prob: $prob, model: $model}})
            """, parameters={"name": state, "prob": float(prob), "model": model_name})

        # Create observable state nodes
        for state in self.observable_states:
            self.graph.run(f"""
                CREATE (:ObservableState {{name: $name, model: $model}})
            """, parameters={"name": state, "model": model_name})

        # Create transition relationships between hidden states
        for i, from_state in enumerate(self.hidden_states):
            for j, to_state in enumerate(self.hidden_states):
                prob = float(self.transition_matrix[i][j])
                if prob > 0:
                    self.graph.run(f"""
                        MATCH (a:{node_type} {{name: $from, model: $model}}), (b:{node_type} {{name: $to, model: $model}})
                        CREATE (a)-[:{edge_type} {{probability: $prob}}]->(b)
                    """, parameters={"from": from_state, "to": to_state, "prob": prob, "model": model_name})
        
        # Create emission relationships from hidden states to observable states
        for i, from_state in enumerate(self.hidden_states):
            for j, obs in enumerate(self.observable_states):
                prob = float(self.emission_matrix[i][j])
                if prob > 0:
                    self.graph.run(f"""
                        MATCH (h:{node_type} {{name: $from, model: $model}}), (o:ObservableState {{name: $to, model: $model}})
                        CREATE (h)-[:EMITS {{probability: $prob}}]->(o)
                    """, parameters={"from": from_state, "to": obs, "prob": prob, "model": model_name})
    
    def generate_observations(self, n=10):
        """
        Generates a sequence of observable states based on the HMM.
        
        The process:
          1. Choose an initial hidden state based on the initial distribution.
          2. For each step:
             - Generate an observable state using the emission probabilities.
             - Transition to the next hidden state based on the transition probabilities.
        
        Parameters:
          - n: Number of observations to generate.
        
        Returns:
          - observations: List of generated observable states.
          - order_hidden_states: Corresponding sequence of hidden states.
        """
        observations = []
        order_hidden_states = []
        # Select the initial hidden state
        state = np.random.choice(self.hidden_states, p=self.initial_distribution)
        for _ in range(n):
            # Generate an observable state from the current hidden state's emission distribution
            obs = np.random.choice(self.observable_states, p=self.emission_matrix[self.hidden_states.index(state)])
            observations.append(obs)
            order_hidden_states.append(state)
            # Transition to the next hidden state based on transition probabilities
            state = np.random.choice(self.hidden_states, p=self.transition_matrix[self.hidden_states.index(state)])
        return observations, order_hidden_states

## Step 3: Implement the Viterbi Algorithm

The `Viterbi` class uses dynamic programming to infer the most likely sequence of hidden states given a series of observations.

In [12]:
class Viterbi:
    def __init__(self, hmm):
        self.hmm = hmm
    
    def run(self, observations):
        """
        Applies the Viterbi algorithm to find the most likely hidden state sequence.
        
        Steps:
          1. **Initialization:** Calculate the probability for each hidden state given the first observation.
          2. **Recursion:** For each subsequent observation, update the probabilities by considering the previous state probabilities,
             transition probabilities, and the emission probability of the current observation.
          3. **Backtracking:** Trace back the path with the highest probability to determine the sequence of hidden states.
        
        Parameters:
          - observations: List of observable states.
        
        Returns:
          - most_likely_states: The inferred sequence of hidden states.
        """
        n_states = len(self.hmm.hidden_states)
        n_obs = len(observations)
        # Create a table to store the probability of the most likely path ending in each state at each time
        viterbi_table = np.zeros((n_states, n_obs))
        # Create a table to store the backpointers for the optimal path
        backpointer = np.zeros((n_states, n_obs), dtype=int)

        # Initialization for the first observation
        for s in range(n_states):
            viterbi_table[s, 0] = (self.hmm.initial_distribution[s] *
                                   self.hmm.emission_matrix[s, self.hmm.observable_states.index(observations[0])])

        # Recursion: Fill the viterbi table for each observation from time 1 onward
        for t in range(1, n_obs):
            for s in range(n_states):
                max_prob, max_state = max(
                    (
                        viterbi_table[s_prev, t - 1] *
                        self.hmm.transition_matrix[s_prev, s] *
                        self.hmm.emission_matrix[s, self.hmm.observable_states.index(observations[t])],
                        s_prev
                    )
                    for s_prev in range(n_states)
                )
                viterbi_table[s, t] = max_prob
                backpointer[s, t] = max_state

        # Backtracking: Find the most likely path by tracing back the pointers
        best_last_state = np.argmax(viterbi_table[:, -1])
        best_path = [best_last_state]
        for t in range(n_obs - 1, 0, -1):
            best_last_state = backpointer[best_last_state, t]
            best_path.insert(0, best_last_state)
        
        # Convert state indices to actual state names
        most_likely_states = [self.hmm.hidden_states[i] for i in best_path]
        return most_likely_states

## Step 4: Configure the HMM for a Specific Example

In this example, we define a simple "Paper Bag" problem:

- **Hidden States:** Represent two bags (`A` and `B`).
- **Observable States:** Represent chips (`j` and `k`).
- **Transition Matrix:** Specifies the probability of moving between bags.
- **Emission Matrix:** Specifies the probability of drawing a chip from each bag.
- **Initial Distribution:** Always starts in Bag `A`.

We then generate a sequence of observations and apply the Viterbi algorithm.

In [37]:
# Define HMM parameters for the Paper Bag problem
hidden_states_paperbag = ['A', 'B']          # Hidden states (bags)
observable_states_paperbag = ['j', 'k']        # Observable states (chips)

# Transition probabilities (Bag -> Bag)
transition_matrix_paperbag = [
    [0.40, 0.60],  # In Bag A: 40% chance to remain, 60% chance to switch to Bag B
    [0.80, 0.20]   # In Bag B: 80% chance to remain, 20% chance to switch to Bag A
]

# Emission probabilities (Bag -> Chip)
emission_matrix_paperbag = [
    [2/5, 3/5],  # From Bag A: 40% chance for chip 'j', 60% for chip 'k'
    [1/5, 4/5]   # From Bag B: 20% chance for chip 'j', 80% for chip 'k'
]

# Initial probability distribution (always start in Bag A)
initial_distribution_paperbag = [1.0, 0.0]

# Reconnect to Neo4j (if needed) and initialize the HMM
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
hmm = HiddenMarkovModel(graph, hidden_states_paperbag, observable_states_paperbag,
                        transition_matrix_paperbag, emission_matrix_paperbag, initial_distribution_paperbag)

# Generate a sequence of observations along with the true hidden state order
observations, true_order = hmm.generate_observations(10)
print("\n=== Observations ===")
print(observations)

print("\n=== True Order ===")
print(true_order)

# Run the Viterbi algorithm to infer the hidden state sequence from observations
viterbi_model = Viterbi(hmm)
viterbi_path = viterbi_model.run(observations)
print("\n=== Viterbi Path ===")
print(viterbi_path)

Storing HMM in Neo4j as TrueModel...

=== Observations ===
['j', 'k', 'k', 'j', 'k', 'k', 'k', 'k', 'k', 'k']

=== True Order ===
['A', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'A', 'A']

=== Viterbi Path ===
['A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A']


In [38]:
import numpy as np

# Define HMM parameters for the Paper Bag problem
hidden_states = ['A', 'B']          # Hidden states (bags)
observable_states = ['j', 'k']      # Observable states (chips)

# Transition probabilities (Bag -> Bag)
transition_matrix = np.array([
    [0.40, 0.60],  # In Bag A: 40% chance to remain, 60% chance to switch to Bag B
    [0.80, 0.20]   # In Bag B: 80% chance to remain, 20% chance to switch to Bag A
])

# Emission probabilities (Bag -> Chip)
emission_matrix = np.array([
    [2/5, 3/5],  # From Bag A: 40% chance for chip 'j', 60% for chip 'k'
    [1/5, 4/5]   # From Bag B: 20% chance for chip 'j', 80% for chip 'k'
])

# Initial probability distribution (always start in Bag A)
initial_distribution = np.array([1.0, 0.0])

# Define number of runs and sequence length
num_runs = 10
sequence_length = 10

# Function to generate an observation sequence and true states
def generate_observations():
    true_states = []
    observations = []

    # Start in Bag A
    current_state = 0  # 'A' corresponds to index 0, 'B' to index 1

    for _ in range(sequence_length):
        true_states.append(hidden_states[current_state])
        # Generate observation based on emission probabilities
        observation = np.random.choice(observable_states, p=emission_matrix[current_state])
        observations.append(observation)
        # Transition to next state based on transition probabilities
        current_state = np.random.choice([0, 1], p=transition_matrix[current_state])

    return observations, true_states

# Viterbi Algorithm Implementation
def viterbi(observations):
    num_states = len(hidden_states)
    num_obs = len(observations)
    
    # Viterbi path probabilities and backpointers
    viterbi_probs = np.zeros((num_states, num_obs))
    backpointers = np.zeros((num_states, num_obs), dtype=int)

    # Initialize with initial distribution
    obs_index = observable_states.index(observations[0])
    viterbi_probs[:, 0] = initial_distribution * emission_matrix[:, obs_index]

    # Dynamic programming step
    for t in range(1, num_obs):
        obs_index = observable_states.index(observations[t])
        for s in range(num_states):
            prob_transition = viterbi_probs[:, t - 1] * transition_matrix[:, s]
            best_prev_state = np.argmax(prob_transition)
            viterbi_probs[s, t] = prob_transition[best_prev_state] * emission_matrix[s, obs_index]
            backpointers[s, t] = best_prev_state

    # Backtrack to get the most probable state sequence
    best_final_state = np.argmax(viterbi_probs[:, -1])
    best_path = [best_final_state]

    for t in range(num_obs - 1, 0, -1):
        best_final_state = backpointers[best_final_state, t]
        best_path.insert(0, best_final_state)

    # Convert index path to hidden state labels
    inferred_states = [hidden_states[i] for i in best_path]
    return inferred_states

# Run the experiment 10 times and compute statistics
total_errors = 0
all_errors = []

for _ in range(num_runs):
    observations, true_states = generate_observations()
    inferred_states = viterbi(observations)
    errors = sum(1 for true, inferred in zip(true_states, inferred_states) if true != inferred)
    total_errors += errors
    all_errors.append(errors)

average_errors = total_errors / num_runs
min_errors = min(all_errors)
max_errors = max(all_errors)
std_dev_errors = np.std(all_errors)

print(f'Average number of errors: {average_errors}')
print(f'Minimum number of errors: {min_errors}')
print(f'Maximum number of errors: {max_errors}')
print(f'Standard deviation of errors: {std_dev_errors}')


Average number of errors: 3.3
Minimum number of errors: 1
Maximum number of errors: 7
Standard deviation of errors: 1.9000000000000001


## Step 5: Implement the Baum-Welch Algorithm for Parameter Training

The `BaumWelch` class applies the Expectation-Maximization (EM) technique to update the HMM parameters based on observed data.

### Key Steps in Baum-Welch:
1. **Forward Pass (Alpha):** Calculate the probability of partial observations given the model.
2. **Backward Pass (Beta):** Calculate the probability of future observations given the current state.
3. **Expectation (Gamma and Xi):**
   - **Gamma:** Probability of being in a state at a certain time.
   - **Xi:** Probability of transitioning between states at consecutive times.
4. **Maximization:** Update the initial state distribution, transition matrix, and emission matrix.
5. **Neo4j Update:** Save the updated model in Neo4j under a new model name.

In [27]:
import numpy as np

class BaumWelch:
    def __init__(self, hmm):
        self.hmm = hmm
    
    def run(self, observations, n_iterations=100):
        """
        Runs the Baum-Welch algorithm to train the HMM parameters.
        
        Parameters:
          - observations: List of observable states.
          - n_iterations: Number of iterations to run the EM algorithm.
        
        Returns:
          - Updated transition and emission matrices.
        """
        n_states = len(self.hmm.hidden_states)
        n_obs = len(observations)
        obs_indices = [self.hmm.observable_states.index(obs) for obs in observations]

        for _ in range(n_iterations):
            alpha = np.zeros((n_states, n_obs))
            beta = np.zeros((n_states, n_obs))
            gamma = np.zeros((n_states, n_obs))
            xi = np.zeros((n_states, n_states, n_obs - 1))
            
            # Forward Pass
            alpha[:, 0] = self.hmm.initial_distribution * self.hmm.emission_matrix[:, obs_indices[0]]
            for t in range(1, n_obs):
                alpha[:, t] = (alpha[:, t - 1] @ self.hmm.transition_matrix) * self.hmm.emission_matrix[:, obs_indices[t]]
            
            # Backward Pass
            beta[:, -1] = 1
            for t in range(n_obs - 2, -1, -1):
                beta[:, t] = self.hmm.transition_matrix @ (self.hmm.emission_matrix[:, obs_indices[t + 1]] * beta[:, t + 1])
            
            # Compute Gamma
            gamma = (alpha * beta) / np.sum(alpha * beta, axis=0, keepdims=True)
            
            # Compute Xi
            for t in range(n_obs - 1):
                denominator = np.sum(alpha[:, t] * self.hmm.transition_matrix * self.hmm.emission_matrix[:, obs_indices[t + 1]] * beta[:, t + 1])
                if denominator == 0:
                    denominator = 1e-10  # Avoid division by zero
                xi[:, :, t] = (alpha[:, t, None] * self.hmm.transition_matrix * self.hmm.emission_matrix[:, obs_indices[t + 1]] * beta[:, t + 1]) / denominator
            
            # Update Initial Distribution
            self.hmm.initial_distribution = gamma[:, 0]
            
            # Update Transition Matrix
            denom = np.sum(gamma[:, :-1], axis=1, keepdims=True)
            denom[denom == 0] = 1e-10  # Avoid division by zero
            self.hmm.transition_matrix = np.sum(xi, axis=2) / denom
            
            # Ensure row normalization
            self.hmm.transition_matrix /= self.hmm.transition_matrix.sum(axis=1, keepdims=True)
            
            # Update Emission Matrix
            for k in range(len(self.hmm.observable_states)):
                mask = np.array(obs_indices) == k
                denom = np.sum(gamma, axis=1, keepdims=True)
                denom[denom == 0] = 1e-10  # Avoid division by zero
                self.hmm.emission_matrix[:, k] = np.sum(gamma[:, mask], axis=1) / denom.squeeze()
            
            # Ensure emission matrix row normalization
            self.hmm.emission_matrix /= self.hmm.emission_matrix.sum(axis=1, keepdims=True)
            
        # Store updated parameters in Neo4j
        self.hmm.create_neo4j_hmm(clear=False, model_name="BaumWelch")
        
        return self.hmm.transition_matrix, self.hmm.emission_matrix

In [28]:
# Reinitialize HMM parameters for Baum-Welch training (same setup as before)
hidden_states_paperbag = ['A', 'B']
observable_states_paperbag = ['j', 'k']

transition_matrix_paperbag = [
    [0.40, 0.60],
    [0.80, 0.20]
]

emission_matrix_paperbag = [
    [2/5, 3/5],
    [1/5, 4/5]
]

initial_distribution_paperbag = [1.0, 0.0]

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
hmm = HiddenMarkovModel(graph, hidden_states_paperbag, observable_states_paperbag,
                        transition_matrix_paperbag, emission_matrix_paperbag, initial_distribution_paperbag)

# Generate a longer sequence of observations for training the model
observations, _ = hmm.generate_observations(n=100)
print("Generated Observations:", observations)

# Train the HMM using the Baum-Welch algorithm
baum_welch = BaumWelch(hmm)
updated_transition_matrix, updated_emission_matrix = baum_welch.run(observations)
print("Initial Transition Matrix:\n", transition_matrix_paperbag)
print("Final Updated Transition Matrix:\n", updated_transition_matrix)
print("Initial Emission Matrix:\n", emission_matrix_paperbag)
print("Final Updated Emission Matrix:\n", updated_emission_matrix)

Storing HMM in Neo4j as TrueModel...
Generated Observations: ['k', 'k', 'j', 'k', 'k', 'j', 'j', 'j', 'k', 'k', 'j', 'j', 'j', 'k', 'j', 'k', 'j', 'j', 'k', 'j', 'k', 'k', 'k', 'j', 'j', 'j', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'j', 'k', 'k', 'k', 'j', 'j', 'k', 'k', 'j', 'k', 'k', 'j', 'k', 'j', 'j', 'k', 'k', 'j', 'k', 'k', 'k', 'k', 'k', 'k', 'j', 'j', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'j', 'j', 'k', 'j', 'k', 'k', 'k', 'k', 'j', 'j', 'j', 'k', 'k', 'k', 'k', 'k', 'k', 'j', 'k', 'j', 'k', 'j', 'k', 'k', 'k', 'k', 'j', 'k', 'j', 'k', 'k', 'j']
Storing HMM in Neo4j as BaumWelch...
Initial Transition Matrix:
 [[0.4, 0.6], [0.8, 0.2]]
Final Updated Transition Matrix:
 [[0.32353953 0.67646047]
 [0.80560111 0.19439889]]
Initial Emission Matrix:
 [[0.4, 0.6], [0.2, 0.8]]
Final Updated Emission Matrix:
 [[0.3564481  0.6435519 ]
 [0.36428269 0.63571731]]


## Conclusion

This notebook provided a detailed walkthrough of:

- **Storing an HMM in Neo4j:** Creating nodes for hidden and observable states and linking them with transition and emission relationships.
- **Generating Observations:** Simulating a series of observable outputs based on the HMM parameters.
- **Inference with Viterbi:** Determining the most likely sequence of hidden states given a series of observations.
- **Training with Baum-Welch:** Refining the model parameters using the EM approach and updating the model in Neo4j.

Feel free to adjust or extend this notebook for your specific use case.