In [31]:
import numpy as np

# Side length of the square grid world
grid_size = 5
# State-value table, plus a scratch buffer that receives each sweep's results
V = np.zeros((grid_size, grid_size))
V_new = np.zeros_like(V)
# Reward attached to each cell; every cell is 0 unless set below
R = np.zeros((grid_size, grid_size))
# Terminal state
R[3, 2] = 1
# Forbidden states: (1,1) (1,2) (2,2) (3,1) (3,3) (4,1), given as row / column index lists
R[[1, 1, 2, 3, 3, 4], [1, 2, 2, 1, 3, 1]] = -1
# Discount factor
gamma = 0.9

# Define a deterministic policy
def policy(state):
    """Return the fixed action name chosen in ``state``.

    Args:
        state: ``(x, y)`` pair; ``x`` is the first grid index and ``y`` the second.

    Returns:
        One of ``'up'``, ``'right'``, ``'down'``, ``'left'``, ``'stop'``, or
        ``None`` when no rule below matches the state.
    """
    row, col = state

    # Border rules are tested first, in this order.
    if col == 0 and row != 0:
        return 'up'
    if row == 0 and col != 4:
        return 'right'
    if col == 4 and row != 4:
        return 'down'

    # Remaining cells are listed explicitly; the first matching rule wins.
    explicit_rules = (
        ('up', ((1, 1), (4, 2))),
        ('right', ((1, 2), (1, 3), (2, 3), (3, 1), (4, 1))),
        ('left', ((2, 1), (3, 3), (4, 3), (4, 4))),
        ('down', ((2, 2),)),
        ('stop', ((3, 2),)),
    )
    for chosen_action, cells in explicit_rules:
        if state in cells:
            return chosen_action
    return None

def get_next_state(state, action):
    """Return the cell reached from ``state`` by taking ``action``.

    A move that would leave the grid is clamped, so the agent stays on the
    border. The grid extent is read from the module-level ``grid_size``.

    Args:
        state: ``(x, y)`` pair of grid indices.
        action: ``'up'``, ``'down'``, ``'left'``, ``'right'`` or ``'stop'``.

    Returns:
        The ``(x, y)`` tuple of the resulting cell, or ``None`` for any other
        action value.
    """
    row, col = state
    if action == 'stop':
        return (row, col)
    if action == 'up':
        # 'up' decreases the first index.
        return (max(row - 1, 0), col)
    if action == 'left':
        return (row, max(col - 1, 0))
    if action == 'down':
        return (min(row + 1, grid_size - 1), col)
    if action == 'right':
        return (row, min(col + 1, grid_size - 1))
    return None

# Iterative policy evaluation: sweep the Bellman backup for the fixed policy
# over every cell until the largest per-state change is no greater than theta.
delta = 1
theta = 1e-4

while delta > theta:
    delta = 0
    for x in range(grid_size):
        for y in range(grid_size):
            v = V[x, y]
            action = policy((x, y))
            next_state = get_next_state((x, y), action)
            # In general the reward depends on (s, a, s'); for simplicity it
            # is a fixed value attached to the cell being entered.
            new_v = R[next_state] + gamma * V[next_state]
            V_new[x, y] = new_v
            delta = max(delta, abs(v - new_v))
    # Copy instead of rebinding: `V = V_new` would make both names refer to
    # the same array, so every later sweep would read values it had just
    # written instead of the previous sweep's values.
    V = V_new.copy()

print("Value function:")
print(np.round(V, decimals=1))  # author's note: with a bad policy the rewards would need to be redesigned

Value function:
[[ 3.5  3.9  4.3  4.8  5.3]
 [ 3.1  3.5  4.8  5.3  5.9]
 [ 2.8  2.5 10.   5.9  6.6]
 [ 2.5 10.  10.  10.   7.3]
 [ 2.3  9.  10.   9.   8.1]]


In [2]:
import numpy as np
import random

# Define a upgradeable policy
class Policy:
    """Updatable table that maps a grid cell ``(x, y)`` to an action code.

    Action codes: 0 = up, 1 = right, 2 = down, 3 = left, 4 = stop.
    """

    def __init__(self, grid_size):
        # The table starts empty and is filled through upgrade(); `grid_size`
        # is accepted but not used here.
        # Author's note: no initialisation is needed, except that if gamma is
        # lowered to 0 the table should be filled with random actions
        # (random.randint(0, 4) per cell), otherwise a few cells end up with
        # no policy entry.
        self.policy = dict()

    def upgrade(self, x, y, action):
        """Store ``action`` as the choice for cell ``(x, y)``, replacing any previous one."""
        self.policy[(x, y)] = action

