# Viterbi Algorithm: Learning and Testing

This notebook provides an interactive learning experience for the Viterbi algorithm, including:
- Theory and intuition behind the algorithm
- Step-by-step implementation
- Practical exercises to test your understanding
- Visualization of the decoding process

## What is the Viterbi Algorithm?

The Viterbi algorithm is a **dynamic programming** algorithm used to find the most likely sequence of hidden states in a Hidden Markov Model (HMM). It's widely used in:

- **Speech Recognition**: Decoding spoken words from acoustic signals
- **Part-of-Speech Tagging**: Finding the most likely sequence of POS tags
- **DNA Sequence Analysis**: Identifying gene regions
- **Error Correction**: Decoding convolutional codes

### Key Concepts

1. **States**: Hidden states we want to discover
2. **Observations**: What we can actually see/measure
3. **Transition Probabilities**: P(next_state | current_state)
4. **Emission Probabilities**: P(observation | state)
5. **Initial Probabilities**: P(starting in each state)

## Imports

In [None]:
import numpy as np
from typing import List, Dict, Tuple
import matplotlib.pyplot as plt
import seaborn as sns

## Hidden Markov Model (HMM) Class

First, let's define a simple HMM class that stores all the parameters we need.

In [None]:
class HiddenMarkovModel:
    """A simple Hidden Markov Model implementation."""
    
    def __init__(self, states: List[str], observations: List[str],
                 start_prob: Dict[str, float],
                 trans_prob: Dict[str, Dict[str, float]],
                 emit_prob: Dict[str, Dict[str, float]]):
        """
        Initialize the HMM.
        
        Args:
            states: List of hidden states
            observations: List of possible observations
            start_prob: Initial state probabilities
            trans_prob: State transition probabilities
            emit_prob: Emission probabilities
        """
        self.states = states
        self.observations = observations
        self.start_prob = start_prob
        self.trans_prob = trans_prob
        self.emit_prob = emit_prob
        
        # Create index mappings
        self.state_idx = {s: i for i, s in enumerate(states)}
        self.obs_idx = {o: i for i, o in enumerate(observations)}
    
    def get_start_prob(self, state: str) -> float:
        return self.start_prob.get(state, 0.0)
    
    def get_trans_prob(self, from_state: str, to_state: str) -> float:
        return self.trans_prob.get(from_state, {}).get(to_state, 0.0)
    
    def get_emit_prob(self, state: str, observation: str) -> float:
        return self.emit_prob.get(state, {}).get(observation, 0.0)

## Example: Weather and Activities

Let's use a classic example: inferring the weather (hidden states) from observed activities.

- **Hidden States**: Sunny, Rainy
- **Observations**: Walk, Shop, Clean

In [None]:
# Define the Weather HMM
states = ["Sunny", "Rainy"]
observations = ["Walk", "Shop", "Clean"]

# Initial probabilities
start_prob = {
    "Sunny": 0.6,
    "Rainy": 0.4
}

# Transition probabilities
trans_prob = {
    "Sunny": {"Sunny": 0.7, "Rainy": 0.3},
    "Rainy": {"Sunny": 0.4, "Rainy": 0.6}
}

# Emission probabilities
emit_prob = {
    "Sunny": {"Walk": 0.6, "Shop": 0.3, "Clean": 0.1},
    "Rainy": {"Walk": 0.1, "Shop": 0.4, "Clean": 0.5}
}

weather_hmm = HiddenMarkovModel(states, observations, start_prob, trans_prob, emit_prob)
print("HMM Created Successfully!")
print(f"States: {weather_hmm.states}")
print(f"Observations: {weather_hmm.observations}")

## The Viterbi Algorithm: Step by Step

The algorithm works in two phases:

### Phase 1: Forward Pass (Trellis Construction)
For each time step t and state s:
```
V[t][s] = max over all previous states s' of:
    V[t-1][s'] * P(s|s') * P(observation[t]|s)
```

### Phase 2: Backtracking
Trace back through the stored pointers to find the optimal path.

