# Implementation of asynchronous one-step Q-learning

In [1]:
import itertools
import shutil
import threading
import multiprocessing
import gym
import numpy as np
import tensorflow as tf
import tensorflow.contrib.slim as slim
import matplotlib.pyplot as plt
%matplotlib inline
import random
import sys
import cv2
import os
import time
from tqdm import tqdm
import Gym_Auxiliar as Aux_Gym

from inspect import getsourcefile
# Make the parent directory of this file importable, so that project-level
# helper modules living one level up (such as commonOps) can be imported.
current_path = os.path.dirname(os.path.abspath(getsourcefile(lambda: 0)))
import_path = os.path.abspath(os.path.join(current_path, os.pardir))
if import_path not in sys.path:
    sys.path.append(import_path)

import commonOps as cops

In [2]:
# Global configuration for training, evaluation and the Gym environment.
conf_parameters = dict(
    # Output locations
    checkpoint_dir='Models/',
    log_dir='Logs/',
    env_name='Breakout-v0',
    t_max=5,                       # Update frequency
    max_global_episodes=1000000,

    update_target=2000,            # Copy Q-network weights to the target network every N global episodes

    eval_steps=300,                # Policy evaluation interval (passed to the monitor's continuous_eval)
    reset=False,                   # If set, delete the existing model/log directories and train from scratch
    num_threads=4,                 # Number of worker threads to run (falsy -> one per CPU core)

    # Input size
    screen_width=84,
    screen_height=84,
    history_length=4,

    # Gym environment
    pool_frame_size=1,
    random_start=30,               # Maximum number of 'do nothing' actions at the start of an episode
    action_repeat=1,               # How many times the same action is repeated

    batch_size=32,                 # Number of training cases over which each SGD update is computed
    gamma=0.99,                    # Discount factor
    learning_rate=0.00025,         # Learning rate
)

# Number of workers: an explicit 'num_threads' wins, otherwise use every core.
num_workers = conf_parameters['num_threads'] or multiprocessing.cpu_count()

# Optionally wipe earlier checkpoints and logs before starting.
if conf_parameters['reset']:
    for stale_dir in (conf_parameters['checkpoint_dir'], conf_parameters['log_dir']):
        shutil.rmtree(stale_dir, ignore_errors=True)

# Make sure both output directories exist.
for needed_dir in (conf_parameters['checkpoint_dir'], conf_parameters['log_dir']):
    if not os.path.exists(needed_dir):
        os.makedirs(needed_dir)

## Class defining the global shared networks

