# Lab 4: Markov Models and Applications

## Learning Objectives

By the end of this lab, you will:
- Understand Markov chains and the Markov property
- Build and analyze Hidden Markov Models (HMMs)
- Implement forward-backward algorithm
- Apply Viterbi algorithm for sequence decoding
- Use HMMs for time series prediction
- Apply to real-world sequential data

## What are Markov Models?

**Markov Models** handle sequential data where:
- Current state depends on previous state(s)
- Future is independent of past given present

**Markov Property**: "The future is independent of the past given the present"

$$P(X_t | X_1, ..., X_{t-1}) = P(X_t | X_{t-1})$$

## Types of Markov Models

1. **Markov Chain**: States are fully observable
2. **Hidden Markov Model (HMM)**: States are hidden, we only see observations

## Real-World Applications

- 🗣️ **Speech Recognition**: Audio → words
- 🌤️ **Weather Prediction**: Today's weather → tomorrow's
- 📈 **Finance**: Market states and price movements
- 🧬 **Bioinformatics**: DNA sequences and gene finding
- 📝 **Natural Language**: Part-of-speech tagging
- 🤖 **Robotics**: Robot localization and tracking


In [None]:
# Import libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
import pandas as pd

# Set random seed
np.random.seed(42)

# Plot settings
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['figure.figsize'] = (12, 6)

## Part 1: Markov Chains

### Components of a Markov Chain

1. **States**: S = {s₁, s₂, ..., sₙ}
2. **Initial Distribution**: π = P(X₀ = sᵢ)
3. **Transition Matrix**: A where Aᵢⱼ = P(Xₜ = sⱼ | Xₜ₋₁ = sᵢ)

**Properties**:
- Each row of A sums to 1 (probability distribution)
- Aᵢⱼ ≥ 0 for all i, j


In [None]:
class MarkovChain:
    """A simple Markov Chain implementation."""
    
    def __init__(self, states: List[str], 
                 transition_matrix: np.ndarray,
                 initial_distribution: np.ndarray = None):
        """
        Initialize Markov Chain.
        
        Args:
            states: List of state names
            transition_matrix: Transition probability matrix
            initial_distribution: Initial state probabilities
        """
        self.states = states
        self.n_states = len(states)
        self.transition_matrix = transition_matrix
        
        if initial_distribution is None:
            self.initial_distribution = np.ones(self.n_states) / self.n_states
        else:
            self.initial_distribution = initial_distribution
        
        # Validate
        assert transition_matrix.shape == (self.n_states, self.n_states)
        assert np.allclose(transition_matrix.sum(axis=1), 1.0)
        assert np.allclose(self.initial_distribution.sum(), 1.0)
    
    def next_state(self, current_state: str) -> str:
        """Sample next state given current state."""
        current_idx = self.states.index(current_state)
        next_idx = np.random.choice(self.n_states, p=self.transition_matrix[current_idx])
        return self.states[next_idx]
    
    def generate_sequence(self, length: int) -> List[str]:
        """Generate a sequence of states."""
        sequence = []
        
        # Sample initial state
        current = np.random.choice(self.states, p=self.initial_distribution)
        sequence.append(current)
        
        # Generate sequence
        for _ in range(length - 1):
            current = self.next_state(current)
            sequence.append(current)
        
        return sequence
    
    def state_distribution(self, steps: int) -> np.ndarray:
        """Calculate state distribution after n steps."""
        distribution = self.initial_distribution.copy()
        for _ in range(steps):
            distribution = distribution @ self.transition_matrix
        return distribution
    
    def stationary_distribution(self, max_iter: int = 1000) -> np.ndarray:
        """Find stationary distribution (if it exists)."""
        distribution = self.initial_distribution.copy()
        for _ in range(max_iter):
            new_distribution = distribution @ self.transition_matrix
            if np.allclose(distribution, new_distribution):
                return new_distribution
            distribution = new_distribution
        return distribution
    
    def visualize_transition_matrix(self):
        """Visualize transition probabilities."""
        plt.figure(figsize=(10, 8))
        sns.heatmap(self.transition_matrix, annot=True, fmt='.2f',
                   xticklabels=self.states, yticklabels=self.states,
                   cmap='YlOrRd', cbar_kws={'label': 'Probability'})
        plt.title('Transition Probability Matrix', fontsize=14, fontweight='bold')
        plt.xlabel('Next State', fontsize=12, fontweight='bold')
        plt.ylabel('Current State', fontsize=12, fontweight='bold')
        plt.tight_layout()
        plt.show()


# Example: Weather Markov Chain
print("Weather Markov Chain Example")
print("=" * 60)