In [None]:
def viterbi(hmm: HiddenMarkovModel, obs_sequence: List[str], 
            verbose: bool = False) -> Tuple[List[str], float, np.ndarray]:
    """
    Run the Viterbi algorithm to find the most likely state sequence.
    
    Args:
        hmm: The Hidden Markov Model
        obs_sequence: List of observations
        verbose: If True, print step-by-step details
        
    Returns:
        best_path: Most likely sequence of states
        best_prob: Probability of the best path
        trellis: The Viterbi trellis (for visualization)
    """
    n_states = len(hmm.states)
    n_obs = len(obs_sequence)
    
    # Initialize trellis and backpointer
    trellis = np.zeros((n_states, n_obs))
    backpointer = np.zeros((n_states, n_obs), dtype=int)
    
    # ========================================
    # STEP 1: Initialization (t=0)
    # ========================================
    if verbose:
        print("=" * 50)
        print(f"STEP 1: Initialization for observation '{obs_sequence[0]}'")
        print("=" * 50)
    
    for i, state in enumerate(hmm.states):
        start_p = hmm.get_start_prob(state)
        emit_p = hmm.get_emit_prob(state, obs_sequence[0])
        trellis[i, 0] = start_p * emit_p
        backpointer[i, 0] = -1  # No previous state
        
        if verbose:
            print(f"  State '{state}': P(start) * P('{obs_sequence[0]}') = {start_p} * {emit_p} = {trellis[i, 0]:.4f}")
    
    # ========================================
    # STEP 2: Recursion (t=1 to T-1)
    # ========================================
    for t in range(1, n_obs):
        if verbose:
            print(f"\n{'=' * 50}")
            print(f"STEP 2.{t}: Processing observation '{obs_sequence[t]}' at time {t}")
            print("=" * 50)
        
        for j, curr_state in enumerate(hmm.states):
            max_prob = 0
            max_state = 0
            
            if verbose:
                print(f"\n  Computing for current state '{curr_state}':")
            
            for i, prev_state in enumerate(hmm.states):
                trans_p = hmm.get_trans_prob(prev_state, curr_state)
                prob = trellis[i, t-1] * trans_p
                
                if verbose:
                    print(f"    From '{prev_state}': V[t-1] * P(trans) = {trellis[i, t-1]:.4f} * {trans_p} = {prob:.4f}")
                
                if prob > max_prob:
                    max_prob = prob
                    max_state = i
            
            emit_p = hmm.get_emit_prob(curr_state, obs_sequence[t])
            trellis[j, t] = max_prob * emit_p
            backpointer[j, t] = max_state
            
            if verbose:
                print(f"    Best from '{hmm.states[max_state]}': {max_prob:.4f} * P(emit)={emit_p} = {trellis[j, t]:.4f}")
    
    # ========================================
    # STEP 3: Termination
    # ========================================
    best_last_state = np.argmax(trellis[:, -1])
    best_prob = trellis[best_last_state, -1]
    
    if verbose:
        print(f"\n{'=' * 50}")
        print("STEP 3: Termination")
        print("=" * 50)
        print(f"  Best final state: '{hmm.states[best_last_state]}' with probability {best_prob:.6f}")
    
    # ========================================
    # STEP 4: Backtracking
    # ========================================
    best_path = [hmm.states[best_last_state]]
    current_state = best_last_state
    
    for t in range(n_obs - 1, 0, -1):
        current_state = backpointer[current_state, t]
        best_path.insert(0, hmm.states[current_state])
    
    if verbose:
        print(f"\n{'=' * 50}")
        print("STEP 4: Backtracking")
        print("=" * 50)
        print(f"  Best path: {' -> '.join(best_path)}")
    
    return best_path, best_prob, trellis

## Test the Viterbi Algorithm

Let's trace through the algorithm with a sample observation sequence.

In [None]:
# Test observation sequence
obs_sequence = ["Walk", "Shop", "Clean"]

print("Running Viterbi Algorithm")
print(f"Observations: {obs_sequence}\n")

best_path, best_prob, trellis = viterbi(weather_hmm, obs_sequence, verbose=True)

print("\n" + "=" * 50)
print("FINAL RESULT")
print("=" * 50)
print(f"Most likely weather sequence: {best_path}")
print(f"Probability: {best_prob:.6f}")

