## Reinforcement Learning with Value Iteration

These are the same maps from the A\* search notebook, but the "physics" of the world have changed. In the previous notebook, the world was deterministic. When the agent moved "south", it went "south". When it moved "east", it went "east". Now, the agent only succeeds in going where it wants to go *sometimes*. There is a probability distribution over the possible states so that when the agent moves "south", there is a small probability that it will go "east", "north", or "west" instead and have to move from there.

There are a variety of ways to handle this problem. For example, with A\* search, if the agent finds itself off the planned path, you can simply compute a new plan from wherever the agent ended up. Although this sounds like a really bad idea, it has actually been shown to work really well in video games that use formal planning algorithms. When these algorithms were first designed, this was unthinkable. Thank you, Moore's Law!

Another approach is to use Reinforcement Learning, which covers problems where there is some kind of general uncertainty in the actions. We're going to model that uncertainty a bit unrealistically here to demonstrate how the algorithm works.

As far as RL is concerned, there are a variety of options: model-based and model-free, Value Iteration, Q-Learning and SARSA. This notebook demonstrates Value Iteration.

(above description by S. Butcher, 2022)

## The World Representation

The symbols that form the grid have a special meaning as they specify the type of the terrain and the cost to enter a grid cell with that type of terrain:

```
token   terrain    cost 
.       plains     1
*       forest     3
^       hills      5
~       swamp      7
x       mountains  impassable
```

When you go from a plains node to a forest node it costs 3. When you go from a forest node to a plains node, it costs 1. You can think of the grid as a big graph. Each grid cell (terrain symbol) is a node and there are edges to the north, south, east and west (except at the edges).

There are quite a few differences between A\* Search and Reinforcement Learning but one of the most salient is that A\* Search returns a plan of N steps that gets us from A to Z, for example, A->C->E->G.... Reinforcement Learning, on the other hand, returns  a *policy* that tells us the best thing to do in **every state.**

For example, the policy might say that the best thing to do in A is go to C. However, we might find ourselves in D instead. The policy covers this possibility: it might say D->E. Trying this action might land us in C, and the policy will say C->E, and so on. At least with offline learning, everything will be learned in advance (in online learning, you can only learn by doing and so you may act according to a known but suboptimal policy).

Nevertheless, if you were asked for a "best case" plan from (0, 0) to (n-1, n-1), you can (and will) read it off the policy because there is a best action for every state.

We have the same costs as before. Note that we've negated them this time because RL maximizes reward, so costs are expressed as negative rewards:

In [1]:
costs = { '.': -1, '*': -3, '^': -5, '~': -7}
costs

{'.': -1, '*': -3, '^': -5, '~': -7}

and a list of offsets for `cardinal_moves`. You'll need to work this into your **actions**, A, parameter.

In [2]:
cardinal_moves = [(0,-1), (1,0), (0,1), (-1,0)]

For Value Iteration, we require knowledge of the *transition* function, as a probability distribution.

The transition function, T, for this problem is 0.70 for the desired direction, and 0.10 each for the other possible directions. That is, if the agent selects "north" then 70% of the time, it will go "north" but 10% of the time it will go "east", 10% of the time it will go "west", and 10% of the time it will go "south". If the agent is at the edge of the map, it simply bounces back to the current state.

(above descriptions by S. Butcher, 2022)
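
To make the transition function concrete, here is a minimal sketch that builds the distribution for one selected action. The names `Move` and `transition_distribution` are hypothetical and are not used by the cells below; the default probabilities are the 0.70 and 0.10 from the description above, and the offsets are assumed to be applied as (row, column), as the helper functions in this notebook do.

```python
from typing import Dict, List, Tuple

Move = Tuple[int, int]  # hypothetical alias for a (row offset, column offset) pair


def transition_distribution(action: Move, actions: List[Move],
                            prob_desired: float = 0.70,
                            prob_other: float = 0.10) -> Dict[Move, float]:
    """Map every move the agent might actually make to its probability."""
    return {move: (prob_desired if move == action else prob_other)
            for move in actions}


moves = [(0, -1), (1, 0), (0, 1), (-1, 0)]
# the agent selects (1, 0); the other three moves each get prob_other
dist = transition_distribution((1, 0), moves)
print(dist)
```

With four cardinal moves the probabilities sum to 0.70 + 3 × 0.10 = 1.0, which is what makes this a valid distribution.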

The following cells implement the value iteration algorithm and return a policy describing what the agent should do in **every** board state. 

## Helpers

In [3]:
import math
from pprint import pprint
from typing import Dict, Tuple, List
from copy import deepcopy

In [4]:
# function provided by S. Butcher, 2022
def read_world(filename):
    result = []
    with open(filename) as f:
        for line in f:
            stripped = line.strip()
            # skip blank lines so they do not become empty rows
            if stripped:
                result.append(list(stripped))
    return result

<a id="get_rewards"></a>
## get_reward

*The get_reward function determines the reward or cost of taking an action at a specific state. This function returns the reward if the action results in the goal state or the cost of moving to that terrain for a non-terminal action.* **Used by**: [value_iteration](#value_iteration)

* **world** List[List[str]]: the game world  
* **current_position** Tuple[int]: the row and column of the current position
* **action** Tuple[int]: the current action provided as an offset to the current position
* **goal** Tuple[int]: the goal state
* **costs** Dict[str, int]: the costs for different terrains in the game world
* **reward** int: the reward for reaching the goal state

**returns** int

In [5]:
def get_reward(world: List[List[str]], current_position: Tuple[int], action: Tuple[int], goal: Tuple[int], costs: Dict[str, int], reward: int) -> int:
    row, col = current_position
    # the cell the agent intends to enter
    position = (row + action[0], col + action[1])
    if position == goal:
        return reward
    # otherwise, pay the (negative) cost of entering that terrain
    world_state = world[position[0]][position[1]]
    return costs[world_state]

In [6]:
#assertions/ unit tests
test_world = [
    ["*", "*", "*"],
    ["~", "~", "."],
    [".", ".", "."]
]

assert get_reward(test_world, (0, 0), (0, 1), (2, 2), costs, 10) == -3
# goal state
assert get_reward(test_world, (2, 1), (0, 1), (2, 2), costs, 10) == 10
assert get_reward(test_world, (0, 0), (1, 0), (2, 2), costs, 10) == -7

<a id="get_valid_actions"></a>
## get_valid_actions

*The get_valid_actions function takes a list of possible actions, the current position (as row and col parameters), and the world. The function applies each action to the current state and returns a list of actions that do not result in an out-of-bounds or invalid position.* **Used by**: [value_iteration](#value_iteration)

* **actions** List[Tuple[int]]: the possible actions as a list of tuples containing the position offsets for the action (ex. (-1, 0) is the up action)
* **current_position** Tuple[int]: the row and column of the current state
* **world** List[List[str]]: the game world  

**returns** List[Tuple[int]]

In [7]:
def get_valid_actions(actions: List[Tuple[int]], current_position: Tuple[int], world: List[List[str]]) -> List[Tuple[int]]:
    row, col = current_position
    valid_actions = []
    for action in actions:
        new_row, new_col = row + action[0], col + action[1]
        # keep the action only if it stays on the map and does not enter a mountain
        if 0 <= new_row < len(world) and 0 <= new_col < len(world[0]) and world[new_row][new_col] != 'x':
            valid_actions.append(action)
    return valid_actions

In [8]:
#assertions/ unit tests
test_world = [
    ["*", "*", "*"],
    ["~", "~", "."],
    [".", ".", "."]
]

assert get_valid_actions(cardinal_moves, (0,0), test_world) == [(1,0), (0, 1)]
assert get_valid_actions(cardinal_moves, (1,1), test_world) == cardinal_moves
assert get_valid_actions(cardinal_moves, (2,0), test_world) == [(0,1), (-1, 0)]

<a id="get_max_delta"></a>
## get_max_delta

*The get_max_delta function takes the values produced by the current iteration of the value iteration algorithm and compares them (state by state) to the values produced by the previous iteration. The function returns the maximum change (of any board state) between the previous and current iterations.* **Used by**: [value_iteration](#value_iteration)

* **v** List[List[float]]: the values produced by the current iteration of the value iteration algorithm
* **v_previous** List[List[float]]: the values produced by the previous iteration of the value iteration algorithm

**returns** float

In [9]:
def get_max_delta(v: List[List[float]], v_previous: List[List[float]]) -> float:
    differences = []
    for sublist1, sublist2 in zip(v, v_previous):
        for num1, num2 in zip(sublist1, sublist2):
            differences.append(abs(num1 - num2))
    return max(differences)

In [10]:
#assertions/ unit tests
prev_test_values_1 = [[2, 3],[4, 5]]
test_values_1 = [[1, -2], [1, 1]]

assert get_max_delta(test_values_1, prev_test_values_1) == 5

prev_test_values_2 = [[2, 3],[4, 5]]
test_values_2 = [[2, 3], [4, 5]]

assert get_max_delta(test_values_2, prev_test_values_2) == 0

prev_test_values_3 = [[2, 3],[4, 5]]
test_values_3 = [[3, 3], [4, 5]]

assert get_max_delta(test_values_3, prev_test_values_3) == 1

<a id="apply_transition_model"></a>
## apply_transition_model

*The apply_transition_model function calculates the value of an action in the current state. This transition model is for the stochastic value iteration algorithm, and it applies the prob_desired and prob_other parameters in the value calculation to account for the agent making an unexpected move in the current state. The algorithm calculates the value for the expected action as (prob_desired * the value of the resulting state in v_previous). The algorithm also calculates the summation of (prob_other * the value of the resulting state in v_previous) for all actions other than the current action. Not all actions are valid moves. Therefore, this function makes a static calculation of (prob_other * -10) for invalid moves from the current state, such as going out-of-bounds or moving to an invalid board state. The function adds these static calculations to the summation of unexpected actions. The function returns gamma (the discount factor) multiplied by (expected + unexpected).* **Used by**: [value_iteration](#value_iteration)

* **gamma** float: the discount factor
* **current_position** Tuple[int]: the row and column of the current state
* **v_previous** List[List[float]]: the values produced by the previous iteration of the value iteration algorithm
* **prob_desired** float: the probability of the agent taking the current action
* **prob_other** float: the probability of the agent taking an action that is not the current action
* **action** Tuple[int]: the current action
* **valid_actions** List[Tuple[int]]: a list of valid actions from the current state
* **actions** List[Tuple[int]]: a list of all possible actions

**returns** float

In [11]:
def apply_transition_model(gamma: float, current_position: Tuple[int], v_previous: List[List[float]], prob_desired: float, prob_other: float, action: Tuple[int], valid_actions: List[Tuple[int]], actions: List[Tuple[int]]) -> float:
    row, col = current_position
    # value of the state reached when the desired action succeeds
    expected = prob_desired * v_previous[row+action[0]][col+action[1]]
    # values of the states reached when the agent slips to a different valid action
    unexpected = sum([prob_other * v_previous[row+curr_action[0]][col+curr_action[1]] for curr_action in valid_actions if curr_action != action])
    # each invalid action (off the map or into a mountain) adds a static penalty of -10,
    # which is worse than the cost of entering any terrain
    for i in range(len(actions) - len(valid_actions)):
        unexpected += prob_other * -10
    return gamma * (expected + unexpected)

In [12]:
#assertions/ unit tests
v_previous_1 = [[5.00, 0.00, 0.00, 0.00, 0.00, 0.00, 5.00]]

assert apply_transition_model(0.9, (0, 1), v_previous_1, 0.9, 0.1, (0, -1), [(0, -1), (0, 1)], [(0, -1), (0, 1)]) == 4.05

v_previous_2 = [[5.00, 4.05, 0.00, 0.00, 0.00, 4.05, 5.00]]

assert apply_transition_model(0.9, (0, 2), v_previous_2, 0.9, 0.1, (0, -1), [(0, -1), (0, 1)], [(0, -1), (0, 1)]) == 3.2805

v_previous_3 = [[5.00, 4.05, 3.2805, 0.00, 3.2805, 4.05, 5.00]]

assert round(apply_transition_model(0.9, (0, 3), v_previous_3, 0.9, 0.1, (0, -1), [(0, -1), (0, 1)], [(0, -1), (0, 1)]), 2) == 2.95
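
The assertions above can be checked by hand. In each case gamma is 0.9, prob_desired is 0.9, prob_other is 0.1, and both actions are valid, so no -10 penalty applies. The first and third assertions work out as follows (the symbols $\gamma$, $p_d$ and $p_o$ are shorthand introduced here for the three parameters):

$$
\gamma \,(p_d \cdot 5.00 + p_o \cdot 0.00) = 0.9 \cdot (4.5 + 0) = 4.05
$$

$$
\gamma \,(p_d \cdot 3.2805 + p_o \cdot 3.2805) = 0.9 \cdot 3.2805 = 2.95245 \approx 2.95
$$

In the third case both neighbors hold the same value, so the two probabilities simply add to 1 and only the discount remains.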

<a id="value_iteration"></a>
## value_iteration

*The value_iteration function applies the stochastic value iteration algorithm to return a policy reflecting the best move for every board state, given a goal state. The value iteration algorithm finds the optimal path to a goal when run until convergence. The algorithm terminates when the maximum change between any cell's previous and current values is less than the epsilon parameter.*

* **world** List[List[str]]: the current world
* **costs** Dict[str, int]: the costs of non-terminal actions
* **goal** Tuple[int]: the coordinates of the goal state
* **rewards** int: the reward or payout for reaching the goal state
* **actions** List[Tuple[int]]: a list of all possible actions
* **gamma** float: the discount factor used in value calculations
* **epsilon** float: the rate of change that determines convergence. The algorithm returns a policy when the maximum change between the previous values and the current values is less than epsilon. 

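**returns** Dict[Tuple[int], Tuple[int]]: the policy, mapping each state to its best action (`None` for impassable cells, `(0, 0)` at the goal)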
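
As a reading aid, the update that the cell below performs on each sweep can be written out as follows. This is a sketch of what the code computes, not a general statement of value iteration; the notation ($s_a$ for the cell reached by applying offset $a$ to state $s$, $A_{\text{valid}}(s)$ for the result of get_valid_actions, $R(s, a)$ for the result of get_reward, $p_d = 0.70$ and $p_o = 0.10$) is introduced here for illustration.

$$
Q_k(s, a) = R(s, a) + \gamma \Big[\, p_d \, V_{k-1}(s_a) + \sum_{\substack{a' \in A_{\text{valid}}(s) \\ a' \neq a}} p_o \, V_{k-1}(s_{a'}) + \sum_{a' \notin A_{\text{valid}}(s)} p_o \cdot (-10) \Big]
$$

$$
V_k(s) = \max_{a \in A_{\text{valid}}(s)} Q_k(s, a), \qquad \pi_k(s) = \arg\max_{a \in A_{\text{valid}}(s)} Q_k(s, a)
$$

The sweeps stop once $\max_s |V_k(s) - V_{k-1}(s)| < \epsilon$, which is the check made with get_max_delta.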

---

In [13]:
def value_iteration(world, costs, goal, rewards, actions, gamma, epsilon):
    v_previous = [[0 for i in world[0]] for row in world]
    iterations = 0
    while True:
        policy = {}
        v = [[0 for i in world[0]] for row in world]
        for row in range(len(world)):
            for col in range(len(world[0])):
                q = {}
                if world[row][col] == "x":
                    policy[(row, col)] = None
                    continue
                if (row, col) == goal:
                    policy[(row, col)] = (0, 0)
                    continue
                    
                valid_actions = get_valid_actions(actions, (row, col), world)
                for action in valid_actions:
                    action_payout = get_reward(world, (row, col), action, goal, costs, rewards)
                    q[action] = action_payout + apply_transition_model(gamma, (row, col), v_previous, 0.70, 0.10, action, valid_actions, actions)
                policy[(row, col)] = max(q, key=lambda k: q[k])
                v[row][col] = q[policy[(row, col)]]
        max_delta = get_max_delta(v, v_previous)
        if max_delta < epsilon:
            print(f"Iterations: {iterations}, Maximum Change: {max_delta}") 
            return policy
        iterations += 1
        v_previous = deepcopy(v)

<a id="pretty_print_policy"></a>
## pretty_print_policy

*The pretty_print_policy function takes the policy returned by the value iteration algorithm and prints the action the agent should take for every board state.*

* **cols** int: the number of columns in the world
* **rows** int: the number of rows in the world
* **policy** Dict[Tuple[int], Tuple[int]]: a policy that maps every board state to the best action for that state
* **goal** Tuple[int]: the goal state

**returns** None

In [14]:
def pretty_print_policy(cols: int, rows: int, policy: Dict[Tuple[int], Tuple[int]], goal: Tuple[int]) -> None:
    symbols = {(0, 1): '⏩', (0, -1): '⏪', (-1, 0): '⏫', (1, 0): '⏬', (0, 0): '🎁'}
    output = [[0 for col in range(cols)] for row in range(rows)]
    for position, action in policy.items():
        row, col = position[0], position[1]
        output[row][col] = '❌' if action is None else symbols[action]
    # print the map by unpacking the list at each row
    for row in output:
        print(*row)    

## Value Iteration

### Small World

In [15]:
small_world = read_world( "Datasets/small.txt")

In [16]:
goal = (len(small_world)-1, len(small_world[0])-1)
gamma = 0.9
reward = 10
small_policy = value_iteration(small_world, costs, goal, reward, cardinal_moves, gamma, 0.01)
assert len(small_policy) == len(small_world) * len(small_world[0])

Iterations: 23, Maximum Change: 0.007250134098737426


In [17]:
cols = len(small_world[0])
rows = len(small_world)
pretty_print_policy(cols, rows, small_policy, goal)

⏬ ⏩ ⏩ ⏩ ⏩ ⏬
⏬ ⏪ ⏫ ⏫ ⏩ ⏬
⏬ ⏪ ⏬ ⏩ ⏩ ⏬
⏬ ⏪ ⏬ ❌ ⏩ ⏬
⏬ ⏬ ⏬ ⏬ ⏬ ⏬
⏩ ⏩ ⏩ ⏩ ⏬ ⏬
⏩ ⏩ ⏩ ⏩ ⏩ 🎁
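
As noted earlier, a "best case" plan can be read off a policy by following the best action from each state and assuming every move succeeds. The following is a minimal sketch of that idea. The function `best_case_plan`, the alias `Cell`, and the 2x2 `toy_policy` are hypothetical and exist only for illustration; the policy format (cell to offset, `None` for impassable cells, `(0, 0)` at the goal) matches what value_iteration returns.

```python
from typing import Dict, List, Optional, Tuple

Cell = Tuple[int, int]  # hypothetical alias for a (row, col) pair


def best_case_plan(policy: Dict[Cell, Optional[Cell]], start: Cell, goal: Cell) -> List[Cell]:
    """Follow the policy's action from each cell, assuming every move succeeds."""
    path = [start]
    visited = {start}
    current = start
    while current != goal:
        action = policy.get(current)
        if action is None or action == (0, 0):
            raise ValueError(f"no usable action at {current}")
        current = (current[0] + action[0], current[1] + action[1])
        # guard against policies that send the agent around in a cycle
        if current in visited:
            raise ValueError(f"policy loops back to {current}")
        visited.add(current)
        path.append(current)
    return path


# hypothetical 2x2 policy: go down from the top row, then right to the goal
toy_policy = {(0, 0): (1, 0), (0, 1): (1, 0), (1, 0): (0, 1), (1, 1): (0, 0)}
print(best_case_plan(toy_policy, (0, 0), (1, 1)))
# → [(0, 0), (1, 0), (1, 1)]
```

Because the world is stochastic, this is only the path the agent takes if no move slips; the policy itself still covers every cell the agent might land in.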


### Large World

In [18]:
large_world = read_world("Datasets/large.txt")

In [19]:
goal = (len(large_world)-1, len(large_world[0])-1) # lower-right corner
gamma = 0.9
reward = 30000
large_policy = value_iteration(large_world, costs, goal, reward, cardinal_moves, gamma, 0.01)

Iterations: 100, Maximum Change: 0.00822413675506084


In [20]:
cols = len(large_world[0])
rows = len(large_world)

pretty_print_policy( cols, rows, large_policy, goal)

⏬ ⏬ ⏬ ⏬ ⏪ ⏪ ⏬ ⏩ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏪ ⏪ ⏪ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬
⏬ ⏬ ⏬ ⏪ ⏪ ⏩ ⏪ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏪ ❌ ❌ ❌ ❌ ❌ ❌ ❌ ⏬ ⏬
⏬ ⏬ ⏬ ⏪ ❌ ❌ ⏩ ⏩ ⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ❌ ❌ ❌ ⏬ ⏬ ⏪ ❌ ❌ ⏬ ⏬
⏬ ⏬ ⏪ ⏪ ⏪ ❌ ❌ ❌ ⏬ ⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏪ ❌ ❌ ⏬ ⏬
⏬ ⏪ ⏪ ⏪ ⏪ ❌ ❌ ⏬ ⏬ ⏬ ⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏬ ❌ ❌ ❌ ⏬ ⏬
⏬ ⏪ ⏫ ⏪ ❌ ❌ ⏬ ⏬ ⏬ ⏬ ⏪ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ❌ ⏬ ⏬ ⏪
⏬ ⏬ ⏬ ❌ ❌ ⏩ ⏩ ⏬ ⏬ ⏬ ⏪ ⏪ ❌ ❌ ❌ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏬ ⏪
⏩ ⏬ ⏬ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏪ ⏪ ⏪ ⏪ ⏪ ❌ ❌ ❌ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏪
⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏪ ⏪ ⏪ ⏪ ⏪ ⏪ ❌ ❌ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏪
⏩ ⏩ ⏫ ⏩ ⏩ ⏩ ⏩ ⏬ ⏪ ⏪ ⏪ ❌ ❌ ❌ ❌ ⏩ ⏩ ⏬ ⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏪
⏩ ⏫ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏪ ❌ ❌ ❌ ⏬ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ❌ ❌ ❌ ⏩ ⏬ ⏬ ⏪
⏫ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ❌ ❌ ⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ❌ ❌ ⏬ ⏬ ⏬ ⏪
⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ❌ ❌ ⏬ ⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ❌ ⏩ ⏬ ⏬ ⏬ ⏪
⏬ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏪ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏪
⏩ ⏩ ⏩ ⏫ ❌ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ⏪ ⏫ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏫ ⏫ ❌ ⏩ ⏬ ⏬ ⏪
⏩ ⏫ ⏫ ❌ ❌ ❌ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ❌ ❌ ❌ ⏫ ⏩ ⏫ ⏫ ⏫ ⏫ ❌ ❌ ⏬ ⏬ ⏬ ⏪
⏩ ⏫ ❌ ❌ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ❌ ❌ ❌ ⏫ ⏫ ❌ ❌ ❌ ⏬ ⏬ ⏬ ⏬ ⏪
⏩ ⏬ ⏬ ❌ ❌ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏬ ⏬ ⏬ ⏬ ❌ ❌ ❌ ❌ ⏬ ⏬ ⏬ ⏬ ⏬ ⏬ ⏪
⏩ ⏬ ⏬ ❌ ❌ ❌ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ ⏩ 