In [16]:
import numpy as np
import gym
import copy


def run_episode(env, policy, gamma, n, show):
    """Roll out ``policy`` for ``n`` episodes and return the mean discounted return.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        policy: Indexable by observation; each entry is converted with
            ``int()`` before being passed to ``env.step``.
        gamma: Discount factor; the reward at step ``t`` is weighted by
            ``gamma ** t``.
        n: Number of episodes to run (the result is divided by ``n``).
        show: When truthy, render the environment before every step and once
            after the episode ends, and print progress messages.

    Returns:
        The sum of per-episode discounted returns divided by ``n``.
    """
    returns_sum = 0
    for episode in range(1, n + 1):
        if show:
            print("Episode %d running..." % episode)
        state, _ = env.reset()
        discounted_return = 0
        t = 0
        done = False
        while not done:
            if show:
                env.render()
            state, reward, terminated, truncated, _ = env.step(int(policy[state]))
            discounted_return += (gamma ** t * reward)
            t += 1
            done = terminated or truncated
        # The episode is over: show the final frame and report completion.
        if show:
            env.render()
            print("Episode %d finished." % episode)
        returns_sum += discounted_return
    return returns_sum / n


def policy_improvement(env, policy, value_func, gamma):
    """Make ``policy`` greedy with respect to ``value_func``.

    For every state, the action value of each action is computed as the
    expectation over ``env.P[state][action]`` of
    ``reward + gamma * value_func[next_state]``, and the policy entry for that
    state is set to the arg-max action.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and a transition table ``P[state][action]`` yielding
            ``(prob, next_state, reward, _)`` tuples.
        policy: Indexable policy; it is updated in place.
        value_func: State values indexable by state.
        gamma: Discount factor.

    Returns:
        The same ``policy`` object, after the in-place update.
    """
    # Size the table from the environment passed in, not from a notebook
    # global, so the function works for any environment.
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    action_value_func = np.zeros((n_states, n_actions))
    for state in range(n_states):
        for action in range(n_actions):
            action_value_func[state][action] = sum(
                prob * (reward + gamma * value_func[next_state])
                for prob, next_state, reward, _ in env.P[state][action]
            )
        policy[state] = np.argmax(action_value_func[state])
    return policy


def policy_evaluation(env, policy, value_func, gamma):
    """Iteratively evaluate ``policy`` until the state values stop changing.

    Each sweep performs a synchronous backup: every state's new value is
    computed from the values of the previous sweep. Iteration stops when the
    summed absolute change over all states is at most ``1e-10``.

    Args:
        env: Environment exposing ``observation_space.n`` and a transition
            table ``P[state][action]`` yielding
            ``(prob, next_state, reward, _)`` tuples.
        policy: Indexable by state; each entry is converted with ``int()``
            before being used as an action index.
        value_func: Initial state values; this array is updated in place.
        gamma: Discount factor.

    Returns:
        The same ``value_func`` object, holding the converged values.
    """
    # Threshold used to decide that the state-value function has converged.
    eps = 1e-10
    i = 0
    # Keep sweeping until the state-value function converges.
    while True:
        # Snapshot of the values from the previous sweep.
        prev_value_func = np.copy(value_func)
        for state in range(env.observation_space.n):
            # The policy may be stored as floats (e.g. created by np.zeros);
            # cast so the action is a valid index for any container type.
            action = int(policy[state])
            value_func[state] = sum(
                prob * (reward + gamma * prev_value_func[next_state])
                for prob, next_state, reward, _ in env.P[state][action]
            )
        # Converged when the total absolute change is within the threshold.
        if np.sum(np.fabs(prev_value_func - value_func)) <= eps:
            print('Policy-Evaluation converged at step %d.' % (i + 1))
            break
        i += 1
    return value_func


def policy_iteration(env, gamma):
    """Run policy iteration on ``env`` and return the resulting policy.

    Starting from the all-zeros policy and all-zeros state values, the loop
    alternates ``policy_evaluation`` and ``policy_improvement`` until the
    improved policy equals the previous one.

    Args:
        env: Environment exposing ``observation_space.n`` and whatever
            ``policy_evaluation`` / ``policy_improvement`` require.
        gamma: Discount factor forwarded to both sub-steps.

    Returns:
        Array with one action index per state (float dtype, as created by
        ``np.zeros``).
    """
    # Use the environment passed in rather than a notebook global.
    n_states = env.observation_space.n
    # Initial policy: action 0 in every state.
    policy = np.zeros(n_states)
    # Initial state-value function: V(s) = 0.
    value_func = np.zeros(n_states)
    i = 0
    while True:
        # Both helpers update their array argument in place, so hand them
        # copies; otherwise the old/new policy comparison would be vacuous.
        new_value_func = policy_evaluation(env, policy, value_func.copy(), gamma)
        new_policy = policy_improvement(env, policy.copy(), new_value_func, gamma)
        if np.all(policy == new_policy):
            print('Policy-Iteration converged at step %d.' % (i + 1))
            break
        policy = new_policy
        value_func = new_value_func
        i += 1
    return new_policy

In [17]:
# Build the 8x8 deterministic FrozenLake, solve it with policy iteration and
# replay the resulting policy for two rendered episodes.
# NOTE(review): the recorded output of this cell is a TypeError for the
# `render_mode` keyword — presumably the installed gym version did not accept
# it; confirm the gym version before relying on this cell.
env_ = gym.make('FrozenLake-v1', desc=None, map_name="8x8", is_slippery=False, render_mode="human")
# Reset the environment to its initial state
env_.reset()
optimal_policy = policy_iteration(env_, gamma=0.9)
scores = run_episode(env_, optimal_policy, gamma=0.9, n=2, show=True)
print('Average scores = ', scores)