In [3]:
class ConvNet_Estimator():
    '''Q-value estimator neural network.

    This architecture is used both for the Q-network and for the target
    network; ``net_type`` selects which one is built.

    Args:
        conf: configuration dict; this class reads 'screen_width',
            'screen_height', 'history_length', 'batch_size', 'learning_rate'
            and 'gamma'.
        num_actions: number of discrete actions, i.e. the width of the
            Q-value output layer.
        net_type: 0 builds the trainable network under variable scope
            'Global_Training' (with loss, train op and summaries); any other
            value builds the forward-only network under 'Global_Target'.
    '''

    def __init__(self, conf, num_actions, net_type=0):
        if net_type == 0:
            self.scope = 'Global_Training'
            self.collection = 'Normal'
        else:
            self.scope = 'Global_Target'
            self.collection = 'Target'

        self.num_actions = num_actions
        self.screen_width = conf['screen_width']
        self.screen_height = conf['screen_height']
        self.history_length = conf['history_length']
        self.batch_size = conf['batch_size']
        self.learning_rate = conf['learning_rate']
        self.gamma = conf['gamma']

        with tf.variable_scope(self.scope):
            # Build the graph
            self._build_model()

    def _build_model(self):
        '''
        Build the network: two ELU conv layers, one ELU fully connected layer
        and a linear Q-value head. The training network additionally gets the
        TD loss, the train op and TensorBoard summaries.
        '''
        # Input batch of shape (batch, height, width, history_length);
        # presumably the most recent frames stacked along the channel axis.
        self.state_ph = tf.placeholder(tf.float32, [None, self.screen_height, self.screen_width, self.history_length], name='X')

        self.conv1 = slim.conv2d(activation_fn=tf.nn.elu,
                inputs=self.state_ph, num_outputs=16,
                kernel_size=[8, 8], stride=[4, 4], padding='VALID', scope="conv1")
        self.conv2 = slim.conv2d(activation_fn=tf.nn.elu,
                inputs=self.conv1, num_outputs=32,
                kernel_size=[4, 4], stride=[2, 2], padding='VALID', scope="conv2")
        hidden = slim.fully_connected(slim.flatten(self.conv2), 256, activation_fn=tf.nn.elu, scope="FC")

        # Linear output layer. Q-values are unbounded estimates of discounted
        # return; a softmax here would force them into [0, 1] and make them
        # sum to 1, so the network could never fit the TD targets.
        self.Q_output = slim.fully_connected(hidden, self.num_actions,
                activation_fn=None,
                biases_initializer=None, scope='Output')

        with tf.name_scope("Best_Action"):
            # Greedy action per state in the batch.
            self.best_action = tf.argmax(self.Q_output, dimension=1)

        # Only the training network gets a loss / optimizer / summaries; the
        # target network is forward-only.
        if self.scope != 'Global_Target':
            with tf.name_scope("loss"):
                with tf.name_scope("Inputs"):
                    # Bootstrap value of the next state, supplied by the caller
                    # (presumably max_a Q_target(s', a)); combined with reward
                    # and terminal below into the TD target Y.
                    self.QT_ph = tf.placeholder(tf.float32, [None], name='QT_ph')
                    # Integer id of selected action
                    self.action_ph = tf.placeholder(tf.int32, [None], name='action_ph')

                    self.reward_ph = tf.placeholder(tf.float32, [None], name="reward_ph")
                    # Multiplied by (1 - terminal) below, so 1.0 removes the
                    # bootstrap term for terminal transitions.
                    self.terminal_ph = tf.placeholder(tf.float32, [None], name="terminal_ph")

                with tf.name_scope("Acted_Q"):
                    # One hot of the action which was taken
                    action_one_hot = tf.one_hot(self.action_ph, self.num_actions, 1., 0., name='action_one_hot')
                    # Get the prediction of the chosen actions only
                    acted_Q = tf.reduce_sum(self.Q_output * action_one_hot, reduction_indices=1, name='DQN_acted')

                with tf.name_scope("Target_Q"):
                    # One-step TD target; gradients must not flow into it.
                    Y = self.reward_ph + self.gamma * self.QT_ph * (1 - self.terminal_ph)
                    Y = tf.stop_gradient(Y)

                # Per-sample loss from commonOps (presumably a clipped /
                # Huber-style squared error -- see commonOps).
                loss_batch = cops.clipped_l2(Y, acted_Q)
                loss = tf.reduce_sum(loss_batch, name="loss")

                # Positional hyper-parameters are defined by commonOps.
                self.train_op, grads = cops.graves_rmsprop_optimizer(loss, self.learning_rate, 0.95, 0.01, 1)

            with tf.name_scope("Summaries"):
                # Summaries for Tensorboard
                self.summaries = tf.merge_summary([
                        tf.scalar_summary('losses/loss', loss),
                        tf.scalar_summary("losses/loss_max", tf.reduce_max(loss_batch)),
                        tf.scalar_summary('Q/avg_q', tf.reduce_mean(self.Q_output)),
                        tf.histogram_summary('Q/q_values_hist', self.Q_output),
                        tf.scalar_summary('Q/max_q_value', tf.reduce_max(self.Q_output)),
                        tf.scalar_summary('Others/reward_max', tf.reduce_max(self.reward_ph))
                        ])

    def predict(self, sess, state):
        '''
        Return the Q-values for a batch of states
        (shape [batch, screen_height, screen_width, history_length]).
        '''
        return sess.run(self.Q_output, {self.state_ph: state})

    def determine_action(self, sess, state, epsilon):
        '''
        Epsilon-greedy action for a single (unbatched) state: with probability
        ``epsilon`` a uniformly random action, otherwise the greedy one.
        '''
        if random.random() < epsilon:
            # Explore: random action
            action = random.randrange(self.num_actions)
        else:
            # Exploit: wrap the single state into a batch of one.
            action = sess.run(self.best_action, {self.state_ph: [state]})[0]

        return action

    def update(self, sess, state, action, reward, target, terminal):
        '''
        Run one training step on a batch and return the serialized summaries.

        ``target`` feeds ``QT_ph`` (the next-state bootstrap value). Only valid
        on the training network (net_type=0); the target network has no
        train op or summaries.
        '''
        feed_dict = {self.state_ph: state, self.QT_ph: target,
                     self.action_ph: action, self.reward_ph: reward,
                     self.terminal_ph: terminal}
        _, summaries = sess.run([self.train_op, self.summaries], feed_dict)

        return summaries

