In [None]:
import gym

# Create the CartPole-v1 environment; the cells below reuse this `env` object.
env = gym.make('CartPole-v1')

In [None]:
# Baseline: play 10 episodes with actions sampled uniformly from the action
# space and report how many steps each episode lasted.
for _ in range(10):
    env.reset()
    t, done = 0, False
    while not done:
        # Classic Gym API: step() returns (observation, reward, done, info).
        action = env.action_space.sample()
        observation, reward, done, _ = env.step(action)
        t += 1
    print("Episode finished after {} timesteps".format(t))

In [None]:
"""Monte-Carlo policy gradient (REINFORCE) algorithm
"""

def mc_policy_gradient(env, theta, lr, episodes, gamma=1.0):
    """Train a two-action logistic policy with Monte-Carlo policy gradient.

    The policy is ``pi = sigmoid(theta . observation)``, interpreted as the
    probability of choosing action 1; action 0 has probability ``1 - pi``.
    After each complete episode ``theta`` is moved along
    ``sum_t grad log pi(a_t | s_t) * G_t``, where ``G_t`` is the discounted
    return from step ``t`` onwards and, for this policy,
    ``grad log pi(a | s) = (a - pi) * s``.

    Arguments:
    env -- environment using the classic Gym API relied on here: ``reset()``
           returns an observation and ``step(action)`` returns
           ``(observation, reward, done, info)``
    theta -- policy parameters, expected to be a float NumPy array with one
             entry per observation feature; it is updated in place
    lr -- learning rate
    episodes -- number of episodes to run
    gamma -- discount factor used to build the returns (default 1.0)

    Returns:
    theta -- the updated parameters (the same object that was passed in)
    """
    for _ in range(episodes):
        trajectory = []
        observation = env.reset()  # start a new episode
        t = 0
        while True:
            # Probability of action 1 under the current parameters.
            pi = 1 / (1 + np.exp(-np.dot(theta, observation)))
            # Sample from the policy instead of thresholding: the policy
            # gradient is defined for a stochastic policy, and a greedy
            # choice would never explore.
            action = 1 if np.random.random() < pi else 0
            next_observation, reward, done, _ = env.step(action)
            # Record the observation the action was actually taken in,
            # so that (observation, action, pi) belong together.
            trajectory.append((observation, action, pi, reward))
            observation = next_observation
            t += 1
            if done:
                print("Episode finished after {} timesteps".format(t))
                break

        # Walk the episode backwards so the return G_t can be accumulated
        # in a single pass: G_t = r_t + gamma * G_{t+1}.
        gradient = np.zeros_like(theta, dtype=float)
        episode_return = 0.0
        for observation, action, pi, reward in reversed(trajectory):
            episode_return = reward + gamma * episode_return
            gradient += (action - pi) * np.asarray(observation) * episode_return
        theta += lr * gradient

    return theta

In [None]:
import numpy as np
# Settings for the Monte-Carlo policy gradient run.
lr = 0.001  # learning rate
theta = np.random.rand(4)  # initial policy parameters: 4 random values
episodes=10  # number of episodes to run

# Last expression in the cell, so the returned parameters are displayed.
mc_policy_gradient(env, theta, lr, episodes)

In [None]:
"""Actor-Critic policy gradient algorithm
"""
def ac_policy_gradient(env, theta, w, lr, gamma, episodes):
    """Train a two-action logistic policy with a linear critic (actor-critic).

    Actor: ``pi = sigmoid(theta . observation)`` is the probability of
    choosing action 1; its score function is ``(action - pi) * observation``.
    Critic: ``q = w . observation`` is a linear value estimate that is
    trained online with the one-step TD error.

    Arguments:
    env -- environment using the classic Gym API relied on here: ``reset()``
           returns an observation and ``step(action)`` returns
           ``(observation, reward, done, info)``
    theta -- actor parameters, expected to be a float NumPy array with one
             entry per observation feature; updated in place
    w -- critic parameters, same expectations as ``theta``; updated in place
    lr -- learning rate shared by actor and critic
    gamma -- discount factor
    episodes -- number of episodes to run

    Returns:
    (theta, w) -- the updated actor and critic parameters (the same objects
                  that were passed in)
    """
    for _ in range(episodes):
        # Start a fresh episode and pick the first action from the policy.
        observation = env.reset()
        pi = 1 / (1 + np.exp(-np.dot(theta, observation)))
        action = 1 if np.random.random() < pi else 0
        q = np.dot(w, observation)

        t = 0
        while True:
            next_observation, reward, done, _ = env.step(action)
            t += 1

            # Choose the follow-up action and its value estimate with the
            # parameters as they are before this step's update.
            next_pi = 1 / (1 + np.exp(-np.dot(theta, next_observation)))
            next_action = 1 if np.random.random() < next_pi else 0
            next_q = np.dot(w, next_observation)

            # One-step TD error; a terminal state has no future value, so
            # the bootstrap term is dropped when the episode has ended.
            bootstrap = 0.0 if done else gamma * next_q
            delta = reward + bootstrap - q

            features = np.asarray(observation)
            # Actor: follow the score function weighted by the critic.
            theta += lr * (action - pi) * features * q
            # Critic: semi-gradient TD update of the linear estimate.
            w += lr * delta * features

            if done:
                print("Episode finished after {} timesteps".format(t))
                break
            observation, pi, action, q = next_observation, next_pi, next_action, next_q
    return theta, w

In [None]:
# Settings for the Actor-Critic run.
gamma = 1  # discount factor
theta = np.random.rand(4)  # initial actor (policy) parameters: 4 random values
w = np.random.rand(4)  # initial critic (value) parameters: 4 random values
lr = 0.001  # learning rate
episodes=10  # number of episodes to run

# Last expression in the cell, so the returned (theta, w) pair is displayed.
ac_policy_gradient(env, theta, w, lr, gamma, episodes)