## Visualization

As always, we have some code to help us visualize the problem! So let's get that out of the way!

In [2]:
from shutil import get_terminal_size
import random
terminal_width, _ = get_terminal_size()

_visualizers = {}

def _default_visualizer(_, state):
    '''Generic visualizer for unknown problems.'''
    print(state)

class Visualizer:
    '''Visualization and printing functionality encapsulation.'''

    def __init__(self, problem):
        '''Constructor with the problem to visualize.'''
        self.problem = problem
        self.counter = 0

    def visualize(self, frontier):
        '''Visualizes the frontier at every step.'''
        self.counter += 1
        print(f'Frontier at step {self.counter}')
        for state in frontier:
            print()
            _visualizers.get(type(self.problem), _default_visualizer)(self.problem, state)
        print('-' * terminal_width)

def _tom_and_jerry_visualizer(problem, state):
    '''Custom visualizer for Tom and Jerry.'''
    for j in range(problem.bounds[1] - 1, -1, -1):
        for i in range(problem.bounds[0]):
            print(('%8s' if type(state[(i, j)]) is str else '%8.2f') % state[(i, j)], end='\t')
        print()

## Markovian Decision Processes (MDPs)

Great! Now we have yet another way to solve deterministic classical search problems! Why bother? As we mentioned earlier, this is just an introduction to decision processes. In the real world, actions are _far from deterministic_; time passes, weather changes, actuators fail, etc. Even in our simple Tom & Jerry world, as anyone who have watched the show would notice, Tom slips... A LOT. A very famous way to model this stochasticity is Markovian Decision Processes (MDPs).

In [3]:
from IPython.display import Image
Image(url='https://media.giphy.com/media/47xzuq4tImaV0ski1T/giphy-downsized-large.gif')

In an MDP, we assume the new state resulting from applying an action to a state depends only on the state and the action. That is, it does not depend on previous states. While this may seem like a reasonable assumption, it is usually the case, but it does simplify the model considerably. (It is called the Markovian assumption and it is what the _M_ in _MDP_ stands for.)

Using this assumption, we only need to change our _result_ function to be _results_! A function that returns a _distribution_ over the possible next states. (This is called the _transition model_ by the way.)

In [4]:
class MDP:
    '''
    Abstract base class for Markovian Decision Process (MDP) formulation.
    It declares the expected methods to be used to solve it.
    All the methods declared are just placeholders that throw errors if not overriden by child "concrete" classes!
    '''

    def __init__(self):
        '''Constructor that initializes the problem. Typically used to setup the state space.'''
        self.states = set()

    def actions(self, state):
        '''Returns an iterable with the applicable actions to the given state.'''
        raise NotImplementedError

    def results(self, state, action):
        '''Returns a ditribution over all possible resulting states from applying the given action to the given state.'''
        raise NotImplementedError

    def reward(self, state):
        '''Returns the reward of being in a certain state'''
        raise NotImplementedError

Next, we need to modify our previous algorithms to replace anything depending on _the next state_ with a summation over the distribution over all possible next states... and that's it! Here are the famous algorithms to solve an MDP: **value iteration**, **policy iteration**, and **Q-value iteration**!

### Value Iteration