In [4]:
class Monitoring_Worker():
    '''
    Monitors the global episode counter, periodically copies the Q-network
    weights into the target network, and requests a stop once the episode
    budget ('max_global_episodes') is reached.

    Args:
        name: worker name.
        conf: configuration dict; reads 'max_global_episodes',
            'update_target' and 'env_name' (the whole dict is forwarded to
            the environment wrapper).
        Q_net: training network (its ``scope`` is the copy source).
        Target_net: target network (its ``scope`` is the copy destination).
        global_episodes: TF variable holding the global episode count.
        summary_writer: optional summary writer (currently unused here).
    '''
    def __init__(self, name, conf, Q_net, Target_net, global_episodes, summary_writer=None):
        self.name = name
        self.max_global_episodes = conf['max_global_episodes']
        self.update_target_freq = conf['update_target']
        self.global_Q_net = Q_net
        self.global_Target_net = Target_net
        self.global_episodes = global_episodes
        self.summary_writer = summary_writer
        # Use the conf passed in (not the module-level global) so the worker
        # honours its own configuration.
        self.eval_env = Aux_Gym.GymEnvironment(conf['env_name'], conf)

        with tf.name_scope("Copy_Parameters"):
            self.update_target_ops = self.update_target_graph(self.global_Q_net.scope, self.global_Target_net.scope)

    def update_target_graph(self, from_scope, to_scope):
        '''
        Build assign ops copying each trainable variable under ``to_scope``
        from its same-named counterpart under ``from_scope``.

        Variables are paired by their scope-relative name rather than by
        collection order, so extra trainable variables that exist only under
        ``from_scope`` cannot shift the pairing.

        Raises:
            ValueError: if a variable under ``to_scope`` has no counterpart.
        '''
        from_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, from_scope)
        to_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, to_scope)

        # get_collection filters names with re.match, so (for plain scope
        # strings) every returned name starts with the scope; strip it.
        from_by_name = {var.name[len(from_scope):]: var for var in from_vars}

        op_holder = []
        for to_var in to_vars:
            from_var = from_by_name.get(to_var.name[len(to_scope):])
            if from_var is None:
                raise ValueError('No variable under scope {!r} matches {!r}'.format(from_scope, to_var.name))
            op_holder.append(to_var.assign(from_var))
        return op_holder

    def run(self, sess, coord, poll_secs=1.0):
        '''
        Poll the global episode counter until the coordinator stops.

        The target network is synced once each time the counter enters a new
        block of ``update_target_freq`` episodes (including at start-up), so
        a sync is neither repeated while the counter stays put nor skipped
        when the counter jumps past a multiple between polls.

        Args:
            sess: TF session.
            coord: tf.train.Coordinator shared with the other threads.
            poll_secs: seconds to wait between polls.
        '''
        with sess.as_default(), sess.graph.as_default():
            last_sync_period = None
            try:
                while not coord.should_stop():
                    episode_count = sess.run(self.global_episodes)

                    sync_period = episode_count // self.update_target_freq
                    if sync_period != last_sync_period:
                        sess.run(self.update_target_ops)
                        last_sync_period = sync_period
                        print('updated')

                    if episode_count >= self.max_global_episodes:
                        print('finish')
                        coord.request_stop()
                        return

                    # Sleep instead of spinning; returns early if another
                    # thread requests a stop.
                    coord.wait_for_stop(poll_secs)

            except tf.errors.CancelledError:
                return