states = ['Sunny', 'Cloudy', 'Rainy']

# Transition matrix
# Rows: current state, Columns: next state
transition = np.array([
    [0.7, 0.2, 0.1],  # Sunny → Sunny, Cloudy, Rainy
    [0.3, 0.4, 0.3],  # Cloudy → Sunny, Cloudy, Rainy
    [0.2, 0.3, 0.5]   # Rainy → Sunny, Cloudy, Rainy
])

initial = np.array([0.5, 0.3, 0.2])

weather_mc = MarkovChain(states, transition, initial)

# Generate a sequence
sequence = weather_mc.generate_sequence(30)
print(f"Generated weather sequence (30 days):")
print(' → '.join(sequence[:15]))
print(' → '.join(sequence[15:]))
print()

# Calculate distribution after steps
for steps in [1, 5, 10, 50]:
    dist = weather_mc.state_distribution(steps)
    print(f"After {steps:2d} steps:")
    for state, prob in zip(states, dist):
        print(f"  P({state}) = {prob:.4f}")
    print()

# Stationary distribution
stationary = weather_mc.stationary_distribution()
print("Stationary distribution (long-run behavior):")
for state, prob in zip(states, stationary):
    print(f"  P({state}) = {prob:.4f}")

# Visualize
weather_mc.visualize_transition_matrix()

## Part 2: Hidden Markov Models (HMMs)

### What makes it "Hidden"?

In HMMs:
- **States** are hidden (not directly observable)
- **Observations** are emitted from states (what we see)

### HMM Components

1. **Hidden States**: S = {s₁, ..., sₙ}
2. **Observations**: O = {o₁, ..., oₘ}
3. **Initial Distribution**: π
4. **Transition Matrix**: A (between hidden states)
5. **Emission Matrix**: B where Bᵢⱼ = P(observe oⱼ | state sᵢ)

### Three Fundamental Problems

1. **Evaluation**: Given observations, what's P(observations | model)?
2. **Decoding**: Given observations, what's the most likely state sequence?
3. **Learning**: Given observations, what are the best model parameters?


In [None]:
class HiddenMarkovModel:
    """Hidden Markov Model implementation."""
    
    def __init__(self, states: List[str], observations: List[str],
                 transition_matrix: np.ndarray,
                 emission_matrix: np.ndarray,
                 initial_distribution: np.ndarray = None):
        """
        Initialize HMM.
        
        Args:
            states: Hidden state names
            observations: Observation names
            transition_matrix: State transition probabilities
            emission_matrix: Observation emission probabilities
            initial_distribution: Initial state probabilities
        """
        self.states = states
        self.observations = observations
        self.n_states = len(states)
        self.n_obs = len(observations)
        
        self.A = transition_matrix  # Transition
        self.B = emission_matrix    # Emission
        
        if initial_distribution is None:
            self.pi = np.ones(self.n_states) / self.n_states
        else:
            self.pi = initial_distribution
        
        # Validate
        assert self.A.shape == (self.n_states, self.n_states)
        assert self.B.shape == (self.n_states, self.n_obs)
        assert np.allclose(self.A.sum(axis=1), 1.0)
        assert np.allclose(self.B.sum(axis=1), 1.0)
        assert np.allclose(self.pi.sum(), 1.0)
    
    def generate_sequence(self, length: int) -> Tuple[List[str], List[str]]:
        """
        Generate a sequence of states and observations.
        
        Returns:
            (state_sequence, observation_sequence)
        """
        states_seq = []
        obs_seq = []
        
        # Initial state
        state_idx = np.random.choice(self.n_states, p=self.pi)
        states_seq.append(self.states[state_idx])
        
        # Initial observation
        obs_idx = np.random.choice(self.n_obs, p=self.B[state_idx])
        obs_seq.append(self.observations[obs_idx])
        
        # Generate sequence
        for _ in range(length - 1):
            # Next state
            state_idx = np.random.choice(self.n_states, p=self.A[state_idx])
            states_seq.append(self.states[state_idx])
            
            # Observation from new state
            obs_idx = np.random.choice(self.n_obs, p=self.B[state_idx])
            obs_seq.append(self.observations[obs_idx])
        
        return states_seq, obs_seq
    
    def forward(self, observations: List[str]) -> Tuple[np.ndarray, float]:
        """
        Forward algorithm to compute P(observations).
        
        Args:
            observations: Sequence of observations
        
        Returns:
            (alpha matrix, total probability)
        """
        T = len(observations)
        alpha = np.zeros((T, self.n_states))
        
        # Initialization
        obs_idx = self.observations.index(observations[0])
        alpha[0] = self.pi * self.B[:, obs_idx]
        
        # Recursion
        for t in range(1, T):
            obs_idx = self.observations.index(observations[t])
            for j in range(self.n_states):
                alpha[t, j] = np.sum(alpha[t-1] * self.A[:, j]) * self.B[j, obs_idx]
        
        # Termination
        prob = np.sum(alpha[T-1])
        
        return alpha, prob
    
    def viterbi(self, observations: List[str]) -> Tuple[List[str], float]:
        """
        Viterbi algorithm to find most likely state sequence.
        
        Args:
            observations: Sequence of observations
        
        Returns:
            (most_likely_states, probability)
        """
        T = len(observations)
        delta = np.zeros((T, self.n_states))
        psi = np.zeros((T, self.n_states), dtype=int)
        
        # Initialization
        obs_idx = self.observations.index(observations[0])
        delta[0] = self.pi * self.B[:, obs_idx]
        
        # Recursion
        for t in range(1, T):
            obs_idx = self.observations.index(observations[t])
            for j in range(self.n_states):
                probs = delta[t-1] * self.A[:, j]
                psi[t, j] = np.argmax(probs)
                delta[t, j] = np.max(probs) * self.B[j, obs_idx]
        
        # Backtracking
        path = [0] * T
        path[T-1] = np.argmax(delta[T-1])
        
        for t in range(T-2, -1, -1):
            path[t] = psi[t+1, path[t+1]]
        
        # Convert to state names
        state_path = [self.states[i] for i in path]
        prob = np.max(delta[T-1])
        
        return state_path, prob
    
    def __repr__(self):
        return f"HMM({self.n_states} states, {self.n_obs} observations)"


