In [9]:
import numpy as np

def policy_evaluation(S, A, SR, P, pi, theta=1e-6, gamma=1.0):
    V = np.zeros((GRID_SIZE+1, GRID_SIZE), dtype=float)

    while True:
        delta = 0.0
        for s in S:
            v = V[s]
            V[s] = sum(pi(s, a) * sum(P(s, a, s_prime, r) * (r + gamma * V[s_prime]) for s_prime, r in SR(s, a)) for a in A)
            delta = max(delta, abs(v - V[s]))

        if delta < theta: break

    return V


def pi(s, a):
    return .25

def get_next_state_and_reward(s, a):
    # New state's transitions: left->12, up->13, right->14, down->15 (itself)
    if s == (4, 1):
        if a == (0, -1):  # left
            return (3, 0), -1
        elif a == (-1, 0):  # up
            return (3, 1), -1
        elif a == (0, 1):  # right
            return (3, 2), -1
        elif a == (1, 0):  # down
            return (4, 1), -1

    if any((s[0] == t[0] and s[1] == t[1]) for t in TERMINAL_STATES):
        return s, 0
    
    next_state = (s[0] + a[0], s[1] + a[1])
    if next_state[0] < 0 or next_state[0] >= GRID_SIZE or next_state[1] < 0 or next_state[1] >= GRID_SIZE:
        next_state = s
    
    return next_state, -1

def transition_prob(s, a, s_prime, r):
    actual_next_state, actual_reward = get_next_state_and_reward(s, a)
    return 1.0 if (s_prime == actual_next_state and r == actual_reward) else 0.0

def SR(s, a):
    next_state, reward = get_next_state_and_reward(s, a)
    return [(next_state, reward)]

GRID_SIZE = 4
TERMINAL_STATES = [(0, 0), (3, 3)]
A = [
    (0, 1),
    (0, -1),
    (1, 0),
    (-1, 0)
]
S = [
    (i, j)
    for i in range(GRID_SIZE)
    for j in range(GRID_SIZE)
]
S.append((4, 1))


V = policy_evaluation(S, A, SR, P=transition_prob, pi=pi, theta=1e-6, gamma=1.0).round(2)
print(V)

[[  0. -14. -20. -22.]
 [-14. -18. -20. -20.]
 [-20. -20. -18. -14.]
 [-22. -20. -14.   0.]
 [  0. -20.   0.   0.]]


In [10]:
import numpy as np

def policy_evaluation(S, A, SR, P, pi, theta=1e-6, gamma=1.0):
    V = np.zeros((GRID_SIZE+1, GRID_SIZE), dtype=float)

    while True:
        delta = 0.0
        for s in S:
            v = V[s]
            V[s] = sum(pi(s, a) * sum(P(s, a, s_prime, r) * (r + gamma * V[s_prime]) for s_prime, r in SR(s, a)) for a in A)
            delta = max(delta, abs(v - V[s]))

        if delta < theta: break

    return V


def pi(s, a):
    return .25

def get_next_state_and_reward(s, a):
    if any((s[0] == t[0] and s[1] == t[1]) for t in TERMINAL_STATES):
        return s, 0

    # New state's transitions: left->12, up->13, right->14, down->15 (itself)
    if s == (4, 1):
        if a == (0, -1):  # left
            return (3, 0), -1
        elif a == (-1, 0):  # up
            return (3, 1), -1
        elif a == (0, 1):  # right
            return (3, 2), -1
        elif a == (1, 0):  # down
            return (4, 1), -1

    next_state = (s[0] + a[0], s[1] + a[1])
    # Allow transitions into the new state from original states (e.g., 13 -> down -> 15)
    if next_state not in S:
        next_state = s

    return next_state, -1

def transition_prob(s, a, s_prime, r):
    actual_next_state, actual_reward = get_next_state_and_reward(s, a)
    return 1.0 if (s_prime == actual_next_state and r == actual_reward) else 0.0

def SR(s, a):
    next_state, reward = get_next_state_and_reward(s, a)
    return [(next_state, reward)]

GRID_SIZE = 4
TERMINAL_STATES = [(0, 0), (3, 3)]
A = [
    (0, 1),
    (0, -1),
    (1, 0),
    (-1, 0)
]
S = [
    (i, j)
    for i in range(GRID_SIZE)
    for j in range(GRID_SIZE)
]
S.append((4, 1))


V = policy_evaluation(S, A, SR, P=transition_prob, pi=pi, theta=1e-6, gamma=1.0).round(2)
print(V)

[[  0. -14. -20. -22.]
 [-14. -18. -20. -20.]
 [-20. -20. -18. -14.]
 [-22. -20. -14.   0.]
 [  0. -20.   0.   0.]]