## Visualization: Viterbi Trellis

Visualize the trellis to see how probabilities propagate through time.

In [None]:
def visualize_trellis(hmm: HiddenMarkovModel, obs_sequence: List[str], 
                      trellis: np.ndarray, best_path: List[str]):
    """Visualize the Viterbi trellis as a heatmap with the best path highlighted."""
    
    fig, ax = plt.subplots(figsize=(12, 5))
    
    # Create heatmap
    sns.heatmap(trellis, annot=True, fmt=".4f", cmap="YlOrRd",
                xticklabels=obs_sequence, yticklabels=hmm.states,
                ax=ax, cbar_kws={'label': 'Probability'})
    
    # Highlight best path
    for t, state in enumerate(best_path):
        state_idx = hmm.state_idx[state]
        ax.add_patch(plt.Rectangle((t, state_idx), 1, 1, 
                                    fill=False, edgecolor='blue', linewidth=3))
    
    ax.set_xlabel("Observations (Time)")
    ax.set_ylabel("Hidden States")
    ax.set_title("Viterbi Trellis (Blue boxes = Best Path)")
    
    plt.tight_layout()
    plt.show()

visualize_trellis(weather_hmm, obs_sequence, trellis, best_path)

---
# Exercises: Test Your Understanding

Complete the following exercises to test your understanding of the Viterbi algorithm.

## Exercise 1: Manual Calculation

Given the observation sequence `["Clean", "Clean"]`, manually calculate the Viterbi probabilities.

Fill in the `???` values below:

In [None]:
# Exercise 1: Fill in your calculations

# Time t=0, Observation="Clean"
# V[Sunny, 0] = P(start=Sunny) * P(Clean|Sunny) = 0.6 * 0.1 = ???
# V[Rainy, 0] = P(start=Rainy) * P(Clean|Rainy) = 0.4 * 0.5 = ???

v_sunny_0 = None  # Replace with your answer
v_rainy_0 = None  # Replace with your answer

# Time t=1, Observation="Clean"
# V[Sunny, 1] = max(V[Sunny,0]*P(S->S), V[Rainy,0]*P(R->S)) * P(Clean|Sunny)
# V[Rainy, 1] = max(V[Sunny,0]*P(S->R), V[Rainy,0]*P(R->R)) * P(Clean|Rainy)

v_sunny_1 = None  # Replace with your answer
v_rainy_1 = None  # Replace with your answer

# What is the most likely state sequence?
predicted_sequence = None  # Replace with ["???", "???"]

In [None]:
# Check your answers
def check_exercise_1(v_sunny_0, v_rainy_0, v_sunny_1, v_rainy_1, predicted_sequence):
    obs = ["Clean", "Clean"]
    correct_path, correct_prob, correct_trellis = viterbi(weather_hmm, obs, verbose=False)
    
    print("Checking your answers...\n")
    
    all_correct = True
    
    # Check t=0
    if v_sunny_0 is not None and abs(v_sunny_0 - correct_trellis[0, 0]) < 0.001:
        print("V[Sunny, 0]: CORRECT")
    else:
        print(f"V[Sunny, 0]: INCORRECT (Expected: {correct_trellis[0, 0]})")
        all_correct = False
    
    if v_rainy_0 is not None and abs(v_rainy_0 - correct_trellis[1, 0]) < 0.001:
        print("V[Rainy, 0]: CORRECT")
    else:
        print(f"V[Rainy, 0]: INCORRECT (Expected: {correct_trellis[1, 0]})")
        all_correct = False
    
    # Check t=1
    if v_sunny_1 is not None and abs(v_sunny_1 - correct_trellis[0, 1]) < 0.001:
        print("V[Sunny, 1]: CORRECT")
    else:
        print(f"V[Sunny, 1]: INCORRECT (Expected: {correct_trellis[0, 1]})")
        all_correct = False
    
    if v_rainy_1 is not None and abs(v_rainy_1 - correct_trellis[1, 1]) < 0.001:
        print("V[Rainy, 1]: CORRECT")
    else:
        print(f"V[Rainy, 1]: INCORRECT (Expected: {correct_trellis[1, 1]})")
        all_correct = False
    
    # Check sequence
    if predicted_sequence == correct_path:
        print("Predicted sequence: CORRECT")
    else:
        print(f"Predicted sequence: INCORRECT (Expected: {correct_path})")
        all_correct = False
    
    if all_correct:
        print("\nExcellent! All answers are correct!")
    else:
        print("\nSome answers need correction. Try again!")

