#  MDP Value Iteration: Battery Agent Example

## Introduction

This module demonstrates the Value Iteration algorithm for solving a Markov Decision Process (MDP) representing a battery management system.
 


In [1]:
import numpy as np
from typing import List, Tuple

In [2]:

class BatteryMDP:
    """
    A Markov Decision Process model for battery charge management.
    
    States:
        0: High Charge
        1: Low Charge
    
    Actions:
        High Charge: [0: Wait, 1: Search]
        Low Charge: [0: Wait, 1: Search, 2: Recharge]
    """
    
    # State constants
    HIGH_CHARGE = 0
    LOW_CHARGE = 1
    NUM_STATES = 2
    
    # Action constants
    WAIT = 0
    SEARCH = 1
    RECHARGE = 2
    
    def __init__(
        self,
        gamma: float = 0.9,
        r_wait: float = 2.0,
        r_search: float = 3.0,
        alpha: float = 0.7,
        beta: float = 0.4,
        epsilon: float = 1e-6
    ):
        """
        Initialize the Battery MDP.
        
        Args:
            gamma: Discount factor (0 < gamma < 1)
            r_wait: Reward for waiting
            r_search: Reward for searching
            alpha: Success probability for search in high charge
            beta: Success probability for search in low charge
            epsilon: Convergence threshold
        """
        self.gamma = gamma
        self.r_wait = r_wait
        self.r_search = r_search
        self.alpha = alpha
        self.beta = beta
        self.epsilon = epsilon
        
        # Transition probabilities
        self.p_high_wait = 0.4
        self.p_high_search = 0.6
        self.p_low_wait = 0.3
        self.p_low_search = 0.5
        self.p_low_recharge = 0.2
        
        # Number of actions per state
        self.num_actions = [2, 3]  # [High Charge, Low Charge]
        
        # Initialize state values
        self.state_values = np.zeros(self.NUM_STATES)
    
    def compute_expected_value(self, state: int, action: int) -> float:
        """
        Compute the expected value for a state-action pair.
        
        Args:
            state: Current state (0: High, 1: Low)
            action: Action to take
            
        Returns:
            Expected value of taking the action in the given state
        """
        if state == self.HIGH_CHARGE:
            return self._compute_high_charge_value(action)
        else:
            return self._compute_low_charge_value(action)
    
    def _compute_high_charge_value(self, action: int) -> float:
        """Compute expected value for high charge state."""
        if action == self.WAIT:
            # Wait action: stay in high charge with probability p_high_wait
            return self.p_high_wait * (
                self.r_wait + self.gamma * self.state_values[self.HIGH_CHARGE]
            )
        elif action == self.SEARCH:
            # Search action: stay high with prob alpha, go low with prob (1-alpha)
            stay_high = self.alpha * (
                self.r_search + self.gamma * self.state_values[self.HIGH_CHARGE]
            )
            go_low = (1 - self.alpha) * (
                self.r_search + self.gamma * self.state_values[self.LOW_CHARGE]
            )
            return self.p_high_search * (stay_high + go_low)
        return 0.0
    
    def _compute_low_charge_value(self, action: int) -> float:
        """Compute expected value for low charge state."""
        if action == self.WAIT:
            # Wait action: stay in low charge
            return self.p_low_wait * (
                self.r_wait + self.gamma * self.state_values[self.LOW_CHARGE]
            )
        elif action == self.SEARCH:
            # Search action: stay low with prob beta, go high with prob (1-beta)
            # Note: penalty of -3 when transitioning to high (risky search)
            stay_low = self.beta * (
                self.r_search + self.gamma * self.state_values[self.LOW_CHARGE]
            )
            go_high = (1 - self.beta) * (
                -3 + self.gamma * self.state_values[self.HIGH_CHARGE]
            )
            return self.p_low_search * (stay_low + go_high)
        elif action == self.RECHARGE:
            # Recharge action: transition to high charge
            return self.p_low_recharge * (
                self.gamma * self.state_values[self.HIGH_CHARGE]
            )
        return 0.0
    
    def value_iteration(self, verbose: bool = True) -> Tuple[np.ndarray, int]:
        """
        Perform value iteration to find optimal state values.
        
        Args:
            verbose: Whether to print iteration progress
            
        Returns:
            Tuple of (optimal state values, number of iterations)
        """
        iteration = 0
        
        while True:
            new_values = np.copy(self.state_values)
            delta = 0.0
            iteration += 1
            
            # Update each state
            for state in range(self.NUM_STATES):
                # Compute expected value for each action
                action_values = [
                    self.compute_expected_value(state, action)
                    for action in range(self.num_actions[state])
                ]
                
                # Bellman optimality: take max over actions
                new_values[state] = max(action_values)
                
                # Track maximum change
                delta = max(delta, abs(self.state_values[state] - new_values[state]))
            
            self.state_values = new_values
            
            if verbose and iteration % 10 == 0:
                print(f"Iteration {iteration}: δ = {delta:.6f}")
            
            # Check convergence
            if delta < self.epsilon:
                break
        
        if verbose:
            print(f"\n✓ Converged in {iteration} iterations (δ = {delta:.6e})")
        
        return self.state_values, iteration
    
    def extract_policy(self) -> List[int]:
        """
        Extract the optimal policy from converged state values.
        
        Returns:
            List of optimal actions for each state
        """
        policy = []
        
        for state in range(self.NUM_STATES):
            action_values = [
                self.compute_expected_value(state, action)
                for action in range(self.num_actions[state])
            ]
            optimal_action = int(np.argmax(action_values))
            policy.append(optimal_action)
        
        return policy
    
    def print_results(self):
        """Print the optimal state values and policy."""
        state_names = ["High Charge", "Low Charge"]
        action_names = {
            self.WAIT: "Wait",
            self.SEARCH: "Search",
            self.RECHARGE: "Recharge"
        }
        
        print("\n" + "="*60)
        print("OPTIMAL STATE VALUES")
        print("="*60)
        for state in range(self.NUM_STATES):
            print(f"{state_names[state]:15s} V*(s) = {self.state_values[state]:7.4f}")
        
        policy = self.extract_policy()
        print("\n" + "="*60)
        print("OPTIMAL POLICY")
        print("="*60)
        for state in range(self.NUM_STATES):
            action_name = action_names[policy[state]]
            print(f"{state_names[state]:15s} π*(s) = {action_name}")
        print("="*60)


In [3]:
def main():
    """Main execution function."""
    print("MDP Value Iteration: Battery Agent Example")
    print("="*60)
    
    # Create and solve the MDP
    mdp = BatteryMDP(
        gamma=0.9,
        r_wait=2.0,
        r_search=3.0,
        alpha=0.7,
        beta=0.4,
        epsilon=1e-6
    )
    
    print(f"\nInitial State Values: {mdp.state_values}")
    print("\nRunning Value Iteration...")
    
    # Perform value iteration
    optimal_values, num_iterations = mdp.value_iteration(verbose=True)
    
    # Display results
    mdp.print_results()


In [4]:
if __name__ == "__main__":
    main()

MDP Value Iteration: Battery Agent Example

Initial State Values: [0. 0.]

Running Value Iteration...
Iteration 10: δ = 0.000419

✓ Converged in 17 iterations (δ = 4.683466e-07)

OPTIMAL STATE VALUES
High Charge     V*(s) =  3.1080
Low Charge      V*(s) =  0.8219

OPTIMAL POLICY
High Charge     π*(s) = Search
Low Charge      π*(s) = Wait
