CartPole-v0游戏比较简单，基本要求就是控制下面的cart移动使上面的pole保持垂直不倒。这个任务只有两个离散动作，要么向左用力，要么向右用力。而state状态就是这个cart的位置和速度， pole的角度和角速度，4维的特征。坚持到200分的奖励则为过关。

In [1]:
import gym
import tensorflow as tf
import numpy as np
import random
from collections import deque

In [2]:
# 衰减因子GAMMA
GAMMA = 0.9

# EPSILON的初始值
INITIAL_EPSILON = 0.5
# EPSILON的最终值
FINAL_EPSILON = 0.01

#经验回放表的大小
REPLAY_SIZE = 10000

# 批量梯度下降的样本数m
BATCH_SIZE = 32


# 迭代轮次T
EPISODE = 3000

# 更新Q网络的频率
REPLACE_TARGET_FREQ = 10

# Step limitation in an episode
STEP = 300

TEST = 5

In [3]:
class DQN():
    def __init__(self, env):
        # 初始化回放队列
        self.replay_buffer = deque()
        
        self.time_step = 0
        self.epsilon = INITIAL_EPSILON
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        self.create_Q_network()
        self.create_training_method()

        # 初始化session
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())
        
    def create_Q_network(self):
        # 初始化
        self.state_input = tf.placeholder("float", [None, self.state_dim])
        # network weights
        with tf.variable_scope('current_net'):
            W1 = self.weight_variable([self.state_dim, 20])
            b1 = self.bias_variable([20])
            W2 = self.weight_variable([20, self.action_dim])
            b2 = self.bias_variable([self.action_dim])
            # hidden layers
            h_layer = tf.nn.relu(tf.matmul(self.state_input, W1) + b1)
            # Q Value layer
            self.Q_value = tf.matmul(h_layer, W2) + b2

        with tf.variable_scope('target_net'):
            W1t = self.weight_variable([self.state_dim, 20])
            b1t = self.bias_variable([20])
            W2t = self.weight_variable([20, self.action_dim])
            b2t = self.bias_variable([self.action_dim])

            # hidden layers
            h_layer_t = tf.nn.relu(tf.matmul(self.state_input, W1t) + b1t)
            # Q Value layer
            self.target_Q_value = tf.matmul(h_layer_t, W2t) + b2t

        t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='target_net')
        e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='current_net')

        with tf.variable_scope('soft_replacement'):
            self.target_replace_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
    def weight_variable(self,shape):
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)
    def bias_variable(self,shape):
        print(shape)
        initial = tf.constant(0.01, shape = shape)
        return tf.Variable(initial)
    def create_training_method(self):
        self.action_input = tf.placeholder("float", [None, self.action_dim])  # one hot presentation
        self.y_input = tf.placeholder("float", [None])
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value, self.action_input), reduction_indices=1)
        self.cost = tf.reduce_mean(tf.square(self.y_input - Q_action))
        self.optimizer = tf.train.AdamOptimizer(0.0001).minimize(self.cost)
    def egreedy_action(self, state):
        Q_value = self.Q_value.eval(feed_dict={self.state_input: [state]})[0]
        if random.random() <= self.epsilon:
            self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000
            return random.randint(0, self.action_dim - 1)
        else:
            self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000
            return np.argmax(Q_value)
    def perceive(self, state, action, reward, next_state, done):
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.replay_buffer.append((state, one_hot_action, reward, next_state, done))
        if len(self.replay_buffer) > REPLAY_SIZE:
            self.replay_buffer.popleft()

        if len(self.replay_buffer) > BATCH_SIZE:
            self.train_Q_network()


    def train_Q_network(self):
        self.time_step += 1
        # Step 1: obtain random minibatch from replay memory
        minibatch = random.sample(self.replay_buffer, BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]

        # Step 2: calculate y
        y_batch = []
        Q_value_batch = self.target_Q_value.eval(feed_dict={self.state_input: next_state_batch})
        for i in range(0, BATCH_SIZE):
            done = minibatch[i][4]
            if done:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + GAMMA * np.max(Q_value_batch[i]))

        self.optimizer.run(feed_dict={self.y_input: y_batch,self.action_input: action_batch,self.state_input: state_batch})


    def action(self, state):
        return np.argmax(self.Q_value.eval(feed_dict={self.state_input: [state]})[0])


    def update_target_q_network(self, episode):
        # update target Q netowrk
        if episode % REPLACE_TARGET_FREQ == 0:
            self.session.run(self.target_replace_op)


In [4]:
import matplotlib.pyplot as plt
%matplotlib inline
# 初始化DQN
env = gym.make('CartPole-v0')
agent = DQN(env)
# 循环T次
for episode in range(EPISODE):
    # 初始化states
    state = env.reset()
    # 训练
    for step in range(STEP):
        # 利用e-greedy更新选择动作
        action = agent.egreedy_action(state)
        next_state,reward,done,_ = env.step(action)
        reward = -1 if done else 0.1
        agent.perceive(state,action,reward,next_state,done)
        state = next_state
        if done:
            break
    # 每100 episodes测试一次
    if episode % 100 == 0:
        total_reward = 0
        for i in range(TEST):
            state = env.reset()
            for j in range(STEP):
                render = lambda : plt.imshow(env.render(mode='rgb_array'))
                action = agent.action(state)
                state,reward,done,_ = env.step(action)
                total_reward += reward
                if done:
                    break
        ave_reward = total_reward/TEST
        print ('episode: ',episode,'Evaluation Average Reward:',ave_reward)
    agent.update_target_q_network(episode)


[20]
[2]
[20]
[2]
episode:  0 Evaluation Average Reward: 9.4
episode:  100 Evaluation Average Reward: 16.0
episode:  200 Evaluation Average Reward: 31.8
episode:  300 Evaluation Average Reward: 71.8
episode:  400 Evaluation Average Reward: 181.4
episode:  500 Evaluation Average Reward: 200.0
episode:  600 Evaluation Average Reward: 200.0
episode:  700 Evaluation Average Reward: 200.0
episode:  800 Evaluation Average Reward: 200.0
episode:  900 Evaluation Average Reward: 200.0
episode:  1000 Evaluation Average Reward: 200.0
episode:  1100 Evaluation Average Reward: 200.0
episode:  1200 Evaluation Average Reward: 200.0
episode:  1300 Evaluation Average Reward: 200.0
episode:  1400 Evaluation Average Reward: 199.2
episode:  1500 Evaluation Average Reward: 200.0
episode:  1600 Evaluation Average Reward: 200.0
episode:  1700 Evaluation Average Reward: 200.0
episode:  1800 Evaluation Average Reward: 194.8
episode:  1900 Evaluation Average Reward: 200.0
episode:  2000 Evaluation Average Rewar