# Uncomment the line below after filling in your answers
# check_exercise_1(v_sunny_0, v_rainy_0, v_sunny_1, v_rainy_1, predicted_sequence)

## Exercise 2: Implement Log-Space Viterbi

The standard Viterbi can suffer from numerical underflow with long sequences. Implement a log-space version that uses log probabilities instead.

**Hint**: 
- Replace multiplication with addition: `log(a*b) = log(a) + log(b)`
- Use `np.log()` for converting probabilities
- Handle zero probabilities with `-np.inf`

In [None]:
def viterbi_log(hmm: HiddenMarkovModel, obs_sequence: List[str]) -> Tuple[List[str], float]:
    """
    Log-space Viterbi algorithm.
    
    TODO: Implement this function
    
    Args:
        hmm: The Hidden Markov Model
        obs_sequence: List of observations
        
    Returns:
        best_path: Most likely sequence of states
        best_log_prob: Log probability of the best path
    """
    n_states = len(hmm.states)
    n_obs = len(obs_sequence)
    
    # Initialize with log probabilities
    trellis = np.full((n_states, n_obs), -np.inf)
    backpointer = np.zeros((n_states, n_obs), dtype=int)
    
    # TODO: Implement initialization (t=0)
    # Hint: trellis[i, 0] = log(start_prob) + log(emit_prob)
    
    # YOUR CODE HERE
    pass
    
    # TODO: Implement recursion (t=1 to T-1)
    # Hint: Use addition instead of multiplication
    
    # YOUR CODE HERE
    pass
    
    # TODO: Implement termination and backtracking
    
    # YOUR CODE HERE
    best_path = []
    best_log_prob = 0.0
    
    return best_path, best_log_prob

In [None]:
# Test your log-space implementation
def test_viterbi_log():
    obs = ["Walk", "Shop", "Clean"]
    
    # Get reference result
    ref_path, ref_prob, _ = viterbi(weather_hmm, obs)
    
    # Get your result
    your_path, your_log_prob = viterbi_log(weather_hmm, obs)
    
    print(f"Reference path: {ref_path}")
    print(f"Your path: {your_path}")
    print(f"Reference log prob: {np.log(ref_prob):.6f}")
    print(f"Your log prob: {your_log_prob:.6f}")
    
    if your_path == ref_path and abs(your_log_prob - np.log(ref_prob)) < 0.001:
        print("\nCongratulations! Your implementation is correct!")
    else:
        print("\nNot quite right. Check your implementation.")

# Uncomment after implementing
# test_viterbi_log()

## Exercise 3: Part-of-Speech Tagging

Create an HMM for Part-of-Speech tagging and use Viterbi to tag a sentence.

**Task**: Define an HMM with:
- States: `["NOUN", "VERB", "DET", "ADJ"]`
- Some sample words and their emission probabilities
- Reasonable transition probabilities

In [None]:
# Exercise 3: Create a POS tagging HMM

pos_states = ["NOUN", "VERB", "DET", "ADJ"]
words = ["the", "cat", "sat", "big", "dog", "runs", "a", "happy"]

# TODO: Define start probabilities
# Hint: Sentences often start with determiners or nouns
pos_start_prob = {
    "NOUN": None,  # Fill in
    "VERB": None,  # Fill in
    "DET": None,   # Fill in
    "ADJ": None    # Fill in
}

# TODO: Define transition probabilities
# Hint: Think about what typically follows each POS
pos_trans_prob = {
    "NOUN": {"NOUN": None, "VERB": None, "DET": None, "ADJ": None},
    "VERB": {"NOUN": None, "VERB": None, "DET": None, "ADJ": None},
    "DET": {"NOUN": None, "VERB": None, "DET": None, "ADJ": None},
    "ADJ": {"NOUN": None, "VERB": None, "DET": None, "ADJ": None}
}

