# 一、 Policy iteration

In [1]:
import numpy as np

from copy import copy


def policy_eval(env, values, policies, upper_bound):
    iteration=0
    delta=upper_bound
    while delta>=upper_bound:
        delta=0
        tempv=np.zeros(env.state_space)
        for s in env.states:
            probs=policies.retrieve(s)
            #print('probs={}'.format(probs))
            v=0
            for action in env.actions:
                env.set_state(s)
                _,done,reward,state=env.step(act=action)
                v+=probs[action]*(reward+env.gamma*values.get(state)[0])
            tempv[s]=v
            delta=max(delta,abs(v-values.get(s)))
        for s in env.states: 
            values.update(s=s,value=tempv[s])
        iteration += 1
        # print('\r> iteration: {} delta: {}'.format(iteration, delta), flush=True, end="")
        print('\r> delta={},iteration={}'.format(delta,iteration), flush=True, end="")
    
    ## 遍历棋盘中的每一个点，更新Value Function
    ##                         Bellman Equation
    


In [2]:
def policy_improve(env, values, policies):
    ## 遍历棋盘中的每一个点，更新Policy := action指向max(value(s'))
    stable=1
    for s in env.states:
        maximun=0.
        point=0
        r=0
        for i,action in enumerate(env.actions):
            env.set_state(s)
            _,done,reward,state=env.step(act=action)
            val=values.get(state[0])
            #print('val={}'.format(val))
            #print('s={},i={},val={}'.format(s,i,val))
            if val>maximun:
                maximun=val
                #print('max={},equal={}'.format(maximun,maximun==val))
                point=i
                r=reward[0]
            elif val==maximun:
                if reward[0]>r:
                    maximun=val
                    point=i
                    r=reward[0]
            #print('i={},val={},maximun={},point={}'.format(i,val,maximun,point))
        #print('point={}'.format(point))
        probs=policies.retrieve(s)
        new_policy = [0.] * env.action_space
        new_policy[point] = 1.
        policies.update(s, new_policy)
        if probs==new_policy:
            stable=1
        else:
            stable=0
    return stable
    



        


# 二、value Iteration

In [3]:
def value_iter(env, values, upper_bound):
    ## 遍历棋盘中的每一个点，向max(value(s'))更新，并且更新更新Policy := action指向max(value(s'))
    ##                       这个意思是Bellman Optimal Equation
    delta=upper_bound+1
    iteration=0
    while delta>=upper_bound:
        delta=0
        tempv=np.zeros(env.state_space)
        for s in env.states:
            maximun=0
            probs=policies.retrieve(s)
            for i,action in enumerate(env.actions):
                env.set_state(s)
                _,done,reward,state=env.step(act=action)
                v=reward[0]+env.gamma*values.get(state)[0]
                #print('v={}'.format(v))
                maximun=max(maximun,v)
            tempv[s]=maximun
            #print('maximun={}'.format(maximun))
            delta=max(delta,abs(maximun-values.get(s)))
        for s in env.states: 
            values.update(s=s,value=tempv[s])
        iteration += 1
    print('\r> delta={},iteration={}'.format(delta,iteration), flush=True, end="")
        
    
                    
                    



In [4]:
class Env:
    def __init__(self):
        self._states = set()
        self._state = None
        self._actions = []
        self._gamma = None
        
    @property
    def states(self):
        return self._states
    
    @property
    def state_space(self):
        return self._state_shape
    
    @property
    def actions(self):
        return self._actions
    
    @property
    def action_space(self):
        return len(self._actions)
    
    @property
    def gamma(self):
        return self._gamma
    
    def _world_init(self):
        raise NotImplementedError
        
    def reset(self):
        raise NotImplementedError
    
    def step(self, state, action):
        """Return distribution and next states"""
        raise NotImplementedError
        
    def set_state(self, state):
        self._state = state


class MatrixEnv(Env):
    def __init__(self, height=4, width=4):
        super().__init__()
        
        self._action_space = 4
        self._actions = list(range(4))
        
        # self._state_shape = (2,)
        self._state_shape = (height, width)
        self._states = [(i, j) for i in range(height) for j in range(width)]
        
        self._gamma = 0.9
        self._height = height
        self._width = width

        self._world_init()
        
    @property
    def state(self):
        return self._state
    
    @property
    def gamma(self):
        return self._gamma
    
    def set_gamma(self, value):
        self._gamma = value
        
    def reset(self):
        self._state = self._start_point
        
    def _world_init(self):
        # start_point
        self._start_point = (0, 0)
        self._end_point = (self._height - 1, self._width - 1)
        
    def _state_switch(self, act):
        # 0: h - 1, 1: w + 1, 2: h + 1, 3: w - 1
        if act == 0:  # up
            self._state = (max(0, self._state[0] -1), self._state[1])
        elif act == 1:  # right
            self._state = (self._state[0], min(self._width - 1, self._state[1] + 1))
        elif act == 2:  # down
            self._state = (min(self._height - 1, self._state[0] + 1), self._state[1])
        elif act == 3:  # left
            self._state = (self._state[0], max(0, self._state[1] - 1))

    def step(self, act):

        ## Wrong here
        assert 0 <= act <= 3
        
        done = False
        reward = 0.

        self._state_switch(act)
        
        if self._state == self._end_point:
            reward = 1.
            done = True

        return None, done, [reward], [self._state]

