In [1]:
import numpy as np
from grid import Grid

g = Grid()
g.print()

-------------------------------
| 0,0 | 0,1 | 0,2 | 0,3 | 0,4 | 
-------------------------------
| 1,0 | 1,1 | 1,2 | 1,3 | 1,4 | 
-------------------------------
| 2,0 | 2,1 | 2,2 | 2,3 | 2,4 | 
-------------------------------
| 3,0 | 3,1 | 3,2 | 3,3 | 3,4 | 
-------------------------------
| 4,0 | 4,1 | 4,2 | 4,3 | 4,4 | 
-------------------------------


# Transitions Tests

In [2]:
pos = (0,0)
custom_grid = g.grid.copy()
custom_grid[pos] = 0.1
g.print(custom_grid)
print()

custom_grid = g.grid.copy()
s,r =  g.transition(pos, g.east)
custom_grid[s] = r+0.1
print(s)
g.print(custom_grid)

-------------------------------
| 0.1 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------

(0, 1)
-------------------------------
| 0.0 | 0.1 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------
| 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 
-------------------------------


  if display == 'index':


# Value Function - System of linear  functions

In [3]:

import numpy as np

pi = np.ones((g.size[0], g.size[1], len(g.ACTIONS)))
pi.fill(0.25)

def v(state=None, pi=None, gamma=0.9):
    A = -1 * np.eye(g.size[0] * g.size[1])
    b = np.zeros(g.size[0] * g.size[1])

    for i in range(g.size[0]):
        for j in range(g.size[1]):
            s = (i, j)  # for each state s
            index_s = np.ravel_multi_index(s, g.size)
            for a, action in enumerate(g.ACTIONS):
                s_, r = g.transition(s, action)
                index_s_ = np.ravel_multi_index(s_, g.size)
                pi_a_s = pi[s][a]
                A[index_s, index_s_] += pi_a_s * gamma
                b[index_s] -= pi_a_s * r
    v_matrix = np.round(np.linalg.solve(A, b).reshape(*g.size), decimals=2)
    if state is None:
        return v_matrix
    else:
        return v_matrix[state]

print(v(pi=pi))
print(v((0,1), pi=pi))

[[ 3.31  8.79  4.43  5.32  1.49]
 [ 1.52  2.99  2.25  1.91  0.55]
 [ 0.05  0.74  0.67  0.36 -0.4 ]
 [-0.97 -0.44 -0.35 -0.59 -1.18]
 [-1.86 -1.35 -1.23 -1.42 -1.98]]
8.79


# Policy Evaluation - System of functions

In [4]:
pi = np.ones((g.size[0], g.size[1], len(g.ACTIONS)))
pi.fill(0.25)

def policy_eval(pi, gamma=0.9):
    A = -1 * np.eye(g.size[0] * g.size[1])
    b = np.zeros(g.size[0] * g.size[1])

    for i in range(g.size[0]):
        for j in range(g.size[1]):
            s = (i, j)  # for each state s
            index_s = np.ravel_multi_index(s, g.size)
            for a, action in enumerate(g.ACTIONS):
                s_, r = g.transition(s, action)
                index_s_ = np.ravel_multi_index(s_, g.size)
                pi_a_s = pi[s][a]
                A[index_s, index_s_] += pi_a_s * gamma
                b[index_s] -= pi_a_s * r
    return np.round(np.linalg.solve(A, b).reshape(*g.size), decimals=2)

pi = np.ones((g.size[0], g.size[1], len(g.ACTIONS)))
pi.fill(0.25)

V=policy_eval(pi)

print(V)

[[ 3.31  8.79  4.43  5.32  1.49]
 [ 1.52  2.99  2.25  1.91  0.55]
 [ 0.05  0.74  0.67  0.36 -0.4 ]
 [-0.97 -0.44 -0.35 -0.59 -1.18]
 [-1.86 -1.35 -1.23 -1.42 -1.98]]


# Policy Evaluation - Iteractive Police Evaluation 

In [5]:


# bellman_expectation = lambda V, pi_s, s, p_s=1, gamma=0.9: sum(list(map(lambda a, s_, r: pi[s][a]*p_s*(r + gamma*V[s_]), *list(zip(*[(a, *g.transition(s, action)) for a, action in enumerate(g.ACTIONS)])))))              
def bellman_expectation(V, pi_s, s, p_s=1, gamma=0.9):
    value = 0
    for a, action in enumerate(g.ACTIONS):
        s_, r = g.transition(s, action)
        value += pi_s[a] * p_s*(r + gamma*V[s_])
    return value

