# Markov Decision Processes

In [None]:
# Import libraries
import numpy as np
import platform
import math
from enum import Enum
from typing import Tuple, Generator, List

# Check Python version
print(f'Python version: {platform.python_version()}')

Python version: 3.10.12


# Create the test scenario

Let's create a simple map. We will define the map as a matrix, with each cell representing a state. The value of each cell indicates its category:
*  0: represents free space
*  1: represents the final goal
* -1: represents a hazard to avoid
* nan: represents a wall

In [None]:
world = np.array([[0,0,0,1],[0, float('nan'), 0, -1],[0, 0, 0, 0]])
print(world)

[[ 0.  0.  0.  1.]
 [ 0. nan  0. -1.]
 [ 0.  0.  0.  0.]]


Let's define some helper functions.

*is_valid* checks if a node with (x,y) coodinates is valid (i.e. within bounds) and it is not a wall

In [None]:
def is_valid(node:Tuple[int, int], world:np.ndarray):
    """*is_valid* checks if a node with (x,y) coodinates is valid (i.e. within bounds) and it is not a wall"""
    (x,y) = node
    (w,h) = world.shape
    return x >= 0 and x < w and y >= 0 and y < h and not math.isnan(world[node]) # and world[node] >= 0

In [None]:
print(is_valid((-1, -1), world)) # False, outside of world

print(is_valid((2, 1), world)) # OK

print(is_valid((1, 1), world)) # False, is a wall

False
True
False


We need a way to visit the neighbors of a cell:

In [None]:
def neighbor_iter(node: Tuple[int, int], world:np.ndarray)->Generator[Tuple[int, int], None, None]:
        """List of valid nodes neighouring node"""
        (x,y) = node
        n = (x-1, y)
        if is_valid(n, world):
            yield n
        n = (x+1, y)
        if is_valid(n, world):
            yield n
        n = (x, y-1)
        if is_valid(n, world):
            yield n
        n = (x, y+1)
        if is_valid(n, world):
            yield n

In [None]:
for neighbor_cell in neighbor_iter((0,0), world):
    print(neighbor_cell)

(1, 0)
(0, 1)


Exercise: Find the neighbors of the wall cell

And to draw the map

In [None]:
def draw_world(world: np.ndarray, path: List[Tuple[int, int]])->None:
    """Draw the map and a path in it. The path appears as a list of numbers in the order the path is taken."""
    w = world.copy()
    step = 1
    for cell in path:
        w[cell]=step
        step = step+1
    print("\n")
    print(w)

In [None]:
draw_world(world, [])



[[ 0.  0.  0.  1.]
 [ 0. nan  0. -1.]
 [ 0.  0.  0.  0.]]


In [None]:
draw_world(world, [(0,0), (0,1)])



[[ 1.  2.  0.  1.]
 [ 0. nan  0. -1.]
 [ 0.  0.  0.  0.]]


## Markov Decision Processes (MDP)

A Markov decision process takes into account that a robot can fail to complete a task. When planning a motion this means that the robot can end in a hazard area (i.e. stairs), even if the plan was perfect, just because it fails to complete a motion and ended in the wrong area.

We can model this situation by defining a policy $\pi$, a plan the guides the action of the robot in different circunstances. In this case the policy will be guided by the concept of utility (value):
* arriving at the destination has a certain utility,
* getting into a hazard has a negative utility,
* moving from cell to cell has a negative utility (or the more "util" plan would be to move in loops...).

We can model these values as:

In [None]:
# This constants will define our transtition model T
TM_FORWARD = 0.8      # Probability of the robot moving along the planned direction
TM_LEFT = 0.1         # Probability of moving to the left
TM_RIGHT = 0.1        # Probability of moving to the right

STEP_REWARD = -0.001  # Utility loss of moving

# Discount
GAMMA = 1.0           # Loss of utility of previous steps: GAMMA = 1 no utility is loss, GAMMA = 0 previous steps don't count

A key problem in the definition of a Markov process are transitions. The transition_test function which of the actions (move to left, right, up, down) is the most useful. It adds the utility of each action by summing the probability of moving to a neighbouring cell with the utility of this cell. The function will return the larger of this values:

In [None]:
def transition_test(node: Tuple[int, int], world: np.ndarray, value: np.ndarray) -> float:
    """This function will test all posible actions considering the transition model and their feasibility"""

    lst = []        # This list will store the value of each action
    (x,y) = node    # Let's split the coordinates of the state for easier usage

    # Move to left

    sum = 0
    forward_node = (x, y-1)
    left_node    = (x+1, y)
    right_node   = (x-1, y)
    if is_valid(forward_node, world):
        sum +=  TM_FORWARD * value[forward_node]
    if is_valid(left_node, world):
        sum += TM_LEFT * value[left_node]
    if is_valid(right_node, world):
        sum += TM_RIGHT * value[right_node]

    lst.append(sum)

    # Move Up

    sum = 0
    forward_node = (x+1, y)
    left_node    = (x, y-1)
    right_node   = (x, y+1)
    if is_valid(forward_node, world):
        sum += TM_FORWARD * value[forward_node]
    if is_valid(left_node, world):
        sum += TM_LEFT * value[left_node]
    if is_valid(right_node, world):
        sum += TM_RIGHT * value[right_node]

    lst.append(sum)

    # Move Right

    sum = 0
    forward_node = (x, y+1)
    left_node    = (x-1, y)
    right_node   = (x+1, y)
    if is_valid(forward_node, world):
        sum += TM_FORWARD * value[forward_node]
    if is_valid(left_node, world):
        sum += TM_LEFT * value[left_node]
    if is_valid(right_node, world):
        sum += TM_RIGHT * value[right_node]

    lst.append(sum)

    # Move Down

    sum = 0
    forward_node = (x-1, y)
    left_node    = (x, y+1)
    right_node   = (x, y-1)
    if is_valid(forward_node, world):
        sum += TM_FORWARD * value[forward_node]
    if is_valid(left_node, world):
        sum += TM_LEFT * value[left_node]
    if is_valid(right_node, world):
        sum += TM_RIGHT * value[right_node]

    lst.append(sum)

    return max(lst)

The markov function computes the utility of our map. It iterates the utility of each cell according to $u_{i,j}=r_{i,j}+\Gamma*T_{i,j}$, where $r$ is the expected rewars, $\Gamma$ is the decay value of the iteration and $T_{i,j}$ is the utility of the transition:

In [None]:
def markov(world:np.ndarray, rewards: np.ndarray)->np.ndarray:
    """Compute the utility of the map. Rewards and utility are stores in maps with the same shape as the map"""
    value = np.zeros_like(world)

    while True:
        up = np.copy(value) # Make a copy of the current utility for comparison

        # For each cell of the map
        for x in range(0, world.shape[0]):
            for y in range(0, world.shape[1]):
                if not math.isnan(world[x,y]): # If it's not a wall
                    if world[x,y] == 1: # Don't change the utility of the goal...
                        value[x,y]=1
                    elif world[x,y] == -1: # ... or the hazards
                        value[x,y]=-1
                    else: # For all other cells, update the utility using Bellman's Equation: V(𝑠)=𝑅(𝑠)+max⁡ ∑𝑠′[𝑇(𝑠, 𝑎, 𝑠^′ )𝑉(𝑠^′)]
                        value[x,y] = rewards[x,y]+GAMMA*transition_test((x,y), world, up)
                else: # if it's a wall then we fix its utility to a fixed low value
                    value[x,y] = -10000

        #print(utility)
        #print(rewards)

        if np.linalg.norm(value-up)<1e-7: # Stop when the change is small enough
            return value

The trace function uses the utility matrix to generate a path. It starts searching for the motions with larger utility at each step (simulating a gradient ascent)

In [None]:
def trace(node:Tuple[int, int], world:np.ndarray, value:np.ndarray)->List[Tuple[int, int]]:
    """Trace the path"""
    curr = node
    u = value[node]
    path = [node]
    next = ()
    while True:
        for node in neighbor_iter(curr, world):
            if u < value[node]:
                u = value[node]
                next = node
        if next == ():
            return path
        else:
            path.append(next)
            curr = next
            next = ()

## Example 1

We can also set-up a reward map for our problem

In [None]:
rewards = np.array([[STEP_REWARD, STEP_REWARD,STEP_REWARD, 1],[STEP_REWARD, float('NaN'), STEP_REWARD, -1],[STEP_REWARD, STEP_REWARD, STEP_REWARD, STEP_REWARD]])
print(rewards)

[[-0.001 -0.001 -0.001  1.   ]
 [-0.001    nan -0.001 -1.   ]
 [-0.001 -0.001 -0.001 -0.001]]


Finally we can run the code and generate a path:

In [None]:
value = markov(world, rewards)

path = trace((2,1), world, value)
print(path)

draw_world(world, path)

[(2, 1), (2, 2), (1, 2), (0, 2), (0, 3)]


[[ 0.  0.  4.  5.]
 [ 0. nan  3. -1.]
 [ 0.  1.  2.  0.]]