# TODO: Define emission probabilities
# Hint: "the", "a" -> DET, "cat", "dog" -> NOUN, etc.
pos_emit_prob = {
    "NOUN": {w: None for w in words},
    "VERB": {w: None for w in words},
    "DET": {w: None for w in words},
    "ADJ": {w: None for w in words}
}

# Uncomment after filling in the probabilities
# pos_hmm = HiddenMarkovModel(pos_states, words, pos_start_prob, pos_trans_prob, pos_emit_prob)

In [None]:
# Test your POS tagger
def test_pos_tagger():
    sentences = [
        ["the", "cat", "runs"],
        ["a", "big", "dog", "sat"],
        ["the", "happy", "cat"]
    ]
    
    print("POS Tagging Results:")
    print("=" * 50)
    
    for sentence in sentences:
        path, prob, _ = viterbi(pos_hmm, sentence)
        print(f"\nSentence: {' '.join(sentence)}")
        print(f"POS Tags: {' '.join(path)}")
        print(f"Tagged: {' '.join(f'{w}/{t}' for w, t in zip(sentence, path))}")

# Uncomment after creating the POS HMM
# test_pos_tagger()

## Exercise 4: Compare Viterbi with Greedy Decoding

Implement a greedy decoder that picks the most likely state at each time step independently (without considering the path). Compare the results with Viterbi.

In [None]:
def greedy_decode(hmm: HiddenMarkovModel, obs_sequence: List[str]) -> Tuple[List[str], float]:
    """
    Greedy decoder that picks the most likely state at each step.
    
    TODO: Implement this function
    
    For each observation, pick the state with highest:
    P(observation | state) * P(state)
    
    This ignores transition probabilities between states!
    """
    path = []
    
    # YOUR CODE HERE
    
    return path, 0.0  # Return path and probability

In [None]:
# Compare Viterbi vs Greedy
def compare_decoders():
    test_sequences = [
        ["Walk", "Walk", "Walk"],
        ["Clean", "Clean", "Clean"],
        ["Walk", "Shop", "Clean"],
        ["Clean", "Walk", "Shop", "Walk"]
    ]
    
    print("Comparison: Viterbi vs Greedy Decoding")
    print("=" * 60)
    
    for obs in test_sequences:
        viterbi_path, viterbi_prob, _ = viterbi(weather_hmm, obs)
        greedy_path, greedy_prob = greedy_decode(weather_hmm, obs)
        
        print(f"\nObservations: {obs}")
        print(f"Viterbi: {viterbi_path} (prob: {viterbi_prob:.6f})")
        print(f"Greedy:  {greedy_path} (prob: {greedy_prob:.6f})")
        print(f"Same result: {viterbi_path == greedy_path}")

# Uncomment after implementing greedy_decode
# compare_decoders()

---
## Solutions

Run the cells below to see the solutions (try the exercises first!).

In [None]:
# Solution for Exercise 1
def show_solution_1():
    print("Exercise 1 Solution:")
    print("=" * 50)
    print("\nTime t=0, Observation='Clean':")
    print("V[Sunny, 0] = 0.6 * 0.1 = 0.06")
    print("V[Rainy, 0] = 0.4 * 0.5 = 0.20")
    print("\nTime t=1, Observation='Clean':")
    print("V[Sunny, 1] = max(0.06*0.7, 0.20*0.4) * 0.1 = max(0.042, 0.08) * 0.1 = 0.008")
    print("V[Rainy, 1] = max(0.06*0.3, 0.20*0.6) * 0.5 = max(0.018, 0.12) * 0.5 = 0.060")
    print("\nBest path: ['Rainy', 'Rainy']")
    
    # Verify
    obs = ["Clean", "Clean"]
    path, prob, _ = viterbi(weather_hmm, obs)
    print(f"\nVerification: {path} with prob {prob:.6f}")

# Uncomment to see solution
# show_solution_1()