def policy_eval(V=None, pi=None, theta=1e-4, gamma=0.9, inplace=True):
    newV = V if inplace else np.zeros_like(V)
    while True:
        delta=0
        for i in range(g.size[0]):
            for j in range(g.size[1]):
                s = (i, j) 
                v = newV[s]
                newV[s] = bellman_expectation(newV, pi[s], s)
                delta = max(delta, np.abs(v-newV[s]))
        if delta < theta: 
            return np.round(newV, decimals=2, out=newV)

V = np.zeros(g.size)
pi = np.ones((g.size[0], g.size[1], len(g.ACTIONS)))
pi.fill(0.25)
policy_eval(V, pi)

print(V)

[[ 3.31  8.79  4.43  5.32  1.49]
 [ 1.52  2.99  2.25  1.91  0.55]
 [ 0.05  0.74  0.67  0.36 -0.4 ]
 [-0.97 -0.44 -0.35 -0.59 -1.18]
 [-1.86 -1.34 -1.23 -1.42 -1.97]]


# Policy Iteration

In [6]:


def policy_eval(V=None, pi=None, theta=1e-4, gamma=0.9, inplace=True):
    newV = V if inplace else np.zeros_like(V)
    while True:
        delta=0
        for i in range(g.size[0]):
            for j in range(g.size[1]):
                s = (i, j) 
                v = newV[s]
                s_, r = g.transition(s, g.ACTIONS[pi[s]])
                newV[s] = r + gamma*V[s_]
                delta = max(delta, np.abs(v-newV[s]))
        if delta < theta: 
            return np.round(newV, decimals=2, out=newV)

V = np.zeros(g.size)
pi = np.zeros(g.size, dtype=int)
policy_eval(V, pi)

print(V)

[[-10.    24.42 -10.    18.45 -10.  ]
 [ -9.    21.98  -9.    16.61  -9.  ]
 [ -8.1   19.78  -8.1   14.94  -8.1 ]
 [ -7.29  17.8   -7.29  13.45  -7.29]
 [ -6.56  16.02  -6.56  12.11  -6.56]]


In [7]:
def policy_iteration(V, pi):
    bellman_expectation_for_action = lambda V, s_, r, p_s=1, gamma=0.9: p_s*(r + gamma*V[s_])              

    policy_stable = False
    while not policy_stable:
        policy_eval(V, pi)
        for i in range(g.size[0]):
            for j in range(g.size[1]):
                s = (i,j)
                old_action = pi[s]
                acts = [bellman_expectation_for_action(V, *g.transition(s, a)) for a in g.ACTIONS]
                pi[s] = np.argmax(acts)
                if old_action == pi[s]:
                    policy_stable = True
    return V, pi

V = np.zeros(g.size)
pi = np.zeros(g.size, dtype=int)
policy_iteration(V, pi)
print(V)
pi

[[-10.    24.42 -10.    18.45 -10.  ]
 [ -9.    21.98  -9.    16.61  -9.  ]
 [ -8.1   19.78  -8.1   14.94  -8.1 ]
 [ -7.29  17.8   -7.29  13.45  -7.29]
 [ -6.56  16.02  -6.56  12.11  -6.56]]


array([[2, 0, 3, 0, 3],
       [2, 0, 3, 0, 3],
       [2, 0, 3, 0, 3],
       [2, 0, 3, 0, 3],
       [2, 0, 3, 0, 3]])

In [8]:
def print_policy(policy):
    maps = {g.west:"\u2190", g.north:"\u2191", g.east:"\u2192", g.south:"\u2193"}
    row = [[maps[g.ACTIONS[i]] for i in line] for line in policy]
    render = [' '.join(line) for line in row]
    print('\n'.join(render))
    
print_policy(pi)

→ ↑ ← ↑ ←
→ ↑ ← ↑ ←
→ ↑ ← ↑ ←
→ ↑ ← ↑ ←
→ ↑ ← ↑ ←


In [9]:
V = np.zeros(g.size)
pi = np.zeros(g.size, dtype=int)

for _ in range(100):
    policy_iteration(V, pi)
    # print(V)
    # print(pi)
    # print_policy(pi)

print(V)
print(pi)
print_policy(pi)

[[21.98 24.42 21.98 19.42 17.48]
 [19.78 21.98 19.78 17.8  16.02]
 [17.8  19.78 17.8  16.02 14.42]
 [16.02 17.8  16.02 14.42 12.98]
 [14.42 16.02 14.42 12.98 11.68]]
[[2 0 3 0 3]
 [0 0 0 3 3]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
→ ↑ ← ↑ ←
↑ ↑ ↑ ← ←
↑ ↑ ↑ ↑ ↑
↑ ↑ ↑ ↑ ↑
↑ ↑ ↑ ↑ ↑