In [None]:
print(value)

[[ 5.94456522e-01  6.85000000e-01  8.57500000e-01  1.00000000e+00]
 [ 4.74565217e-01 -1.00000000e+04  5.85000000e-01 -1.00000000e+00]
 [ 4.22056930e-01  4.34047567e-01  5.43809513e-01  3.34047567e-01]]


## Example 2
Let's generate a second map

In [None]:
world2 = np.array([[0,0,0,0,1],[float('nan'), 0, float('nan'), float('nan'), -1],[0, 0, 0, 0, 0], [0, float('nan'), float('nan'), 0, -1], [0, 0, 0, 0, 0]])
print(world2)

[[ 0.  0.  0.  0.  1.]
 [nan  0. nan nan -1.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0. -1.]
 [ 0.  0.  0.  0.  0.]]


In [None]:
rewards2 = (world2 == 0).astype(float)*STEP_REWARD+(world2 == 1).astype(float)+(world2 == -1).astype(float)
print(rewards2)

[[-0.001 -0.001 -0.001 -0.001  1.   ]
 [ 0.    -0.001  0.     0.     1.   ]
 [-0.001 -0.001 -0.001 -0.001 -0.001]
 [-0.001  0.     0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001 -0.001]]


In [None]:
value = markov(world2, rewards2)

In [None]:
path = trace((4,0), world2, value)
print(path)

draw_world(world2, path)

[(4, 0), (3, 0), (2, 0), (2, 1), (1, 1), (0, 1), (0, 2), (0, 3), (0, 4)]


[[ 0.  6.  7.  8.  9.]
 [nan  5. nan nan -1.]
 [ 3.  4.  0.  0.  0.]
 [ 2. nan nan  0. -1.]
 [ 1.  0.  0.  0.  0.]]


Let's trace a path from point (4,4)

In [None]:
path = trace((4,4), world2, value)
print(path)

draw_world(world2, path)

[(4, 4), (4, 3), (4, 2), (4, 1), (4, 0), (3, 0), (2, 0), (2, 1), (1, 1), (0, 1), (0, 2), (0, 3), (0, 4)]


[[ 0. 10. 11. 12. 13.]
 [nan  9. nan nan -1.]
 [ 7.  8.  0.  0.  0.]
 [ 6. nan nan  0. -1.]
 [ 5.  4.  3.  2.  1.]]


## Example 3

Now let's test the examples changing some parameters

In [None]:
TM_FORWARD = 0.98
TM_LEFT = 0.01
TM_RIGHT = 0.01

STEP_REWARD = -0.01
GAMMA = 0.8

In [None]:
rewards = (world == 0).astype(float)*STEP_REWARD+(world == 1).astype(float)+(world == -1).astype(float)
print(rewards)

[[-0.01 -0.01 -0.01  1.  ]
 [-0.01  0.   -0.01  1.  ]
 [-0.01 -0.01 -0.01 -0.01]]


In [None]:
value = markov(world, rewards)
print(value)

[[ 4.63645356e-01  6.00532363e-01  7.78740259e-01  1.00000000e+00]
 [ 3.53497959e-01 -1.00000000e+04  5.92532363e-01 -1.00000000e+00]
 [ 2.69948100e-01  3.50712736e-01  4.60092776e-01  3.42712736e-01]]


In [None]:
path = trace((2,1), world, value)
print(path)

draw_world(world, path)

[(2, 1), (2, 2), (1, 2), (0, 2), (0, 3)]


[[ 0.  0.  4.  5.]
 [ 0. nan  3. -1.]
 [ 0.  1.  2.  0.]]


In [None]:
rewards2 = (world2 == 0).astype(float)*STEP_REWARD+(world2 == 1).astype(float)+(world2 == -1).astype(float)*(-1.0)
print(rewards2)
utility2 = np.zeros_like(world2)

value = markov(world2, rewards2)

[[-0.01 -0.01 -0.01 -0.01  1.  ]
 [ 0.   -0.01  0.    0.   -1.  ]
 [-0.01 -0.01 -0.01 -0.01 -0.01]
 [-0.01  0.    0.   -0.01 -1.  ]
 [-0.01 -0.01 -0.01 -0.01 -0.01]]


In [None]:
path = trace((4,4), world2, value)
print(path)

draw_world(world2, path)

[(4, 4), (4, 3), (3, 3), (2, 3), (2, 2), (2, 1), (1, 1), (0, 1), (0, 2), (0, 3), (0, 4)]