# Example: Dishonest Casino
print("Dishonest Casino HMM")
print("=" * 60)
print("A casino has two dice:")
print("- Fair die: Equal probability for all numbers")
print("- Loaded die: Biased towards 6")
print("The dealer secretly switches between them!")
print()

states = ['Fair', 'Loaded']
observations = ['1', '2', '3', '4', '5', '6']

# Transition: dealer switches dice occasionally
transition = np.array([
    [0.95, 0.05],  # Fair → Fair, Loaded
    [0.10, 0.90]   # Loaded → Fair, Loaded
])

# Emission: probability of each number
emission = np.array([
    [1/6, 1/6, 1/6, 1/6, 1/6, 1/6],  # Fair: uniform
    [1/10, 1/10, 1/10, 1/10, 1/10, 1/2]  # Loaded: biased to 6
])

initial = np.array([0.5, 0.5])

casino_hmm = HiddenMarkovModel(states, observations, transition, emission, initial)

# Generate a sequence
true_states, rolls = casino_hmm.generate_sequence(30)
print("Generated rolls:")
print(' '.join(rolls))
print()
print("True states (hidden):")
print(' '.join([s[0] for s in true_states]))
print()

# Use Viterbi to infer states
inferred_states, prob = casino_hmm.viterbi(rolls)
print("Inferred states:")
print(' '.join([s[0] for s in inferred_states]))
print(f"\nProbability: {prob:.2e}")

# Calculate accuracy
correct = sum(1 for t, i in zip(true_states, inferred_states) if t == i)
accuracy = correct / len(true_states)
print(f"Accuracy: {accuracy*100:.1f}%")

## Part 3: Weather Prediction with HMM

Let's build a practical weather prediction system!


In [None]:
print("Weather Prediction HMM")
print("=" * 60)

# Hidden states: actual weather conditions
weather_states = ['Sunny', 'Rainy']

# Observations: what we see (temperature, humidity)
weather_obs = ['Hot', 'Warm', 'Cold']

# State transitions
weather_transition = np.array([
    [0.8, 0.2],  # Sunny → Sunny, Rainy
    [0.4, 0.6]   # Rainy → Sunny, Rainy
])

# Emission probabilities
weather_emission = np.array([
    [0.6, 0.3, 0.1],  # Sunny → Hot, Warm, Cold
    [0.1, 0.4, 0.5]   # Rainy → Hot, Warm, Cold
])

weather_initial = np.array([0.6, 0.4])

weather_hmm = HiddenMarkovModel(
    weather_states, weather_obs,
    weather_transition, weather_emission, weather_initial
)

# Simulate a week of observations
true_weather, observations = weather_hmm.generate_sequence(7)

print("Week of observations:")
for day, obs in enumerate(observations, 1):
    print(f"  Day {day}: Temperature is {obs}")
print()

# Infer actual weather
inferred_weather, _ = weather_hmm.viterbi(observations)