TypeError: __init__() got an unexpected keyword argument 'render_mode'

In [4]:

# Create the environment (4x4 deterministic FrozenLake)
# NOTE(review): the recorded output of this cell is a TypeError for the
# `render_mode` keyword — presumably a gym version mismatch; confirm.
env_ = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False, render_mode="human")
# Reset the environment to its initial state
env_.reset()

TypeError: __init__() got an unexpected keyword argument 'render_mode'

In [None]:
# Inspect the number of discrete states (requires `env_` from an earlier cell).
env_.observation_space.n

In [None]:
# Inspect the transition entries for state 0, action 0 (requires `env_`).
env_.P[0][0]

In [None]:
# Scratch cell: display the `np.fabs` object used in the convergence checks.
np.fabs

In [18]:
import numpy as np
import gym


def run_episode(env, policy, gamma, n, show):
    """Roll out ``policy`` for ``n`` episodes and return the mean discounted return.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        policy: Indexable by observation; each entry is converted with
            ``int()`` before being passed to ``env.step``.
        gamma: Discount factor; the reward at step ``t`` is weighted by
            ``gamma ** t``.
        n: Number of episodes to run (the result is divided by ``n``).
        show: When truthy, render the environment before every step and once
            after the episode ends, and print progress messages.

    Returns:
        The sum of per-episode discounted returns divided by ``n``.
    """
    returns_sum = 0
    for episode in range(1, n + 1):
        if show:
            print("Episode %d running..." % episode)
        state, _ = env.reset()
        discounted_return = 0
        t = 0
        done = False
        while not done:
            if show:
                env.render()
            state, reward, terminated, truncated, _ = env.step(int(policy[state]))
            discounted_return += (gamma ** t * reward)
            t += 1
            done = terminated or truncated
        # The episode is over: show the final frame and report completion.
        if show:
            env.render()
            print("Episode %d finished." % episode)
        returns_sum += discounted_return
    return returns_sum / n


def value_iteration(env, gamma):
    """Run value iteration on ``env`` and return the greedy policy.

    Each sweep applies the Bellman optimality backup synchronously:
    ``V(s) <- max_a sum_{s'} p(s'|s,a) * (r + gamma * V_prev(s'))``.
    Iteration stops when the summed absolute change over all states is at
    most ``1e-10``; the policy is then extracted greedily from the final
    values.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and a transition table ``P[state][action]`` yielding
            ``(prob, next_state, reward, _)`` tuples.
        gamma: Discount factor.

    Returns:
        Array with one action index per state (float dtype, as created by
        ``np.zeros``).
    """
    # Use the environment passed in rather than a notebook global.
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    def action_values(state, values):
        # Expected one-step return of every action in `state` under `values`.
        return [
            sum(prob * (reward + gamma * values[next_state])
                for prob, next_state, reward, _ in env.P[state][action])
            for action in range(n_actions)
        ]

    # Initial state-value function: V(s) = 0.
    value_func = np.zeros(n_states)
    # Threshold used to decide that the state-value function has converged.
    eps = 1e-10
    i = 0
    while True:
        # Snapshot of the values from the previous sweep.
        prev_value_func = value_func.copy()
        for state in range(n_states):
            # Max over actions of the *expected* value. The outcomes of one
            # action must be summed before comparing actions, and the max must
            # not be floored at 0 or negative returns would be lost.
            value_func[state] = max(action_values(state, prev_value_func))
        # Converged when the total absolute change is within the threshold.
        if np.sum(np.fabs(prev_value_func - value_func)) <= eps:
            print('Value-Iteration converged at step %d.' % (i + 1))
            break
        i += 1
    # Extract the greedy policy from the converged values.
    policy = np.zeros(n_states)
    for state in range(n_states):
        policy[state] = np.argmax(action_values(state, value_func))
    return policy


In [21]:

# Create the environment (4x4 deterministic FrozenLake), solve it with value
# iteration and replay the resulting policy for two rendered episodes.
env_ = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False, render_mode="human")
# Reset the environment to its initial state
env_.reset()
optimal_policy = value_iteration(env_, gamma=0.8)
scores = run_episode(env_, optimal_policy, gamma=0.8, n=2, show=True)
print('Average scores = ', scores)

Value-Iteration converged at step 7.
Episode 1 running...
Episode 1 finished.
Episode 2 running...
Episode 2 finished.
Average scores =  0.3276800000000001


In [None]:
# Close the FrozenLake environment created in an earlier cell.
env_.close()

In [8]:
import gym
# Random-action smoke test on CartPole: render and step 1000 times.
# NOTE(review): the return value of env.step is discarded, so the loop keeps
# stepping without resetting even if an episode has ended — confirm intended.
env = gym.make('CartPole-v0')
env.reset()
for i in range(1000):
    env.render()
    env.step(env.action_space.sample()) # take a random action
env.close()

In [9]:
import gym
# Random-action rollout on LunarLander: 1000 steps, resetting whenever an
# episode reports done.
# NOTE(review): the recorded output of this cell is an AttributeError saying
# gym.envs.box2d has no LunarLander — presumably the Box2D extra is not
# installed; confirm the environment setup.
env = gym.make("LunarLander-v2")
env.reset()
# Seed the action-space sampler.
env.action_space.seed(42)

# Second reset, seeded; with return_info=True the result is unpacked as
# (observation, info).
observation, info = env.reset(seed=42, return_info=True)

for _ in range(1000):
    env.render(mode='human')
    # This cell unpacks step() as a 4-tuple with a single `done` flag, unlike
    # run_episode elsewhere in this notebook, which unpacks five values.
    observation, reward, done, info = env.step(env.action_space.sample())

    if done:
        observation, info = env.reset(return_info=True)

env.close()

AttributeError: module 'gym.envs.box2d' has no attribute 'LunarLander'