$$U_{i+1}(s) = r_s + \gamma \times \max_{a \in A_s} \sum_{s'} P(s'|s,a) U_{i}(s')$$

$$\pi^*(s) = \argmax_{a \in A_s} \sum_{s'} P(s'|s,a) U_{i}(s')$$

In [5]:
def value_iteration(mdp, init_utility=None, error=1e-6, verbose=False):
    """
    Perform value iteration to solve the given Markov Decision Process (MDP).

    Parameters:
    - mdp: The Markov Decision Process to solve.
    - init_utility: Starting utility values for each state (defaults to 0).
    - error: Threshold for stopping iterations when utilities converge.
    - verbose: If True, display the process step-by-step.

    Returns:
    - utility: A dictionary of final utility values for each state.
    """
    # Initialize visualization if verbose mode is on
    if verbose:
        visualizer = Visualizer(mdp)

    # Initialize utility values for all states
    utility = init_utility or {state: 0 for state in mdp.states}

    # Loop until utilities convergence
    while True:
        # Visualize current utilities if verbose mode is on
        if verbose:
            visualizer.visualize([utility])

        # Make a copy of the current utilities to compare changes later
        old_utility = utility.copy()
        converged = True

        # Update utilities for each state
        for state in mdp.states:
            # Get the immediate reward for the state
            reward = mdp.reward(state)

            # Calculate the maximum expected utility for all actions

            max_utility = float('-inf')
            for action in mdp.actions(state):
                # Calculate the utility of this action by summing over possible outcomes
                action_utility = sum(
                    probability * old_utility[next_state] # p*s' , utility of the previous calculation used to ensure calculations ae consistant
                    for next_state, probability in mdp.results(state, action).items()
                )
                # mdp.results(state, action)-> retrieves the possible outcomes of taking a specific action in a given state.
                # keys -> next state , vlaues -> The probability of ending up in each of those states.
                max_utility = max(max_utility, action_utility)

            # Update the utility of the state
            utility[state] = reward + mdp.discount * max_utility

            # Check if the utility has converged
            if abs(utility[state] - old_utility[state]) > error:
                converged = False

        # If no significant changes occurred, stop the loop
        if converged:
            break

    return utility


def policy_from_values(mdp, utility, verbose=True):
    """
    Derive the optimal policy from the utility values.

    Parameters:
    - mdp: The Markov Decision Process to solve.
    - utility: A dictionary of utility values for each state.
    - verbose: If True, display the derived policy step-by-step.

    Returns:
    - policy: A dictionary mapping each state to its best action.
    """
    # Initialize visualization if verbose mode is on
    if verbose:
        visualizer = Visualizer(mdp)

    # Create an empty policy dictionary
    policy = {}

    # Determine the best action for each state
    for state in mdp.states:
        best_action = None
        best_action_value = float('-inf')

        for action in mdp.actions(state):
            # Calculate the utility of taking this action
            action_value = sum(
                probability * utility[next_state]
                for next_state, probability in mdp.results(state, action).items()
            )

            # Keep track of the action with the highest value
            if action_value > best_action_value:
                best_action = action
                best_action_value = action_value

        # Assign the best action to the state
        policy[state] = best_action

    # Visualize the policy if verbose mode is on
    if verbose:
        visualizer.visualize([policy])

    return policy


### Policy Iteration

$$U_{i+1}(s) = r_s + \gamma \times \sum_{s'} P(s'|s,a) U_{i}(s')$$

$$\pi_{i+1}(s) = \argmax_{a \in A_s} \sum_{s'} P(s'|s,a) U_{i+1}(s')$$

In [6]:
def policy_evaluation(mdp, policy, init_utility=None, error=1e-6, verbose=False):
    """
    Evaluate a given policy to compute utility values for all states.
    """
    # Initialize visualization if verbose mode is enabled
    if verbose:
        visualizer = Visualizer(mdp)

    # Initialize utility values for each state
    utility = init_utility or {state: 0 for state in mdp.states}
    converged = False

    while not converged:
        if verbose:
            visualizer.visualize([utility])

        old_utility = utility.copy()  # Save the current utility values
        converged = True

        # Update utility for each state based on the given policy
        for state in mdp.states:
            action = policy[state]  # Action to follow for the current state
            if action is not None:  # Only calculate if an action exists
                utility[state] = mdp.reward(state) + mdp.discount * sum(
                    probability * old_utility[next_state]
                    for next_state, probability in mdp.results(state, action).items()
                )

                # Check if utility has changed significantly
                if abs(utility[state] - old_utility[state]) > error:
                    converged = False

    return utility  # Return the updated utility values


def policy_iteration(mdp, verbose=False):
    """
    Perform policy iteration to find the optimal policy for the given MDP.
    """
    # Initialize visualization if verbose mode is enabled
    if verbose:
        visualizer = Visualizer(mdp)

    # Initialize policy deterministically by choosing the first available action
    policy = {state: (list(mdp.actions(state))[0] if mdp.actions(state) else None) for state in mdp.states}

    stable = False  # Assume the policy is not stable initially

    while not stable:
        # Evaluate the current policy to get utility values
        utility = policy_evaluation(mdp, policy, verbose=verbose)

        stable = True  # Assume the policy will remain stable
        # Improve the policy for each state
        for state in mdp.states:
            actions = mdp.actions(state)
            if not actions:  # Skip states with no available actions
                continue

            # Find the best action for the state based on utility
            best_action = max(
                actions,
                key=lambda action: sum(
                    probability * utility[next_state]
                    for next_state, probability in mdp.results(state, action).items()
                )
            )

            # Update the policy if the best action differs from the current policy
            if best_action != policy[state]:
                policy[state] = best_action
                stable = False  # Mark the policy as unstable

        if verbose:
            visualizer.visualize([policy])  # Visualize the policy if verbose mode is enabled

    return policy


### Q-Value Iteration

$$Q_{i+1}(s, a) = r_s + \gamma \times \sum_{s'} P(s'|s,a) U_{i}(s')$$
$$\equiv$$
$$Q_{i+1}(s, a) = r_s + \gamma \times \sum_{s'} P(s'|s,a) \max_{a' \in A_{s'}} Q_{i}(s', a')$$

$$\pi^*(s) = \argmax_{a \in A_s} Q(s,a)$$

In [7]:
def q_value_iteration(mdp, init_q=None, error=1e-6):
    """
    Perform Q-value iteration to solve the Markov Decision Process (MDP).

    Parameters:
    - mdp: The environment.
    - init_q: Starting Q-values for each (state, action) pair, default is 0 for all.
    - error: Small threshold to decide when to stop updating.

    Returns:
    - q: Dictionary with final Q-values for each (state, action) pair.
    """

    # Initialize Q-values for each state-action pair to 0
    q = init_q or {state: {action: 0 for action in mdp.actions(state)} for state in mdp.states}
    converged = False

    # Update Q-values until changes are below the error threshold
    while not converged:
        old_q = {state: q[state].copy() for state in q}  # Copy the current Q-values
        converged = True  # Assume convergence until proven otherwise

        for state in mdp.states:
            for action in mdp.actions(state):
                # Compute the new Q-value for (state, action)
                q[state][action] = mdp.reward(state) + mdp.discount * sum(
                    probability * max(old_q[next_state].values())  # Use the max Q-value of the next state
                    for next_state, probability in mdp.results(state, action).items()
                )

                # Check if the Q-value has significantly changed
                if abs(q[state][action] - old_q[state][action]) > error:
                    converged = False  # If a significant change occurred, keep iterating

    return q


def policy_from_q(mdp, q, verbose=True):
    """
    Derive the optimal policy from Q-values.

    Parameters:
    - mdp: The environment.
    - q: The final Q-values for each (state, action).
    - verbose: If True, show the policy step-by-step.

    Returns:
    - policy: A dictionary mapping each state to the best action.
    """

    if verbose:
        visualizer = Visualizer(mdp)

    policy = {}  # Initialize an empty policy dictionary

    for state in mdp.states:
        # Find the action with the highest Q-value for this state
        best_action = max(q[state], key=q[state].get)
        policy[state] = best_action

    if verbose:
        visualizer.visualize([policy])  # Visualize the policy if verbose

    return policy


### Example: Slippery Tom & Jerry

Let's model the slippery motion in the above GIF. When Tom wants to move in a certain direction, he either moves or slips and stays put.

In [8]:
class SlipperyTomAndJerry(MDP):
    '''Slippery Tom & Jerry world'''

    def __init__(self, tom, jerry, spike, bounds, max_reward, discount, p):
        self.tom = tom
        self.jerry = jerry
        self.spike = spike
        self.bounds = bounds
        self.max_reward = max_reward
        self.discount = discount
        self.p = p
        self.states = {(i, j) for i in range(bounds[0]) for j in range(bounds[1])}

    def actions(self, state):
        if state is None: return []
        if state == self.jerry: return ['catch!']
        if state == self.spike: return ['ouch!']
        return ['up', 'down', 'left', 'right']

    def results(self, state, action):
        if   action ==    'up': return {(state[0], min(state[1] + 1, self.bounds[1] - 1)): 1.0-self.p, state: self.p}
        elif action ==  'down': return {(state[0], max(state[1] - 1, 0)): 1.0-self.p, state: self.p}
        elif action ==  'left': return {(max(state[0] - 1, 0), state[1]): 1.0-self.p, state: self.p}
        elif action == 'right': return {(min(state[0] + 1, self.bounds[0] - 1), state[1]): 1.0-self.p, state: self.p}
        else: return {}

    def reward(self, state):
        if state == self.jerry: return  self.max_reward
        if state == self.spike: return -self.max_reward
        return 0

_visualizers[SlipperyTomAndJerry] = _tom_and_jerry_visualizer

In [9]:
slippery_tom_and_jerry = SlipperyTomAndJerry((0, 0), (5, 5), (5, 4), (7, 7), 1000, 0.1, 0.1)

In [10]:
# Perform value iteration
values = value_iteration(slippery_tom_and_jerry, verbose=True)
policy_from_values(slippery_tom_and_jerry, values, verbose=True)


Frontier at step 1

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
----------------------------------------------------------------------------------------------------
Frontier at step 2

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	 1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	-1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	

{(4, 0): 'up',
 (3, 4): 'up',
 (4, 3): 'up',
 (3, 1): 'up',
 (5, 4): 'ouch!',
 (4, 6): 'down',
 (5, 1): 'up',
 (0, 2): 'up',
 (0, 5): 'right',
 (2, 2): 'up',
 (1, 0): 'up',
 (1, 6): 'down',
 (2, 5): 'right',
 (1, 3): 'up',
 (6, 2): 'up',
 (6, 5): 'left',
 (4, 2): 'up',
 (3, 0): 'up',
 (4, 5): 'right',
 (3, 3): 'up',
 (5, 0): 'up',
 (5, 6): 'down',
 (3, 6): 'down',
 (5, 3): 'left',
 (0, 1): 'up',
 (2, 4): 'up',
 (1, 2): 'up',
 (0, 4): 'up',
 (2, 1): 'up',
 (1, 5): 'right',
 (6, 1): 'up',
 (6, 4): 'up',
 (3, 2): 'up',
 (4, 1): 'up',
 (3, 5): 'right',
 (5, 2): 'up',
 (4, 4): 'up',
 (5, 5): 'catch!',
 (0, 0): 'up',
 (1, 1): 'up',
 (0, 3): 'up',
 (2, 0): 'up',
 (1, 4): 'up',
 (0, 6): 'down',
 (2, 3): 'up',
 (2, 6): 'down',
 (6, 0): 'up',
 (6, 6): 'down',
 (6, 3): 'up'}

In [11]:
# Perform Policy Iteration

optimal_policy = policy_iteration(slippery_tom_and_jerry, verbose=True)
print("Optimal Policy:")
for state, action in optimal_policy.items():
    print(f"State {state}: Best Action = {action}")


Frontier at step 1

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
----------------------------------------------------------------------------------------------------
Frontier at step 2

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	 1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	-1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	

In [12]:

# Perform Q-value iteration
q_values = q_value_iteration(slippery_tom_and_jerry)
policy = policy_from_q(slippery_tom_and_jerry, q_values, verbose=True)


Frontier at step 1

    down	    down	    down	    down	    down	    down	    down	
   right	   right	   right	   right	   right	  catch!	    left	
      up	      up	      up	      up	      up	   ouch!	      up	
      up	      up	      up	      up	      up	    left	      up	
      up	      up	      up	      up	      up	      up	      up	
      up	      up	      up	      up	      up	      up	      up	
      up	      up	      up	      up	      up	      up	      up	
----------------------------------------------------------------------------------------------------


In [13]:
class YourCustomizedProblem(MDP):
    '''Implement here yours customized problem'''