In [None]:
# Size the Q-value head from the environment's action space instead of a
# hard-coded 6: Breakout-v0 exposes only 4 actions, so a fixed 6 would let the
# estimator choose actions the environment does not define.
env = gym.make(conf_parameters['env_name'])
action_size = env.action_space.n
env.close()

Q_Network = ConvNet_Estimator(conf_parameters, action_size, net_type=0)
Target_Network = ConvNet_Estimator(conf_parameters, action_size, net_type=1)

with tf.device("/cpu:0"):
    # Shared episode counter polled by the monitoring workers.
    global_episodes = tf.Variable(0, dtype=tf.int32, name='global_episodes', trainable=False)

    # Two monitoring workers over the same networks and counter.
    Eval_worker = Monitoring_Worker(
            name='Worker_0',
            conf=conf_parameters,
            Q_net=Q_Network,
            Target_net=Target_Network,
            global_episodes=global_episodes,
            summary_writer=None)

    Eval_worker2 = Monitoring_Worker(
            name='Worker_1',
            conf=conf_parameters,
            Q_net=Q_Network,
            Target_net=Target_Network,
            global_episodes=global_episodes,
            summary_writer=None)

with tf.Session() as sess:
    summary_writer = tf.train.SummaryWriter(conf_parameters['log_dir'], sess.graph, flush_secs=30)

    sess.run(tf.initialize_all_variables())
    coord = tf.train.Coordinator()

    # Start one thread per monitoring worker. Passing the bound method and its
    # arguments avoids closures over loop variables.
    worker_threads = []
    for eval_worker in (Eval_worker, Eval_worker2):
        thread = threading.Thread(target=eval_worker.run, args=(sess, coord))
        thread.start()
        worker_threads.append(thread)

    # Wait for all workers to finish
    coord.join(worker_threads)

## Define the workers

Each worker runs in its own thread.

In [None]:
class Worker():
    '''
    Asynchronous one-step Q-learning worker. Each instance owns its own
    environment, runs episodes locally and pushes updates to the global shared
    Q-network.

    NOTE(review): this class is still incomplete -- ``run`` relies on
    ``self.copy_params_op``, ``self.run_n_steps`` and ``self.update``, none of
    which are defined yet.

    Args:
        name: worker name.
        conf: configuration dict; reads 'gamma', 'max_global_episodes' and
            'env_name' (the whole dict is forwarded to the environment wrapper).
        Q_net: global shared training network.
        Target_net: global shared target network.
        global_counter: iterator shared by all workers (e.g. itertools.count()).
        summary_writer: optional summary writer (None disables summaries).
        max_global_steps: optional global step limit checked in ``run``;
            None means no limit.
    '''
    def __init__(self, name, conf, Q_net, Target_net, global_counter, summary_writer=None, max_global_steps=None):
        self.name = name
        self.gamma = conf['gamma']
        self.max_global_episodes = conf['max_global_episodes']
        # Read by run(); previously never assigned, which raised AttributeError.
        self.max_global_steps = max_global_steps
        self.global_Q_net = Q_net
        self.global_Target_net = Target_net
        self.global_counter = global_counter
        self.local_counter = itertools.count()
        self.summary_writer = summary_writer
        # Build the environment from the conf passed in, not the module-level
        # global, so each worker honours its own configuration.
        self.env = Aux_Gym.GymEnvironment(conf['env_name'], conf)

    def run(self, sess, coord, t_max):
        '''
        Worker loop: repeatedly copy parameters from the global networks,
        collect experience via ``run_n_steps(t_max, sess)`` and update the
        global networks, until the coordinator requests a stop or the global
        step limit (if any) is reached.
        '''
        with sess.as_default(), sess.graph.as_default():
            # Initial state
            self.state = self.env.new_game()
            try:
                while not coord.should_stop():
                    # Copy Parameters from the global networks
                    sess.run(self.copy_params_op)

                    # Collect some experience
                    transitions, local_t, global_t = self.run_n_steps(t_max, sess)

                    if self.max_global_steps is not None and global_t >= self.max_global_steps:
                        tf.logging.info("Reached global step {}. Stopping.".format(global_t))
                        coord.request_stop()
                        return

                    # Update the global networks
                    self.update(transitions, sess)

            except tf.errors.CancelledError:
                return

