## Visualization

As always, we have some code to help us visualize the problem! So let's get that out of the way!

In [40]:
from shutil import get_terminal_size
terminal_width, _ = get_terminal_size()

_visualizers = {}

def _default_visualizer(_, state):
    '''Generic visualizer for unknown problems.'''
    print(state)

class Visualizer:
    '''Visualization and printing functionality encapsulation.'''

    def __init__(self, problem):
        '''Constructor with the problem to visualize.'''
        self.problem = problem
        self.counter = 0

    def visualize(self, frontier):
        '''Visualizes the frontier at every step.'''
        self.counter += 1
        print(f'Frontier at step {self.counter}')
        for state in frontier:
            print()
            _visualizers.get(type(self.problem), _default_visualizer)(self.problem, state)
        print('-' * terminal_width)

def _tom_and_jerry_visualizer(problem, state):
    '''Custom visualizer for Tom and Jerry.'''
    for j in range(problem.bounds[1] - 1, -1, -1):
        for i in range(problem.bounds[0]):
            print(('%8s' if type(state[(i, j)]) is str else '%8.2f') % state[(i, j)], end='\t')
        print()

## Markovian Decision Processes (MDPs)

Great! Now we have yet another way to solve deterministic classical search problems! Why bother? As we mentioned earlier, this is just an introduction to decision processes. In the real world, actions are _far from deterministic_; time passes, weather changes, actuators fail, etc. Even in our simple Tom & Jerry world, as anyone who has watched the show would notice, Tom slips... A LOT. A very famous way to model this stochasticity is Markovian Decision Processes (MDPs).

In [41]:
from IPython.display import Image
Image(url='https://media.giphy.com/media/47xzuq4tImaV0ski1T/giphy-downsized-large.gif')

In an MDP, we assume the new state resulting from applying an action to a state depends only on that state and that action. That is, it does not depend on previous states. This assumption does not always hold exactly in the real world, but it simplifies the model considerably. (It is called the Markovian assumption and it is what the _M_ in _MDP_ stands for.)
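
Written out, and assuming we index time steps with $t$ (a notation introduced here only for illustration), the Markovian assumption reads:

$$P(s_{t+1} \mid s_t, a_t, s_{t-1}, a_{t-1}, \dots, s_0, a_0) = P(s_{t+1} \mid s_t, a_t)$$

The right-hand side is the same quantity that appears as $P(s'|s,a)$ in the update equations of this notebook.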

Using this assumption, we only need to change our _result_ function to be _results_! A function that returns a _distribution_ over the possible next states. (This is called the _transition model_ by the way.)
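
To make "a distribution over next states" concrete, here is a minimal sketch of the representation used by the classes in this notebook: a plain dictionary mapping each possible next state to its probability. The helper `sample_next_state` and the example numbers are assumptions made for illustration; none of the solvers here need to sample.

```python
import random

# Hypothetical transition distribution for one (state, action) pair:
# keys are next states, values are their probabilities.
distribution = {(0, 1): 0.9, (0, 0): 0.1}

# A valid distribution must sum to 1 (up to floating-point error).
assert abs(sum(distribution.values()) - 1.0) < 1e-9

def sample_next_state(distribution, rng=random):
    '''Draws one next state according to the given probabilities.'''
    states = list(distribution)
    weights = [distribution[s] for s in states]
    return rng.choices(states, weights=weights, k=1)[0]

next_state = sample_next_state(distribution)
```

Sampling like this is what a simulator would do; the algorithms in this notebook instead sum over the whole dictionary.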

In [42]:
class MDP:
    '''
    Abstract base class for Markovian Decision Process (MDP) formulation.
    It declares the expected methods to be used to solve it.
    All the methods declared are just placeholders that throw errors if not overridden by child "concrete" classes!
    '''

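    def __init__(self):
        '''Constructor that initializes the problem. Typically used to set up the state space and the discount factor.'''
        self.states = set()
        self.discount = 1.0  # gamma; the solvers below read mdp.discount, so concrete classes should set it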

    def actions(self, state):
        '''Returns an iterable with the applicable actions to the given state.'''
        raise NotImplementedError

    def results(self, state, action):
        '''Returns a distribution over all possible resulting states from applying the given action to the given state.'''
        raise NotImplementedError

    def reward(self, state):
        '''Returns the reward of being in a certain state'''
        raise NotImplementedError

Next, we need to modify our previous algorithms to replace anything depending on _the next state_ with a summation over the distribution over all possible next states... and that's it! Here are the famous algorithms to solve an MDP: **value iteration**, **policy iteration**, and **Q-value iteration**!
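
The "summation over the distribution" is just an expected value. As a small sketch (the helper name `expected_utility` and the toy numbers are assumptions, not part of the original notebook), the term $\sum_{s'} P(s'|s,a) U(s')$ can be computed like this:

```python
def expected_utility(distribution, utility):
    '''Weights the utility of each possible next state by its probability.'''
    return sum(p * utility[next_state] for next_state, p in distribution.items())

# Toy example: the action reaches 'a' with probability 0.8 and 'b' otherwise.
utility = {'a': 10.0, 'b': 0.0}
distribution = {'a': 0.8, 'b': 0.2}
value = expected_utility(distribution, utility)  # 0.8 * 10 + 0.2 * 0
```

The same generator expression appears inline in each of the solvers in this notebook.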

### Value Iteration

$$U_{i+1}(s) = r_s + \gamma \times \max_{a \in A_s} \sum_{s'} P(s'|s,a) U_{i}(s')$$

$$\pi^*(s) = \operatorname*{argmax}_{a \in A_s} \sum_{s'} P(s'|s,a) U_{i}(s')$$

In [43]:
def value_iteration(mdp, init_utility=None, error=1e-6, verbose=False):
    '''Repeats the value-iteration update until no utility changes by more than error.'''
    if verbose: visualizer = Visualizer(mdp)
    utility = init_utility if init_utility is not None else {state: 0 for state in mdp.states | {None}}
    converged = False
    while not converged:
        if verbose: visualizer.visualize([utility])
        old_utility, converged = utility.copy(), True
        for state in mdp.states:
            # Expected utility of each action under the previous estimate; keep the best one.
            best = max(sum(p * old_utility[next_state] for next_state, p in mdp.results(state, action).items()) for action in mdp.actions(state))
            utility[state] = mdp.reward(state) + mdp.discount * best
            if abs(utility[state] - old_utility[state]) > error: converged = False
    return utility

def policy_from_values(mdp, utility, verbose=True):
    '''Extracts the greedy policy: the action with the highest expected utility in each state.'''
    if verbose: visualizer = Visualizer(mdp)
    policy = {}
    for state in mdp.states:
        policy[state] = max(mdp.actions(state), key=lambda action: sum(p * utility[next_state] for next_state, p in mdp.results(state, action).items()))
    if verbose: visualizer.visualize([policy])
    return policy

### Policy Iteration

$$U_{i+1}(s) = r_s + \gamma \times \sum_{s'} P(s'|s,\pi_i(s)) U_{i}(s')$$

$$\pi_{i+1}(s) = \operatorname*{argmax}_{a \in A_s} \sum_{s'} P(s'|s,a) U_{i+1}(s')$$

In [44]:
from random import choice

def policy_evaluation(mdp, policy, init_utility=None, error=1e-6, verbose=False):
    """Estimates the utility of every state when following the given fixed policy."""
    if verbose: visualizer = Visualizer(mdp)
    utility = init_utility if init_utility is not None else {state: 0 for state in mdp.states}
    converged = False

    while not converged:
        if verbose: visualizer.visualize([utility])
        old_utility, converged = utility.copy(), True

        for state in mdp.states:
            action = policy[state]
            utility[state] = mdp.reward(state) + mdp.discount * sum(
                p * old_utility[next_state] for next_state, p in mdp.results(state, action).items()
            )
            if abs(utility[state] - old_utility[state]) > error:
                converged = False
    return utility

def policy_iteration(mdp, verbose=False):
    """Alternates policy evaluation and greedy policy improvement until the policy stops changing."""
    policy = {state: choice(mdp.actions(state)) for state in mdp.states}
    if verbose: visualizer = Visualizer(mdp)

    is_policy_stable = False
    while not is_policy_stable:
        utility = policy_evaluation(mdp, policy)
        is_policy_stable = True

        for state in mdp.states:
            old_action = policy[state]
            policy[state] = max(
                mdp.actions(state),
                key=lambda action: sum(p * utility[next_state] for next_state, p in mdp.results(state, action).items())
            )
            if old_action != policy[state]:
                is_policy_stable = False

        if verbose: visualizer.visualize([policy])
    return policy

### Q-Value Iteration

$$Q_{i+1}(s, a) = r_s + \gamma \times \sum_{s'} P(s'|s,a) U_{i}(s')$$
$$\equiv$$
$$Q_{i+1}(s, a) = r_s + \gamma \times \sum_{s'} P(s'|s,a) \max_{a' \in A_{s'}} Q_{i}(s', a')$$

$$\pi^*(s) = \operatorname*{argmax}_{a \in A_s} Q(s,a)$$
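
The equivalence between the two forms rests on $U(s) = \max_{a} Q(s, a)$. As a minimal sketch with made-up numbers (the table `q` and its state and action names are purely illustrative), both the utilities and the greedy policy can be read off a nested Q-table like the one built in this notebook:

```python
# Hypothetical Q-table: q[state][action] holds the Q-value of that pair.
q = {
    's0': {'left': 1.0, 'right': 3.5},
    's1': {'stay': 0.0},
}

# U(s) is the best Q-value available in s.
utility = {state: max(actions.values()) for state, actions in q.items()}

# The greedy policy picks the action that attains that maximum.
policy = {state: max(actions, key=actions.get) for state, actions in q.items()}
```

This is why a Q-table is enough to act: no transition model is needed to pick the best action once the Q-values are known.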

In [45]:
def q_value_iteration(mdp, init_q=None, error=1e-6, verbose=False):
    """Repeats the Q-value update until no Q-value changes by more than error."""
    if verbose: visualizer = Visualizer(mdp)
    q = init_q if init_q is not None else {state: {action: 0 for action in mdp.actions(state)} for state in mdp.states}
    converged = False
    while not converged:
        if verbose: visualizer.visualize([{state: max(actions.values()) for state, actions in q.items()}])
        old_q = {state: q[state].copy() for state in mdp.states}
        converged = True
        for state in mdp.states:
            for action in mdp.actions(state):
                q[state][action] = mdp.reward(state) + mdp.discount * sum(
                    p * max(old_q[next_state].values()) for next_state, p in mdp.results(state, action).items()
                )
                if abs(q[state][action] - old_q[state][action]) > error:
                    converged = False
    return q

def policy_from_q(mdp, q, verbose=True):
    """Extracts the greedy policy: the action with the highest Q-value in each state."""
    policy = {}
    for state in mdp.states:
        policy[state] = max(mdp.actions(state), key=lambda action: q[state][action])
    if verbose: Visualizer(mdp).visualize([policy])

    return policy

### Example: Slippery Tom & Jerry

Let's model the slippery motion in the above GIF. When Tom wants to move in a certain direction, he either moves or slips and stays put.

In [46]:
class SlipperyTomAndJerry(MDP):
    '''Slippery Tom & Jerry world'''

    def __init__(self, tom, jerry, spike, bounds, max_reward, discount, p):
        self.tom = tom
        self.jerry = jerry
        self.spike = spike
        self.bounds = bounds
        self.max_reward = max_reward
        self.discount = discount
        self.p = p
        self.states = {(i, j) for i in range(bounds[0]) for j in range(bounds[1])}

    def actions(self, state):
        if state is None: return []
        if state == self.jerry: return ['catch!']
        if state == self.spike: return ['ouch!']
        return ['up', 'down', 'left', 'right']

    def results(self, state, action):
        x, y = state
        if   action ==    'up': target = (x, min(y + 1, self.bounds[1] - 1))
        elif action ==  'down': target = (x, max(y - 1, 0))
        elif action ==  'left': target = (max(x - 1, 0), y)
        elif action == 'right': target = (min(x + 1, self.bounds[0] - 1), y)
        else: return {}  # terminal actions ('catch!', 'ouch!') have no successors
        # At a wall the target equals the current state; a dict literal with both keys
        # would keep only the last one and lose probability mass, so handle it explicitly.
        if target == state: return {state: 1.0}
        return {target: 1.0 - self.p, state: self.p}

    def reward(self, state):
        if state == self.jerry: return  self.max_reward
        if state == self.spike: return -self.max_reward
        return 0

_visualizers[SlipperyTomAndJerry] = _tom_and_jerry_visualizer
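
Because `results` is expected to return a probability distribution, a small generic check can flag any (state, action) pair whose probabilities do not sum to 1. One easy way to get this wrong in Python is a dictionary literal with two equal keys, which silently keeps only the last value. The sketch below is not part of the original notebook: `invalid_transitions`, `TwoCellCorridor`, and `BuggyCorridor` are hypothetical names, and it assumes an empty distribution marks a terminal action.

```python
def invalid_transitions(mdp, tolerance=1e-9):
    '''Returns the (state, action) pairs whose non-empty distribution does not sum to 1.'''
    bad = []
    for state in mdp.states:
        for action in mdp.actions(state):
            distribution = mdp.results(state, action)
            # An empty distribution is treated as a terminal action and skipped.
            if distribution and abs(sum(distribution.values()) - 1.0) > tolerance:
                bad.append((state, action))
    return bad

class TwoCellCorridor:
    '''Hypothetical stand-in MDP: two cells, moving right slips with probability 0.1.'''
    states = {0, 1}

    def actions(self, state):
        return ['right']

    def results(self, state, action):
        target = min(state + 1, 1)
        # At the wall the target is the current cell, so staying put is certain.
        return {state: 1.0} if target == state else {target: 0.9, state: 0.1}

class BuggyCorridor(TwoCellCorridor):
    '''Same corridor, but the wall case is written as a literal with duplicate keys.'''

    def results(self, state, action):
        target = min(state + 1, 1)
        # In cell 1 this is {1: 0.9, 1: 0.1}, which Python collapses to {1: 0.1}.
        return {target: 0.9, state: 0.1}

clean = invalid_transitions(TwoCellCorridor())
flagged = invalid_transitions(BuggyCorridor())
```

The same function can be pointed at any object exposing `states`, `actions`, and `results`, for example `invalid_transitions(slippery_tom_and_jerry)` once that instance exists.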

In [47]:
slippery_tom_and_jerry = SlipperyTomAndJerry((0, 0), (5, 5), (5, 4), (7, 7), 1000, 0.1, 0.1)

In [48]:
values = value_iteration(slippery_tom_and_jerry, verbose=True)
policy_from_values(slippery_tom_and_jerry, values, verbose=True)

Frontier at step 1

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 2

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	 1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	-1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.0

{(4, 0): 'up',
 (3, 4): 'up',
 (4, 3): 'up',
 (3, 1): 'up',
 (5, 4): 'ouch!',
 (4, 6): 'down',
 (5, 1): 'up',
 (0, 2): 'up',
 (0, 5): 'right',
 (2, 2): 'up',
 (1, 0): 'up',
 (1, 6): 'down',
 (2, 5): 'right',
 (1, 3): 'up',
 (6, 2): 'up',
 (6, 5): 'left',
 (4, 2): 'up',
 (3, 0): 'up',
 (4, 5): 'right',
 (3, 3): 'up',
 (5, 0): 'up',
 (5, 6): 'down',
 (3, 6): 'down',
 (5, 3): 'left',
 (0, 1): 'up',
 (2, 4): 'up',
 (1, 2): 'up',
 (0, 4): 'up',
 (2, 1): 'up',
 (1, 5): 'right',
 (6, 1): 'up',
 (6, 4): 'up',
 (3, 2): 'up',
 (4, 1): 'up',
 (3, 5): 'right',
 (5, 2): 'up',
 (4, 4): 'up',
 (5, 5): 'catch!',
 (0, 0): 'up',
 (1, 1): 'up',
 (0, 3): 'up',
 (2, 0): 'up',
 (1, 4): 'up',
 (0, 6): 'down',
 (2, 3): 'up',
 (2, 6): 'down',
 (6, 0): 'up',
 (6, 6): 'down',
 (6, 3): 'up'}

In [49]:

policy = policy_iteration(slippery_tom_and_jerry, verbose=True)
utility = policy_evaluation(slippery_tom_and_jerry, policy, verbose=True)

Frontier at step 1

      up	      up	      up	      up	      up	    down	    down	
      up	      up	      up	      up	   right	  catch!	    left	
      up	      up	      up	      up	      up	   ouch!	      up	
      up	      up	      up	      up	      up	    down	      up	
      up	      up	      up	      up	      up	      up	      up	
      up	      up	      up	      up	      up	      up	      up	
      up	      up	      up	      up	      up	      up	      up	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 2

      up	      up	      up	      up	    down	    down	    down	
      up	      up	      up	   right	   right	  catch!	    left	
      up	      up	      up	   right	      up	   ouch!	      up	
      up	      up	      up	   right	      up	    left	      up	
      up	      up	      up	   right	      up	    left	      up	
      up	      up	      up	   right	      up	    left	      up	
      u

In [50]:
q_values = q_value_iteration(slippery_tom_and_jerry,verbose=True)

policy_from_q(slippery_tom_and_jerry, q_values, verbose=True)

Frontier at step 1

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 2

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	 1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	-1000.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.0

{(4, 0): 'up',
 (3, 4): 'up',
 (4, 3): 'up',
 (3, 1): 'up',
 (5, 4): 'ouch!',
 (4, 6): 'down',
 (5, 1): 'up',
 (0, 2): 'up',
 (0, 5): 'right',
 (2, 2): 'up',
 (1, 0): 'up',
 (1, 6): 'down',
 (2, 5): 'right',
 (1, 3): 'up',
 (6, 2): 'up',
 (6, 5): 'left',
 (4, 2): 'up',
 (3, 0): 'up',
 (4, 5): 'right',
 (3, 3): 'up',
 (5, 0): 'up',
 (5, 6): 'down',
 (3, 6): 'down',
 (5, 3): 'left',
 (0, 1): 'up',
 (2, 4): 'up',
 (1, 2): 'up',
 (0, 4): 'up',
 (2, 1): 'up',
 (1, 5): 'right',
 (6, 1): 'up',
 (6, 4): 'up',
 (3, 2): 'up',
 (4, 1): 'up',
 (3, 5): 'right',
 (5, 2): 'up',
 (4, 4): 'up',
 (5, 5): 'catch!',
 (0, 0): 'up',
 (1, 1): 'up',
 (0, 3): 'up',
 (2, 0): 'up',
 (1, 4): 'up',
 (0, 6): 'down',
 (2, 3): 'up',
 (2, 6): 'down',
 (6, 0): 'up',
 (6, 6): 'down',
 (6, 3): 'up'}

In [51]:
class YourCustomizedProblem(MDP):
    '''Customized problem: a robot must reach the goal on a grid while avoiding obstacles.'''
    def __init__(self, robot, goal, obstacle, grid, max_reward, discount, p):
        self.robot = robot
        self.obstacle = obstacle
        self.goal = goal
        self.grid = grid
        self.max_reward = max_reward
        self.discount = discount
        self.p = p
        self.states = {(i, j) for i in range(grid[0]) for j in range(grid[1])}

    def actions(self, state):
        if state is None: return []
        if state == self.goal: return ['goal!']
        if state in self.obstacle: return ['!OBSTACLE!']
        return ['up', 'down', 'left', 'right']

    def results(self, state, action):
        x, y = state
        if   action ==    'up': target = (x, min(y + 1, self.grid[1] - 1))
        elif action ==  'down': target = (x, max(y - 1, 0))
        elif action ==  'left': target = (max(x - 1, 0), y)
        elif action == 'right': target = (min(x + 1, self.grid[0] - 1), y)
        else: return {}  # terminal actions ('goal!', '!OBSTACLE!') have no successors
        # At a wall the target equals the current state; a dict literal with both keys
        # would keep only the last one and lose probability mass, so handle it explicitly.
        if target == state: return {state: 1.0}
        return {target: 1.0 - self.p, state: self.p}

    def reward(self, state):
        if state == self.goal: return  self.max_reward
        if state in self.obstacle: return -self.max_reward
        return 0

In [52]:
# Catching Goal Visualizer
def _CatchingGoal_visualizer(problem, state):
    for j in range(problem.grid[1] - 1, -1, -1):
        for i in range(problem.grid[0]):
            value = state.get((i, j), '')  # missing cells print as blanks instead of raising KeyError
            print(('%8s' if isinstance(value, str) else '%8.2f') % value, end='\t')
        print()

_visualizers[YourCustomizedProblem] = _CatchingGoal_visualizer

In [61]:
import random

def generate_mdp_parameters(grid_min=4, grid_max=8, obstacle_ratio=0.2):
    """
    Automatically generate realistic MDP parameters for YourCustomizedProblem.
    """
    # Random grid size
    grid_size = (random.randint(grid_min, grid_max), random.randint(grid_min, grid_max))
    x_max, y_max = grid_size

    # Start and goal
    robot = (0, 0)
    goal = (x_max - 1, y_max - 1)

    # Generate random obstacles
    all_cells = [(i, j) for i in range(x_max) for j in range(y_max)]
    free_cells = [c for c in all_cells if c not in [robot, goal]]
    # Never ask for more obstacles than there are free cells (random.sample would raise).
    num_obstacles = min(int(len(all_cells) * obstacle_ratio), len(free_cells))
    obstacles = random.sample(free_cells, num_obstacles)

    # Reward and discount setup
    max_reward = random.choice([5, 10, 20])
    discount = round(random.uniform(0.85, 0.99), 2)

    # Noise (uncertainty)
    noise = round(random.uniform(0.1, 0.3), 2)

    # Print the generated parameters
    print(f"🤖 Robot start: {robot}")
    print(f"🎯 Goal: {goal}")
    print(f"🚧 Obstacles: {obstacles}")
    print(f"🗺️ Grid size: {grid_size}")
    print(f"💰 Max reward: {max_reward}")
    print(f"📉 Discount: {discount}")
    print(f"🌪️ Noise: {noise}")

    return robot, goal, obstacles, grid_size, max_reward, discount, noise

params = generate_mdp_parameters()

# Creating an automatic problem 
CustomizedProblem = YourCustomizedProblem(*params)


🤖 Robot start: (0, 0)
🎯 Goal: (6, 3)
🚧 Obstacles: [(2, 2), (4, 2), (3, 1), (5, 3), (1, 2)]
🗺️ Grid size: (7, 4)
💰 Max reward: 5
📉 Discount: 0.88
🌪️ Noise: 0.29


In [60]:
values = value_iteration(CustomizedProblem, verbose=True)
policy_from_values(CustomizedProblem, values, verbose=True)

Frontier at step 1

    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	    0.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 2

    0.00	    0.00	    0.00	  -20.00	    0.00	   20.00	
    0.00	    0.00	    0.00	    0.00	  -20.00	    0.00	
    0.00	    0.00	    0.00	  -20.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	  -20.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 3

    0.00	    0.00	    0.00	  -20.00	   14.04	   20.00	
    0.00	    0.00	    0.00	    0.00	  -20.00	   14.04	
    0.00	    0.00	    0.00	  -20.00	    0.00	    0.00	
    0.00	    0.00	    0.00	    0.00	    0.00	  -20.00	
--------------------------------------

{(4, 0): 'up',
 (4, 3): 'right',
 (3, 1): '!OBSTACLE!',
 (5, 1): 'up',
 (0, 2): 'down',
 (2, 2): 'down',
 (1, 0): 'right',
 (1, 3): 'down',
 (4, 2): '!OBSTACLE!',
 (3, 0): 'right',
 (3, 3): '!OBSTACLE!',
 (5, 0): '!OBSTACLE!',
 (5, 3): 'goal!',
 (0, 1): 'down',
 (1, 2): 'down',
 (2, 1): 'down',
 (3, 2): 'left',
 (4, 1): 'right',
 (5, 2): 'up',
 (0, 0): 'right',
 (1, 1): 'down',
 (0, 3): 'down',
 (2, 0): 'right',
 (2, 3): 'down'}

In [69]:
mdp = YourCustomizedProblem((0, 0), (2, 2), [(1, 1), (0, 2)], (3, 3), 10, 0.9, 0.2)
policy = policy_iteration(mdp, verbose=True)
utility = policy_evaluation(mdp, policy, verbose=True)


Frontier at step 1

!OBSTACLE!	   right	   goal!	
    down	!OBSTACLE!	      up	
      up	    down	      up	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 2

!OBSTACLE!	   right	   goal!	
    down	!OBSTACLE!	      up	
      up	   right	      up	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 3

!OBSTACLE!	   right	   goal!	
    down	!OBSTACLE!	      up	
   right	   right	      up	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 4

!OBSTACLE!	   right	   goal!	
    down	!OBSTACLE!	      up	
   right	   right	      up	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 1

    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	
    0.00

In [70]:
q_values = q_value_iteration(mdp,verbose=True)

policy_from_q(mdp, q_values, verbose=True)

Frontier at step 1

    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	
    0.00	    0.00	    0.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 2

  -10.00	    0.00	   10.00	
    0.00	  -10.00	    0.00	
    0.00	    0.00	    0.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 3

  -10.00	    7.20	   10.00	
    0.00	  -10.00	    7.20	
    0.00	    0.00	    0.00	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 4

  -10.00	    8.50	   10.00	
    0.00	  -10.00	    8.50	
    0.00	    0.00	    5.18	
------------------------------------------------------------------------------------------------------------------------
Frontier at step 5

  -10.00	    8.73	   10.00	
    0.00	  -10.00	    8.73	
    0.00	    3.73	    7.

{(0, 1): 'down',
 (1, 2): 'right',
 (2, 1): 'up',
 (0, 0): 'right',
 (1, 1): '!OBSTACLE!',
 (2, 0): 'up',
 (0, 2): '!OBSTACLE!',
 (2, 2): 'goal!',
 (1, 0): 'right'}
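
As a sanity check on the frontiers printed by `q_value_iteration(mdp, verbose=True)`, the sketch below replays by hand the update for cell (1, 2), directly left of the goal, using the parameters passed to `YourCustomizedProblem` for `mdp` (reward 10, discount 0.9, slip probability 0.2). It assumes the best action there is always `'right'`, which is what the final policy reports, and that the goal's utility stays at 10 because its only action has no successors. The function and variable names are illustrative only.

```python
discount, p, goal_utility = 0.9, 0.2, 10.0

def backup(own_previous):
    '''One update for the cell next to the goal when moving toward it.'''
    # The cell's own reward is 0. With probability 1 - p the robot reaches the goal,
    # with probability p it slips and stays where it was.
    return 0.0 + discount * ((1 - p) * goal_utility + p * own_previous)

step_3 = backup(0.0)       # uses the all-zero estimate for the cell itself
step_4 = backup(step_3)
step_5 = backup(step_4)

# Solving U = discount * ((1 - p) * goal_utility + p * U) for U gives the limit.
limit = discount * (1 - p) * goal_utility / (1 - discount * p)

print(f'{step_3:.2f} {step_4:.2f} {step_5:.2f} {limit:.2f}')
# prints: 7.20 8.50 8.73 8.78
```

The first three numbers match the entries shown for that cell at steps 3, 4, and 5, and the last one is the value the iteration is approaching.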