In [1]:
# 使用tf2 的一些demo

In [2]:
import tensorflow as tf
import numpy as np
import gym
import random
from collections import deque

num_episodes = 500              # 游戏训练的总episode数量
num_exploration_episodes = 100  # 探索过程所占的episode数量
max_len_episode = 1000          # 每个episode的最大回合数
batch_size = 32                 # 批次大小
learning_rate = 1e-3            # 学习率
gamma = 1.                      # 折扣因子
initial_epsilon = 1.            # 探索起始时的探索率
final_epsilon = 0.01            # 探索终止时的探索率

In [3]:
class QNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)

In [4]:
env = gym.make('CartPole-v1')       # 实例化一个游戏环境，参数为游戏名称
model = QNetwork()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
replay_buffer = deque(maxlen=10000) # 使用一个 deque 作为 Q Learning 的经验回放池
epsilon = initial_epsilon
for episode_id in range(num_episodes):
    state = env.reset()             # 初始化环境，获得初始状态
    epsilon = max(                  # 计算当前探索率
        initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
        final_epsilon)
    for t in range(max_len_episode):
        env.render()                                # 对当前帧进行渲染，绘图到屏幕
        if random.random() < epsilon:               # epsilon-greedy 探索策略，以 epsilon 的概率选择随机动作
            action = env.action_space.sample()      # 选择随机动作（探索）
        else:
            action = model.predict(np.expand_dims(state, axis=0)).numpy()   # 选择模型计算出的 Q Value 最大的动作
            action = action[0]

        # 让环境执行动作，获得执行完动作的下一个状态，动作的奖励，游戏是否已结束以及额外信息
        next_state, reward, done, info = env.step(action)
        # 如果游戏Game Over，给予大的负奖励
        reward = -10. if done else reward
        # 将(state, action, reward, next_state)的四元组（外加 done 标签表示是否结束）放入经验回放池
        replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
        # 更新当前 state
        state = next_state

        if done:                                    # 游戏结束则退出本轮循环，进行下一个 episode
            print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
            break

        if len(replay_buffer) >= batch_size:
            # 从经验回放池中随机取一个批次的四元组，并分别转换为 NumPy 数组
            batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                *random.sample(replay_buffer, batch_size))
            batch_state, batch_reward, batch_next_state, batch_done = \
                [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
            batch_action = np.array(batch_action, dtype=np.int32)

            q_value = model(batch_next_state)
            y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)  # 计算 y 值
            with tf.GradientTape() as tape:
                loss = tf.keras.losses.mean_squared_error(  # 最小化 y 和 Q-value 的距离
                    y_true=y,
                    y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
                )
            grads = tape.gradient(loss, model.variables)
            optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))       # 计算梯度并更新参数



episode 0, epsilon 1.000000, score 38
episode 1, epsilon 0.990000, score 14
episode 2, epsilon 0.980000, score 11


W0221 15:38:02.449100 4563113408 base_layer.py:1790] Layer q_network is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



episode 3, epsilon 0.970000, score 17
episode 4, epsilon 0.960000, score 11
episode 5, epsilon 0.950000, score 29
episode 6, epsilon 0.940000, score 13
episode 7, epsilon 0.930000, score 12
episode 8, epsilon 0.920000, score 13
episode 9, epsilon 0.910000, score 31
episode 10, epsilon 0.900000, score 32
episode 11, epsilon 0.890000, score 12
episode 12, epsilon 0.880000, score 36
episode 13, epsilon 0.870000, score 17
episode 14, epsilon 0.860000, score 37
episode 15, epsilon 0.850000, score 18
episode 16, epsilon 0.840000, score 12
episode 17, epsilon 0.830000, score 20
episode 18, epsilon 0.820000, score 11
episode 19, epsilon 0.810000, score 10
episode 20, epsilon 0.800000, score 18
episode 21, epsilon 0.790000, score 15
episode 22, epsilon 0.780000, score 31
episode 23, epsilon 0.770000, score 17
episode 24, epsilon 0.760000, score 47
episode 25, epsilon 0.750000, score 13
episode 26, epsilon 0.740000, score 12
episode 27, epsilon 0.730000, score 29
episode 28, epsilon 0.720000, sc

KeyboardInterrupt: 