# 基于模型的动态规划算法

In [1]:
import gym
import random
from gym.envs.classic_control.grid_mdp import GridEnv

In [2]:
class Test:
    """Tabular policy iteration for a small deterministic grid-world MDP.

    The environment object passed to the methods must provide
    ``getAction()``, ``getStates()``, ``states``, ``actions``,
    ``terminate_states``, ``gamma`` and ``transform(state, action)``;
    ``identify`` additionally calls ``reset()``, ``step(action)`` and
    ``render()``.  Annotations naming ``GridEnv`` are quoted so they are
    not evaluated when the class is defined.

    Attributes:
        pi: Maps each state to the single action the current policy takes.
        v: Maps each state to its current value estimate.
    """

    def __init__(self, grid_mdp_ob: "GridEnv"):
        """Start with zero values and one randomly chosen action per state.

        Args:
            grid_mdp_ob: Environment supplying the state and action sets.
        """
        self.pi = dict()
        self.v = dict()
        actions = grid_mdp_ob.getAction()
        # Every state (terminal ones included) gets value 0 and an action
        # drawn uniformly at random, i.e. a random deterministic policy.
        for state in grid_mdp_ob.getStates():
            self.v[state] = 0
            self.pi[state] = random.choice(actions)

    def policy_iterate(self, grid_mdp_ob: "GridEnv"):
        """Alternate policy evaluation and greedy improvement.

        Runs at most 100 rounds and stops early as soon as an improvement
        step leaves the policy unchanged.

        Args:
            grid_mdp_ob: Environment to plan on.
        """
        for _ in range(100):
            # Policy evaluation.
            self.policy_evaluate(grid_mdp_ob)
            # Policy improvement; a stable policy means further rounds
            # would only repeat the same evaluate/improve result.
            previous_policy = dict(self.pi)
            self.policy_improve(grid_mdp_ob)
            if previous_policy == self.pi:
                break

    def policy_evaluate(self, grid_mdp_ob: "GridEnv"):
        """Update ``self.v`` in place for the current policy ``self.pi``.

        Sweeps all non-terminal states up to 1000 times and stops once the
        summed absolute change of a sweep drops below 1e-6.

        Args:
            grid_mdp_ob: Environment supplying transitions and ``gamma``.
        """
        for _ in range(1000):
            delta = 0.0
            for state in grid_mdp_ob.states:
                if state in grid_mdp_ob.terminate_states:
                    continue
                action = self.pi[state]
                next_state, reward, _, _ = grid_mdp_ob.transform(state, action)
                # The policy picks exactly one action per state and
                # transform() returns exactly one successor, so the sums
                # over actions and successors in the Bellman expectation
                # equation each reduce to a single term.
                new_v = reward + grid_mdp_ob.gamma * self.v[next_state]
                delta += abs(self.v[state] - new_v)
                self.v[state] = new_v
            if delta < 1e-6:
                break

    def policy_improve(self, grid_mdp_ob: "GridEnv"):
        """Make ``self.pi`` greedy with respect to the current ``self.v``.

        For each non-terminal state the action maximising
        ``reward + gamma * v[next_state]`` is selected; ties keep the
        action that appears first in ``grid_mdp_ob.actions``.

        Args:
            grid_mdp_ob: Environment supplying transitions and ``gamma``.
        """
        gamma = grid_mdp_ob.gamma
        for state in grid_mdp_ob.states:
            if state in grid_mdp_ob.terminate_states:
                continue
            best_action = None
            best_value = None
            for action in grid_mdp_ob.actions:
                next_state, reward, _, _ = grid_mdp_ob.transform(state, action)
                # Score every action, including the first one, by the value
                # of its successor state (not of the current state).
                value = reward + gamma * self.v[next_state]
                if best_value is None or best_value < value:
                    best_action, best_value = action, value
            if best_value is not None:
                self.pi[state] = best_action

    def identify(self, grid_mdp_ob: "GridEnv", delay: float = 2) -> bool:
        """Replay the current policy in the environment, rendering each step.

        The loop only ends when the start state is terminal or ``step()``
        reports termination.

        Args:
            grid_mdp_ob: Environment to run and render.
            delay: Seconds to sleep after each render.

        Returns:
            Always ``True`` once the episode has finished.
        """
        import time

        state = grid_mdp_ob.reset()
        grid_mdp_ob.render()
        time.sleep(delay)
        done = state in grid_mdp_ob.terminate_states
        while not done:
            state, _, done, _ = grid_mdp_ob.step(self.pi[state])
            grid_mdp_ob.render()
            time.sleep(delay)
        grid_mdp_ob.render(close=True)
        return True

In [3]:
# Build the environment through gym.make using the id 'GridWorld-v0'.
env = gym.make('GridWorld-v0')

In [4]:
# Create a Test instance for the environment and display its initial
# policy table (pi) and value table (v).
test = Test(env)
test.pi,test.v

({1: 'e', 2: 'e', 3: 'n', 4: 'n', 5: 'w', 6: 'w', 7: 'e', 8: 'n'},
 {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0})

In [5]:
# Run policy evaluation for the current policy and display the resulting
# state values.
test.policy_evaluate(env)
test.v

{1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0, 7: 0, 8: 0}

In [6]:
# Run a single policy-improvement step and display the updated policy.
test.policy_improve(env)
test.pi

{1: 'n', 2: 'n', 3: 's', 4: 'n', 5: 'n', 6: 'w', 7: 'e', 8: 'n'}

In [7]:
# Run the full policy-iteration loop, then print the resulting state
# values and policy.
test.policy_iterate(env)
print(test.v)
print(test.pi)

{1: 0.6400000000000001, 2: 0.8, 3: 1.0, 4: 0.8, 5: 0.6400000000000001, 6: 0, 7: 0, 8: 0}
{1: 'e', 2: 'e', 3: 's', 4: 'w', 5: 'w', 6: 'w', 7: 'e', 8: 'n'}


In [8]:
# Replay the policy obtained above in the environment, rendering each step.
test.identify(env)

True