[[ 0.  8.  9. 10. 11.]
 [nan  7. nan nan -1.]
 [ 0.  6.  5.  4.  0.]
 [ 0. nan nan  3. -1.]
 [ 0.  0.  0.  2.  1.]]


# Exercise

Create your own adventure. Create a map at least of size 5x5 and set several walls and hazards. Also define a cell goal and transition parameters as well. Test it and analyze the results.

- Is it possible to set several goals?
- Could the goals have different values (utility)? What would happen?

- What information do we need to provide to the algorithm?
- Is it realistical to have this information?

- We have worked with a world where every state 's' and action 'a' have discrete values. Could we work with continuous states or actions?


## First map, with just one goal

In [None]:
map = np.array([[0,0,float('nan'),0,1],[float('nan'), 0, float('nan'), 0, -1],[0, 0, 0, 0, 0], [0, float('nan'), float('nan'), 0, -1], [0, 0, 0, 0, 0]])
print(map)


[[ 0.  0. nan  0.  1.]
 [nan  0. nan  0. -1.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0. -1.]
 [ 0.  0.  0.  0.  0.]]


## Set Parameters

In [None]:
# This constants will define our transtition model T
TM_FORWARD = 0.8      # Probability of the robot moving along the planned direction
TM_LEFT = 0.1         # Probability of moving to the left
TM_RIGHT = 0.1   # Probability of moving to the right

STEP_REWARD = -0.001  # Utility loss of moving

# Discount
GAMMA = 1.0           # Loss of utility of previous steps: GAMMA = 1 no utility is loss, GAMMA = 0 previous steps don't count

 ## Set rewards with parameters

In [None]:
rewardsmap = (map == 0).astype(float)*STEP_REWARD+(map == 1).astype(float)+(map == -1).astype(float)
print(rewardsmap)

[[-0.001 -0.001  0.    -0.001  1.   ]
 [ 0.    -0.001  0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001 -0.001]
 [-0.001  0.     0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001 -0.001]]


In [None]:
value = markov(map, rewardsmap)
print(value)

[[ 2.02123039e-01  2.53903814e-01 -1.00000000e+04  8.57500000e-01
   1.00000000e+00]
 [-1.00000000e+04  2.93364387e-01 -1.00000000e+04  5.85000000e-01
  -1.00000000e+00]
 [ 3.18765637e-01  3.67955486e-01  4.24523809e-01  5.31904762e-01
   2.24523809e-01]
 [ 2.54012510e-01 -1.00000000e+04 -1.00000000e+04  3.24523809e-01
  -1.00000000e+00]
 [ 2.20957157e-01  1.87471647e-01  2.35589559e-01  2.95736959e-01
   1.35589559e-01]]


In [None]:
path = trace((4,0), map, value)
print(path)

draw_world(map, path)

[(4, 0), (3, 0), (2, 0), (2, 1), (2, 2), (2, 3), (1, 3), (0, 3), (0, 4)]


[[ 0.  0. nan  8.  9.]
 [nan  0. nan  7. -1.]
 [ 3.  4.  5.  6.  0.]
 [ 2. nan nan  0. -1.]
 [ 1.  0.  0.  0.  0.]]


## 2.Map with more than one goal

In [None]:
map2 = np.array([[0,0,float('nan'),0,1],[float('nan'), 0, float('nan'), 0, -1],[0, 0, 0, 0, 0], [0, float('nan'), float('nan'), 0, -1], [0, 0, 0, 0, 1]])
print(map2)

[[ 0.  0. nan  0.  1.]
 [nan  0. nan  0. -1.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0. -1.]
 [ 0.  0.  0.  0.  1.]]


In [None]:
rewardsmap2 = (map2 == 0).astype(float)*STEP_REWARD+(map2 == 1).astype(float)+(map2 == -1).astype(float)
print(rewardsmap2)

[[-0.001 -0.001  0.    -0.001  1.   ]
 [ 0.    -0.001  0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001 -0.001]
 [-0.001  0.     0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001  1.   ]]


In [None]:
value2 = markov(map2, rewardsmap2)
print(value2)

[[ 2.02122965e-01  2.53903799e-01 -1.00000000e+04  8.57500000e-01
   1.00000000e+00]
 [-1.00000000e+04  2.93364378e-01 -1.00000000e+04  5.85000000e-01
  -1.00000000e+00]
 [ 3.38647721e-01  3.67955484e-01  4.24523808e-01  5.31904762e-01
   2.24523808e-01]
 [ 3.78565217e-01 -1.00000000e+04 -1.00000000e+04  5.85000000e-01
  -1.00000000e+00]
 [ 4.74456522e-01  5.47000000e-01  6.85000000e-01  8.57500000e-01
   1.00000000e+00]]