In [5]:
class ValueTable:
    def __init__(self, env):
        self._values = np.zeros(env.state_space)
        
    def update(self, s, value):
        self._values[s] = value
        
    def get(self, state):
        if type(state) == list:
            # loop get
            res = [self._values[s] for s in state]
            return res
        elif type(state) == tuple:
            # return directly
            return self._values[state]

In [6]:
from collections import namedtuple

Pi = namedtuple('Pi', 'act, prob')


class Policies:
    def __init__(self, env: Env):
        self._actions = env.actions
        self._default_policy = [1 / env.action_space] * env.action_space
        self._policies = dict.fromkeys(env.states, Pi(self._actions, self._default_policy))
    
    def sample(self, state):
        if self._policies.get(state, None) is None:
            self._policies[state] = Pi(self._actions, self._default_policy)

        policy = self._policies[state]
        return np.random.choice(policy.act, p=policy.prob)
    
    def retrieve(self, state):
        return self._policies[state].prob
    
    def update(self, state, policy):
        self._policies[state] = self._policies[state]._replace(prob=policy)

In [7]:
# import time

# env = MatrixEnv(width=8, height=8)  # TODO(ming): try different word size
# policies = Policies(env)
# values = ValueTable(env)
# upper_bound = 1e-4

# stable = False

# policy_eval(env, values, policies, upper_bound)

# start = time.time()
# # while not stable:
# #     policy_eval(env, values, policies, upper_bound)
# #     break
# #     stable = policy_improve(env, values, policies)
# # end = time.time()
# # print('\n[time consumpution]: {} s'.format(end - start))

# if 1 == 1:

#     done = False
#     rewards = 0
#     env.reset()
#     step = 0

#     while not done:
#         act_index = policies.sample(env.state)
#         _, done, reward, next_state = env.step(env.actions[act_index])
#         rewards += sum(reward)
#         step += 1

#     print('Evaluation: [reward] {} [step] {}'.format(rewards, step))
import time

env = MatrixEnv(width=8, height=8)  # TODO(ming): try different word size
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4

stable = False

start = time.time()
while not stable:
    policy_eval(env, values, policies, upper_bound)
    stable = policy_improve(env, values, policies)
end = time.time()

print('\n[time consumpution]: {} s'.format(end - start))

done = False
rewards = 0
env.reset()
step = 0
print('start->',end="")
while not done:
    act_index = policies.sample(env.state)
    _, done, reward, next_state = env.step(env.actions[act_index])
    rewards += sum(reward)
    print('{}->'.format(next_state), end='') if not done else print('end')
    step += 1
# print('\b'*8 + 'end')
print()
print('Evaluation: [reward] {} [step] {}'.format(rewards, step))

> delta=[9.35741733e-05],iteration=87
[time consumpution]: 0.46455955505371094 s
start->[(0, 1)]->[(0, 2)]->[(0, 3)]->[(0, 4)]->[(0, 5)]->[(0, 6)]->[(0, 7)]->[(1, 7)]->[(2, 7)]->[(3, 7)]->[(4, 7)]->[(5, 7)]->[(6, 7)]->end

Evaluation: [reward] 1.0 [step] 14


In [15]:
env = MatrixEnv(width=8, height=8)  # try different word size
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4

start = time.time()

value_iter(env, values, upper_bound)
_ = policy_improve(env, values, policies)

end = time.time()

print('\n[time consumption] {}s'.format(end - start))
# print("===== Render =====")
env.reset()
done = False
rewards = 0
step = 0
while not done:
    act_index = policies.sample(env.state)
    _, done, reward, next_state = env.step(env.actions[act_index])
    rewards += sum(reward)
    #print('{}->'.format(next_state), end='') if not done else print('end')
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))

> delta=9.404610870067387e-05,iteration=89
[time consumption] 0.08078312873840332s
Evaluation: [reward] 1.0 [step] 14