## Set up the threads and global networks

In [None]:
# Query the environment once for the size of its action space.
env = gym.make(conf_parameters['env_name'])
action_size = env.action_space.n
env.close()

with tf.device("/cpu:0"):
    # Keeps track of the number of updates we've performed
    global_step = tf.Variable(0, name="global_step", trainable=False)

    # Global shared networks.
    # NOTE(review): nesting under "global" makes the variable names
    # 'global/Global_Training/...' while the estimators' .scope attribute stays
    # 'Global_Training'; scope-based lookups such as
    # tf.get_collection(..., scope) need the full prefix.
    with tf.variable_scope("global"):
        Q_Network = ConvNet_Estimator(conf_parameters, action_size, net_type=0)
        Target_Network = ConvNet_Estimator(conf_parameters, action_size, net_type=1)

    # Global step iterator shared by all workers
    global_counter = itertools.count()

    # Create worker graphs
    workers = []
    for worker_id in range(num_workers):
        # Only worker 0 writes summaries because they're pretty much identical.
        worker_summary_writer = summary_writer if worker_id == 0 else None

        # Arguments match Worker.__init__; each worker builds its own env from
        # conf and reads the discount factor from conf['gamma'].
        worker = Worker(
            name="worker_{}".format(worker_id),
            conf=conf_parameters,
            Q_net=Q_Network,
            Target_net=Target_Network,
            global_counter=global_counter,
            summary_writer=worker_summary_writer)
        workers.append(worker)

    saver = tf.train.Saver(keep_checkpoint_every_n_hours=2.0, max_to_keep=10)

    # Used to occasionally write episode rewards to Tensorboard
    # TODO: PolicyMonitor is not defined yet; this call fails until it is.
    pe = PolicyMonitor(
        env=gym.make(conf_parameters['env_name']),
        q_estimator=Q_Network,
        summary_writer=summary_writer,
        saver=saver)

## Execute the threads

In [None]:
with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())
    coord = tf.train.Coordinator()

    # Load a previous checkpoint if it exists
    latest_checkpoint = tf.train.latest_checkpoint(conf_parameters['checkpoint_dir'])
    if latest_checkpoint:
        print("Loading model checkpoint: {}".format(latest_checkpoint))
        saver.restore(sess, latest_checkpoint)

    # Start worker threads. Bind each worker's run method directly: a
    # `lambda: worker.run(...)` closes over the loop variable, so a thread
    # that starts late could run a later worker instead of its own.
    worker_threads = []
    for worker in workers:
        t = threading.Thread(target=worker.run, args=(sess, coord, conf_parameters['t_max']))
        t.start()
        worker_threads.append(t)

    # Start a thread for policy eval task
    monitor_thread = threading.Thread(target=lambda: pe.continuous_eval(conf_parameters['eval_steps'], sess, coord))
    monitor_thread.start()

    # Wait for all workers to finish
    coord.join(worker_threads)