In [None]:
# Solution for Exercise 2: Log-space Viterbi
def viterbi_log_solution(hmm: HiddenMarkovModel, obs_sequence: List[str]) -> Tuple[List[str], float]:
    """Log-space Viterbi algorithm - Solution."""
    n_states = len(hmm.states)
    n_obs = len(obs_sequence)
    
    trellis = np.full((n_states, n_obs), -np.inf)
    backpointer = np.zeros((n_states, n_obs), dtype=int)
    
    # Initialization
    for i, state in enumerate(hmm.states):
        start_p = hmm.get_start_prob(state)
        emit_p = hmm.get_emit_prob(state, obs_sequence[0])
        if start_p > 0 and emit_p > 0:
            trellis[i, 0] = np.log(start_p) + np.log(emit_p)
    
    # Recursion
    for t in range(1, n_obs):
        for j, curr_state in enumerate(hmm.states):
            max_log_prob = -np.inf
            max_state = 0
            
            for i, prev_state in enumerate(hmm.states):
                trans_p = hmm.get_trans_prob(prev_state, curr_state)
                if trans_p > 0:
                    log_prob = trellis[i, t-1] + np.log(trans_p)
                    if log_prob > max_log_prob:
                        max_log_prob = log_prob
                        max_state = i
            
            emit_p = hmm.get_emit_prob(curr_state, obs_sequence[t])
            if emit_p > 0:
                trellis[j, t] = max_log_prob + np.log(emit_p)
            backpointer[j, t] = max_state
    
    # Termination
    best_last_state = np.argmax(trellis[:, -1])
    best_log_prob = trellis[best_last_state, -1]
    
    # Backtracking
    best_path = [hmm.states[best_last_state]]
    current_state = best_last_state
    
    for t in range(n_obs - 1, 0, -1):
        current_state = backpointer[current_state, t]
        best_path.insert(0, hmm.states[current_state])
    
    return best_path, best_log_prob

# Uncomment to test solution
# obs = ["Walk", "Shop", "Clean"]
# path, log_prob = viterbi_log_solution(weather_hmm, obs)
# print(f"Path: {path}, Log Prob: {log_prob:.6f}")

In [None]:
# Solution for Exercise 4: Greedy Decoder
def greedy_decode_solution(hmm: HiddenMarkovModel, obs_sequence: List[str]) -> Tuple[List[str], float]:
    """Greedy decoder - Solution."""
    path = []
    total_prob = 1.0
    
    for t, obs in enumerate(obs_sequence):
        best_state = None
        best_prob = 0
        
        for state in hmm.states:
            if t == 0:
                prob = hmm.get_start_prob(state) * hmm.get_emit_prob(state, obs)
            else:
                # Use marginal probability (sum over previous states)
                prob = hmm.get_emit_prob(state, obs)
            
            if prob > best_prob:
                best_prob = prob
                best_state = state
        
        path.append(best_state)
        total_prob *= best_prob
    
    return path, total_prob

# Test the solution
def compare_decoders_solution():
    test_sequences = [
        ["Walk", "Walk", "Walk"],
        ["Clean", "Clean", "Clean"],
        ["Walk", "Shop", "Clean"],
    ]
    
    print("Comparison: Viterbi vs Greedy Decoding (Solution)")
    print("=" * 60)
    
    for obs in test_sequences:
        viterbi_path, viterbi_prob, _ = viterbi(weather_hmm, obs)
        greedy_path, greedy_prob = greedy_decode_solution(weather_hmm, obs)
        
        print(f"\nObservations: {obs}")
        print(f"Viterbi: {viterbi_path}")
        print(f"Greedy:  {greedy_path}")
        print(f"Same: {viterbi_path == greedy_path}")

# Uncomment to see solution
# compare_decoders_solution()

---
## Summary

### Key Takeaways:

1. **Viterbi Algorithm** finds the most likely sequence of hidden states given observations
2. **Dynamic Programming** makes it efficient: O(S² × T) instead of O(S^T)
3. **Log-space** implementation prevents numerical underflow
4. **Viterbi vs Greedy**: Viterbi considers the entire path, greedy only local decisions

### Common Applications:
- Speech Recognition
- Part-of-Speech Tagging
- Named Entity Recognition
- DNA Sequence Analysis
- Error Correction in Communications