print("Comparison:")
print("Day | Observation | True Weather | Inferred")
print("-" * 50)
for day, (obs, true, inf) in enumerate(zip(observations, true_weather, inferred_weather), 1):
    match = "✓" if true == inf else "✗"
    print(f" {day}  |    {obs:5s}    |   {true:6s}    | {inf:6s} {match}")

# Calculate forward probabilities
alpha, total_prob = weather_hmm.forward(observations)

print(f"\nP(observations | model) = {total_prob:.6e}")

## Part 4: Part-of-Speech Tagging

A classic NLP application of HMMs!


In [None]:
print("Part-of-Speech Tagging with HMM")
print("=" * 60)

# Hidden states: POS tags
pos_tags = ['NOUN', 'VERB', 'ADJ']

# Observations: words
words = ['dog', 'cat', 'run', 'jump', 'big', 'small']

# Tag transitions (simplified)
pos_transition = np.array([
    [0.3, 0.5, 0.2],  # NOUN → NOUN, VERB, ADJ
    [0.6, 0.1, 0.3],  # VERB → NOUN, VERB, ADJ
    [0.8, 0.1, 0.1]   # ADJ → NOUN, VERB, ADJ
])

# Emission probabilities: P(word | tag)
pos_emission = np.array([
    [0.4, 0.4, 0.05, 0.05, 0.05, 0.05],  # NOUN
    [0.05, 0.05, 0.4, 0.4, 0.05, 0.05],  # VERB
    [0.05, 0.05, 0.05, 0.05, 0.4, 0.4]   # ADJ
])

pos_initial = np.array([0.5, 0.3, 0.2])

pos_hmm = HiddenMarkovModel(
    pos_tags, words,
    pos_transition, pos_emission, pos_initial
)

# Tag some sentences
sentences = [
    ['big', 'dog', 'run'],
    ['small', 'cat', 'jump'],
    ['dog', 'jump']
]

print("Tagging sentences:\n")
for sentence in sentences:
    tags, prob = pos_hmm.viterbi(sentence)
    print(" ".join(sentence))
    print(" ".join(tags))
    print(f"Log probability: {np.log(prob):.2f}")
    print()

## Part 5: Time Series Analysis

Using HMMs for market regime detection.


In [None]:
print("Market Regime Detection")
print("=" * 60)

# Hidden states: market regimes
regimes = ['Bull', 'Bear', 'Sideways']

# Observations: daily returns (simplified)
returns = ['Up', 'Down', 'Flat']

# Regime transitions
regime_transition = np.array([
    [0.7, 0.2, 0.1],   # Bull
    [0.2, 0.7, 0.1],   # Bear
    [0.3, 0.3, 0.4]    # Sideways
])

# Return distributions per regime
regime_emission = np.array([
    [0.6, 0.2, 0.2],   # Bull: mostly up
    [0.2, 0.6, 0.2],   # Bear: mostly down
    [0.3, 0.3, 0.4]    # Sideways: mostly flat
])

regime_initial = np.array([0.4, 0.3, 0.3])

market_hmm = HiddenMarkovModel(
    regimes, returns,
    regime_transition, regime_emission, regime_initial
)

# Generate market data
true_regimes, observed_returns = market_hmm.generate_sequence(50)

# Infer regimes
inferred_regimes, _ = market_hmm.viterbi(observed_returns)

# Visualize
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

# Encode states as numbers for plotting
regime_map = {'Bull': 2, 'Sideways': 1, 'Bear': 0}
true_encoded = [regime_map[r] for r in true_regimes]
inferred_encoded = [regime_map[r] for r in inferred_regimes]

days = range(len(true_regimes))

# True regimes
ax1.plot(days, true_encoded, 'o-', linewidth=2, markersize=6, label='True Regime')
ax1.set_yticks([0, 1, 2])
ax1.set_yticklabels(['Bear', 'Sideways', 'Bull'])
ax1.set_ylabel('Market Regime', fontweight='bold')
ax1.set_title('True Market Regimes', fontweight='bold', fontsize=13)
ax1.grid(alpha=0.3)
ax1.legend()

# Inferred regimes
ax2.plot(days, inferred_encoded, 's-', linewidth=2, markersize=6, 
        color='orange', label='Inferred Regime')
ax2.set_yticks([0, 1, 2])
ax2.set_yticklabels(['Bear', 'Sideways', 'Bull'])
ax2.set_xlabel('Day', fontweight='bold')
ax2.set_ylabel('Market Regime', fontweight='bold')
ax2.set_title('Inferred Market Regimes (Viterbi)', fontweight='bold', fontsize=13)
ax2.grid(alpha=0.3)
ax2.legend()

