In [2]:
import gym
import time
import numpy as np 
import torch
# import dill

In [4]:
class QLearningAgent(object):
    def __init__(self, obs_n, act_n, learning_rate=0.01, gamma=0.9, e_greed=0.1):
        self.act_n = act_n      # 动作维度，有几个动作可选
        self.lr = learning_rate # 学习率
        self.gamma = gamma      # reward的衰减率
        self.epsilon = e_greed  # 按一定概率随机选动作
        self.Q = np.zeros((obs_n, act_n))
        self.algo_name = 'Q-learning'  # 算法名称，我们使用Q学习算法
        self.env_name = 'CliffWalking-v0'  # 环境名称，悬崖行走
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # 检测GPU，如果没装CUDA的话默认为CPU
        self.sample_count = 0

    # 智能体选择动作(带探索)
    def sample(self, obs):
        # e-greedy 策略
        if np.random.uniform(0, 1) < (1.0 - self.epsilon): 
            #根据table的Q值选择动作
            action = self.predict(obs)
        else:
            action = np.random.choice(self.act_n) 
            #有一定概率随机探索选取一个动作
        return action

    # 根据输入观察值，依据Q-table预测输出的动作值
    def predict(self, obs):
        Q_list = self.Q[obs, :]
        # 选择Q(s,a)最大对应的动作
        maxQ = np.max(Q_list)
        action_list = np.where(Q_list == maxQ)[0]  
        # maxQ可能对应多个action
        action = np.random.choice(action_list)
        return action

    # Q学习算法更新Q表格的方法
    def learn(self, obs, action, reward, next_obs, done):
        """ 采用off-policy
            obs: 交互前的状态
            action: 本次交互选择的action
            reward: 本次动作获得的奖励
            next_obs: 本次交互后的状态
            done: 布尔值,episode是否结束
        """
        
        predict_Q = self.Q[obs, action]  # 读取预测价值
        if done: # 终止状态判断
            target_Q = reward  # 终止状态下获取不到下一个动作，直接将 Q_target 更新为对应的奖励
        else:
            target_Q = reward + self.gamma * np.max(self.Q[next_obs, :]) # Q-learning
        self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q

    # 把Q表格的数据保存到文件中
    def save(self):
        npy_file = './q_table.npy'
        np.save(npy_file, self.Q)
        print(npy_file + ' saved.')

    # 从文件中读取数据到Q表格
    def restore(self, npy_file='./q_table.npy'):
        self.Q = np.load(npy_file)
        print(npy_file + ' loaded.')

    # def save(self, path):
    #     torch.save(
    #         obj=self.Q_table,
    #         f=path + "Qlearning_model.pkl",
    #         pickle_module=dill
    #     )
    #     print("保存模型成功！")
 
    # def load(self, path)
    # self.Q= torch.load(f=path + 'Qlearning_model.pkl', pickle_module=dill)
    # print("加载模型成功！")



In [5]:

# train.py 一个回合
def run_episode(env, agent, render=False):
    # print('开始一个回合的训练')
    # print(f'环境:{agent.env_name}, 算法:{agent.algo_name}, 设备:{agent.device}')
    total_steps = 0 # 记录每个回合走了多少step
    total_reward = 0  # 记录每个回合的全部奖励
    rewards = []  # 记录每回合的奖励，用来记录并分析奖励的变化
    ma_rewards = []  # 由于得到的奖励可能会产生振荡，使用一个滑动平均的量来反映奖励变化的趋势
    obs = env.reset() # 重置环境, 重新开一个回合（即开始新的一个episode）
    # 开始当前回合的行走，直至走到终点
    while True:
        action = agent.sample(obs)  # 根据算法选择一个动作
        next_obs, reward, done, _ = env.step(action) # 与环境进行一次动作交互
        # 训练Q-learning算法，更新Q-table
        agent.learn(obs, action, reward, next_obs, done)
        obs = next_obs  # 更新状态
        total_reward += reward
        total_steps += 1 # 计算step数
        if render:
            env.render() #渲染新的一帧图形,返回一个图像
        if done:
            break
    if ma_rewards:  # 用新的奖励与上一个奖励计算出一个平均的奖励加入到列表中，反映奖励变化的趋势
            ma_rewards.append(ma_rewards[-1] * 0.9 + total_reward * 0.1)
    else:
            ma_rewards.append(total_reward)
    return total_reward, total_steps


In [6]:
# 模型测试与训练的方法基本一致，唯一的区别只是不用再进行 Q表格的更新
def test_episode(env, agent):
    total_reward = 0
    obs = env.reset()
    while True:
        action = agent.predict(obs) # greedy
        #推进一个时间步长，返回observation为对环境的一次观察，reward奖励，done代表是否要重置环境，info用于调试的诊断信息
        next_obs, reward, done, _ = env.step(action)
        total_reward += reward
        obs = next_obs
        time.sleep(0.5)
        env.render()
        if done:
            break
    return total_reward

In [10]:
# 使用gym创建悬崖环境
env = gym.make("CliffWalking-v0")  # 0 up, 1 right, 2 down, 3 left

# 创建一个agent实例，输入超参数(Hyperparameter),tuning parameters需要人为设定
agent = QLearningAgent(
    obs_n=env.observation_space.n,  # 状态维度，即4*12的网格中的 48 个状态
    act_n=env.action_space.n,  # 动作维度， 即 4 个动作
    learning_rate=0.1,  # 学习率
    gamma=0.9,  # 折扣因子
    e_greed=0.1)  # ε-贪心策略中的终止epsilon，越小学习结果越逼近

# 训练500个episode，打印每个episode的分数
for episode in range(500):
    ep_reward, ep_steps = run_episode(env, agent, False)
    print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps, ep_reward))

# agent.save(path='./')  # 保存模型

# 全部训练结束，查看算法效果
test_reward = test_episode(env, agent)
print('test reward = %.1f' % (test_reward))


Episode 0: steps = 160 , reward = -754.0
Episode 1: steps = 943 , reward = -1834.0
Episode 2: steps = 110 , reward = -110.0
Episode 3: steps = 180 , reward = -279.0
Episode 4: steps = 412 , reward = -808.0
Episode 5: steps = 414 , reward = -1305.0
Episode 6: steps = 112 , reward = -310.0
Episode 7: steps = 145 , reward = -145.0
Episode 8: steps = 204 , reward = -501.0
Episode 9: steps = 137 , reward = -137.0
Episode 10: steps = 117 , reward = -315.0
Episode 11: steps = 147 , reward = -147.0
Episode 12: steps = 66 , reward = -66.0
Episode 13: steps = 108 , reward = -108.0
Episode 14: steps = 86 , reward = -86.0
Episode 15: steps = 122 , reward = -122.0
Episode 16: steps = 103 , reward = -301.0
Episode 17: steps = 175 , reward = -274.0
Episode 18: steps = 60 , reward = -60.0
Episode 19: steps = 84 , reward = -84.0
Episode 20: steps = 53 , reward = -53.0
Episode 21: steps = 93 , reward = -93.0
Episode 22: steps = 212 , reward = -509.0
Episode 23: steps = 49 , reward = -49.0
Episode 24: st