In [None]:
path2 = trace((4,0), map2, value2)
print(path2)

draw_world(map2, path2)

[(4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]


[[ 0.  0. nan  0.  1.]
 [nan  0. nan  0. -1.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0. -1.]
 [ 1.  2.  3.  4.  5.]]


In [None]:
path2 = trace((0,0), map2, value2)
print(path2)

draw_world(map2, path2)

[(0, 0), (0, 1), (1, 1), (2, 1), (2, 2), (2, 3), (1, 3), (0, 3), (0, 4)]


[[ 1.  2. nan  8.  9.]
 [nan  3. nan  7. -1.]
 [ 0.  4.  5.  6.  0.]
 [ 0. nan nan  0. -1.]
 [ 0.  0.  0.  0.  1.]]


## As we can see, it will always take the shortest path to the reward, depending on where it starts. Now we will try increasing the distant reward significantly to see which one it chooses.

## 3. Setting higher reward

In [None]:
map3 = np.array([[0,0,float('nan'),0,1],[float('nan'), 0, float('nan'), 0, -1],[0, 0, 0, 0, 0], [0, float('nan'), float('nan'), 0, -1], [0, 0, 0, 0, 2]])
print(map3)

[[ 0.  0. nan  0.  1.]
 [nan  0. nan  0. -1.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0. -1.]
 [ 0.  0.  0.  0.  2.]]


In [None]:
rewardsmap3 = (map3 == 0).astype(float)*STEP_REWARD+(map3 == 1 ).astype(float)+(map3 == 2 ).astype(float)+(map3 == -1).astype(float)
print(rewardsmap3)

[[-0.001 -0.001  0.    -0.001  1.   ]
 [ 0.    -0.001  0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001 -0.001]
 [-0.001  0.     0.    -0.001  1.   ]
 [-0.001 -0.001 -0.001 -0.001  1.   ]]


In [None]:
value3 = markov(map3, rewardsmap3)
print(value3)

[[ 6.95983719e-01  8.71229689e-01 -1.00000000e+04  1.17503256e+00
   1.00000000e+00]
 [-1.00000000e+04  1.00328915e+00 -1.00000000e+04  1.34504070e+00
  -1.00000000e+00]
 [ 1.11568777e+00  1.25536147e+00  1.44504070e+00  1.80755092e+00
   1.24504070e+00]
 [ 1.12398625e+00 -1.00000000e+04 -1.00000000e+04  1.92442847e+00
  -1.00000000e+00]
 [ 1.40623282e+00  1.61854278e+00  2.02442847e+00  2.53178562e+00
   2.92542847e+00]]


In [None]:
path3 = trace((4,0), map3, value3)
print(path3)

draw_world(map3, path3)

[(4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]


[[ 0.  0. nan  0.  1.]
 [nan  0. nan  0. -1.]
 [ 0.  0.  0.  0.  0.]
 [ 0. nan nan  0. -1.]
 [ 1.  2.  3.  4.  5.]]


In [None]:
path3 = trace((0,0), map3, value3)
print(path3)

draw_world(map3, path3)

[(0, 0), (0, 1), (1, 1), (2, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)]


[[ 1.  2. nan  0.  1.]
 [nan  3. nan  0. -1.]
 [ 0.  4.  5.  6.  0.]
 [ 0. nan nan  7. -1.]
 [ 0.  0.  0.  8.  9.]]


## As we can see, the second path has changed. By doubling the reward at the bottom, we can see that it now prefers the lower path, even though it previously chose the upper one. The reward influences the surrounding cells, and therefore, to follow the same path, the algorithm prefers the one with a higher reward.


## Question  3

## The algorithm relies on key data to operate within a Markov Decision Process (MDP). The state space (S) identifies all possible situations, the action space (A) specifies the agent's options, and the transition probabilities (P) indicate how actions affect states. The reward function (R) assigns values to state-action interactions, and the discount factor (γ) weighs future rewards.


## Question 4

## In controlled environments, providing accurate information is feasible, but in dynamic contexts, obtaining exact probabilities and rewards can be challenging. In real-world MDPs, approximations and assumptions are often used to simplify the problem.


## Question 5

## Although we have focused on discrete states and actions, the MDP can be adapted to continuous spaces. Techniques such as discretization can be applied to continuous states, and there are specific algorithms for continuous MDPs, extending the applicability of the framework to more diverse domains.