plt.tight_layout()
plt.show()

# Calculate accuracy
accuracy = sum(1 for t, i in zip(true_regimes, inferred_regimes) if t == i) / len(true_regimes)
print(f"\nRegime detection accuracy: {accuracy*100:.1f}%")

## Part 6: Using hmmlearn Library

Let's use a professional library for more sophisticated HMMs.


In [None]:
try:
    from hmmlearn import hmm
    HMMLEARN_AVAILABLE = True
except ImportError:
    print("hmmlearn not installed. Run: pip install hmmlearn")
    HMMLEARN_AVAILABLE = False

if HMMLEARN_AVAILABLE:
    print("Stock Price Modeling with hmmlearn")
    print("=" * 60)
    
    # Generate synthetic stock returns
    np.random.seed(42)
    n_samples = 200
    
    # Two regimes: high volatility and low volatility
    regime = np.random.choice([0, 1], size=n_samples, p=[0.7, 0.3])
    returns = np.zeros(n_samples)
    
    for i in range(n_samples):
        if regime[i] == 0:  # Low volatility
            returns[i] = np.random.normal(0.001, 0.01)
        else:  # High volatility
            returns[i] = np.random.normal(-0.002, 0.05)
    
    # Reshape for hmmlearn
    X = returns.reshape(-1, 1)
    
    # Fit Gaussian HMM
    model = hmm.GaussianHMM(n_components=2, covariance_type="full", n_iter=100)
    model.fit(X)
    
    # Predict hidden states
    hidden_states = model.predict(X)
    
    print("Model Parameters:")
    print(f"Transition matrix:\n{model.transmat_}")
    print(f"\nMeans: {model.means_.flatten()}")
    print(f"Variances: {np.sqrt(model.covars_.flatten())}")
    print()
    
    # Visualize
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
    
    # Returns
    ax1.plot(returns, alpha=0.7)
    ax1.set_ylabel('Returns', fontweight='bold')
    ax1.set_title('Stock Returns', fontweight='bold', fontsize=13)
    ax1.grid(alpha=0.3)
    
    # Hidden states
    ax2.plot(hidden_states, drawstyle='steps-post', linewidth=2)
    ax2.set_xlabel('Time', fontweight='bold')
    ax2.set_ylabel('Hidden State', fontweight='bold')
    ax2.set_title('Inferred Volatility Regime', fontweight='bold', fontsize=13)
    ax2.set_yticks([0, 1])
    ax2.set_yticklabels(['Low Vol', 'High Vol'])
    ax2.grid(alpha=0.3)
    
    plt.tight_layout()
    plt.show()
else:
    print("Install hmmlearn to run this section: pip install hmmlearn")

## Exercises

### Exercise 1: Robot Localization
Build an HMM where:
- Hidden states: Robot's actual position (Left, Center, Right)
- Observations: Noisy sensor readings
Use Viterbi to track the robot's path.

In [None]:
# TODO: Build robot localization HMM
# Your code here
pass

### Exercise 2: Sentiment Analysis
Create an HMM for sentiment tracking:
- Hidden states: Sentiment (Positive, Negative, Neutral)
- Observations: Words in reviews
Track sentiment evolution in a sequence of reviews.

In [None]:
# TODO: Build sentiment tracking HMM
# Your code here
pass

### Exercise 3: Compare Algorithms
Compare forward algorithm vs Viterbi on the same observation sequence.
Explain when each is appropriate.

In [None]:
# TODO: Compare forward and Viterbi algorithms
# Your code here
pass

## Summary

### Key Takeaways

1. **Markov Property** - Future independent of past given present
2. **Markov Chains** - Simple sequential models with observable states
3. **Hidden Markov Models** - States hidden, observations visible
4. **Forward Algorithm** - Calculate probability of observation sequence
5. **Viterbi Algorithm** - Find most likely state sequence
6. **Real Applications** - Speech, NLP, finance, robotics

### When to Use HMMs

**Good for**:
- Sequential data with temporal dependencies
- Hidden structure you want to infer
- Well-defined state spaces
- Time series with regime changes

**Not ideal for**:
- Long-range dependencies (use RNNs/LSTMs)
- Very high-dimensional observations
- Non-Markovian processes

### Looking Ahead

You've now completed Week 3: Uncertainty!

Next steps:
- **Week 4**: Optimization - Local search, genetic algorithms
- **Week 5**: Machine Learning - Supervised and unsupervised learning
- Apply probabilistic reasoning to learning algorithms