class Reward:
    """Reward function ``r(s, a, s')`` for the grid world.

    Holds a per-cell reward table ``self.R`` (1 for the terminal cell, -1 for
    the forbidden cells, 0 elsewhere) and additionally penalises any action
    that pushes against the grid boundary.

    Action codes: 0 = up, 1 = right, 2 = down, 3 = left, 4 = stop.
    """

    def __init__(self, grid_size):
        """Build the reward table for a ``grid_size`` x ``grid_size`` grid.

        The special cells are hard-coded, so ``grid_size`` must be at least 5
        for the indices below to exist.
        """
        self.grid_size = grid_size
        self.R = np.zeros((grid_size, grid_size))
        # Terminal states
        self.R[3, 2] = 1
        # Forbidden states
        self.R[1, 1] = self.R[1, 2] = self.R[2, 2] = self.R[3, 1] = self.R[3, 3] = self.R[4, 1] = -1

    def __call__(self, state, action, next_state):
        """Return the reward for taking ``action`` in ``state`` and landing in ``next_state``.

        Args:
            state: ``(x, y)`` cell the action is taken from.
            action: integer action code (see class docstring).
            next_state: ``(x, y)`` cell the action leads to.

        Returns:
            -1 when the action pushes against a grid edge, otherwise the table
            reward of ``next_state``.
        """
        x, y = state
        x_n, y_n = next_state
        last = self.grid_size - 1
        # Corner cells need no separate case: each corner move that leaves the
        # grid is already one of these four edge checks.
        hits_boundary = (
            (x == 0 and action == 0)        # top row, moving up
            or (x == last and action == 2)  # bottom row, moving down
            or (y == 0 and action == 3)     # left column, moving left
            or (y == last and action == 1)  # right column, moving right
        )
        if hits_boundary:
            return -1
        return self.R[x_n, y_n]

def get_next_state(state, action, grid_size):
    """Return the cell reached from ``state`` by taking ``action``.

    A move that would leave the grid is clamped, so the agent stays on the
    border.

    Args:
        state: ``(x, y)`` pair of grid indices.
        action: 0 = up, 1 = right, 2 = down, 3 = left, 4 = stop.
        grid_size: side length of the square grid.

    Returns:
        The ``(x, y)`` tuple of the resulting cell, or ``None`` for any other
        action value.
    """
    row, col = state
    if action == 4:
        return (row, col)
    if action in (0, 2):
        # Vertical moves change only the first index.
        new_row = max(row - 1, 0) if action == 0 else min(row + 1, grid_size - 1)
        return (new_row, col)
    if action in (1, 3):
        # Horizontal moves change only the second index.
        new_col = min(col + 1, grid_size - 1) if action == 1 else max(col - 1, 0)
        return (row, new_col)
    return None

# Grid world size
grid_size = 5
# Initialize value function
V = np.zeros((grid_size, grid_size))
V_new = np.zeros_like(V)
# Discount factor
gamma = 0.9
# Value iteration: back up every state with the best action value until the
# largest per-state change is no greater than theta.
delta = 1
theta = 1e-9
policy = Policy(grid_size)
R = Reward(grid_size)

while delta > theta:
    delta = 0
    for x in range(grid_size):
        for y in range(grid_size):
            v = V[x, y]
            # Start from -inf rather than 0: with a floor of 0, a state whose
            # action values are all <= 0 would be assigned value 0 and would
            # never receive a policy entry.
            max_v = -np.inf
            # Try every action; once the action is fixed, its action value is
            # the candidate state value.
            for action in range(0, 5):
                next_state = get_next_state((x, y), action, grid_size)
                new_v = R((x, y), action, next_state) + gamma * V[next_state]
                if new_v > max_v:
                    policy.upgrade(x, y, action)
                    max_v = new_v
            V_new[x, y] = max_v
            delta = max(delta, abs(v - max_v))
    # Copy instead of rebinding: `V = V_new` would make both names refer to
    # the same array, so every later sweep would read values it had just
    # written instead of the previous sweep's values.
    V = V_new.copy()

print("Value function:")
print(np.round(V, decimals=1))


Value function:
[[ 5.8  5.6  6.2  6.5  5.8]
 [ 6.5  7.2  8.   7.2  6.5]
 [ 7.2  8.  10.   8.   7.2]
 [ 8.  10.  10.  10.   8. ]
 [ 7.2  9.  10.   9.   8.1]]


上面第二个单元格（In [2]）中的算法又叫 Value iteration（值迭代）；第一个单元格（In [31]）是对固定策略做的策略评估。