This is Part 10 of the Deep Reinforcement Learning Notebook series. This notebook introduces the Asynchronous Advantage Actor-Critic (A3C) algorithm.


The series is about deep RL algorithms, so it excludes other techniques that can be used to learn functions in reinforcement learning. It is also not exhaustive: it covers only the most widely used deep RL algorithms.

'''
  A few things in this notebook are taken from https://blog.tensorflow.org/2018/07/deep-reinforcement-learning-keras-eager-execution.html
'''

##What is Asynchronous Advantage Actor Critic?

Asynchronous Advantage Actor Critic is quite a mouthful! Let’s start by breaking down the name, and then the mechanics behind the algorithm itself.

Asynchronous: Multiple worker agents are trained in parallel, each with its own copy of the model and environment. This lets the algorithm train faster, because more workers are training in parallel, and also gather more diverse training experience, because each worker's experience is independent.

Advantage: The advantage is a metric that judges not only how good the agent's actions were, but also how they turned out compared with what was expected. This allows the algorithm to focus on where the network's predictions were lacking. Intuitively, it measures the advantage of taking action a over following the policy π at the given time step.

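Actor-Critic: The Actor-Critic aspect of the algorithm uses an architecture that shares layers between the policy (the Actor) and the value function (the Critic). In this notebook the Actor and Critic are built as two separate Keras models with the same convolutional architecture.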
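
As a hedged aside, the Advantage described above is usually written in standard RL notation (this formula is not taken from the notebook's code) as the gap between the action value and the state value, and is often approximated by a one-step temporal-difference error:

$$A(s_t, a_t) = Q(s_t, a_t) - V(s_t) \approx r_t + \gamma V(s_{t+1}) - V(s_t)$$

Here $\gamma$ is the discount factor and $V$ is the Critic's value estimate. A positive value means the action turned out better than the Critic expected, a negative value means it turned out worse.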


One of the most exciting things about the paper is that you don't need to rely on a GPU for speed. In fact, the whole idea is to use multiple CPU cores running in parallel, which can give a speedup that grows roughly with the number of cores used.



A2C and A3C differ only in the asynchronous part. A3C consists of multiple independent agents (networks) with their own weights, each interacting with a different copy of the environment in parallel. Thus, they can explore a bigger part of the state-action space in much less time.

##Asynchronous Advantage Actor Critic Algorithm
Each worker performs the following workflow cycle:

1) Workers reset to global network

2) Workers interact with the environment

3) Workers calculate value and policy loss

4) Workers get gradients from losses

5) Workers update the global network with the gradients

6) Repeat
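
The cycle above can be sketched on a toy problem. This is only an illustration under stated assumptions, not the notebook's implementation: the "network" is a plain NumPy weight vector, the "loss" is a hypothetical quadratic with a known optimum `TARGET`, and the names `GlobalParams`, `toy_worker` and `run_toy_workers` are made up for this sketch.

```python
import threading
import numpy as np

# Hypothetical optimum of a toy quadratic loss 0.5 * ||w - TARGET||^2.
TARGET = np.array([1.0, -2.0, 0.5])

class GlobalParams:
    """Stand-in for the global network: shared weights plus a lock."""
    def __init__(self, dim):
        self.weights = np.zeros(dim)
        self.lock = threading.Lock()

def toy_worker(global_params, seed, n_updates=200, lr=0.05):
    rng = np.random.default_rng(seed)
    for _ in range(n_updates):                      # 6) repeat
        with global_params.lock:                    # 1) reset local copy to global
            local = global_params.weights.copy()
        # 2)-4) "interact", compute the loss and its gradient on the local copy;
        # the noise term stands in for each worker's different experience
        grad = (local - TARGET) + rng.normal(scale=0.1, size=local.shape)
        with global_params.lock:                    # 5) update the global weights
            global_params.weights -= lr * grad

def run_toy_workers(n_workers=4):
    global_params = GlobalParams(TARGET.shape[0])
    threads = [threading.Thread(target=toy_worker, args=(global_params, i))
               for i in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return global_params.weights

print(run_toy_workers())
```

Each worker computes its gradient on a possibly stale local copy, which is the essence of the asynchronous update; the lock here only protects the read and the write of the shared array.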

##IMPLEMENTING Asynchronous Advantage Actor Critic Algorithm

The code below sets up the environment required to run and record the game, and loads the required libraries.

In [None]:
!sudo apt install cmake libboost-all-dev libsdl2-dev libfreetype6-dev libgl1-mesa-dev libglu1-mesa-dev libpng-dev libjpeg-dev libbz2-dev libfluidsynth-dev libgme-dev libopenal-dev zlib1g-dev timidity tar nasm
!pip3 install vizdoom

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense,Dropout,Conv2D, Flatten,BatchNormalization ,Activation,Input
from tensorflow.keras.models import Sequential,Model
import gym
import numpy as np
import threading
from queue import Queue
import imageio
import multiprocessing
from vizdoom import *
import random
from collections import deque
from tensorflow.keras.utils import normalize as normal_values
import cv2
from gym import logger as gymlogger
from skimage.transform import resize
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

This part sets up the game environment.

In [None]:
def get_game():
  game= DoomGame()
  game.load_config("defend_the_center.cfg")
  game.set_doom_scenario_path("defend_the_center.wad")
  
  game.init()
  return game

In [None]:
def record(episode,
           episode_reward,
           worker_idx,
           global_ep_reward,
           result_queue):
  """Helper function to store score and print statistics.
  Args:
    episode: Current episode
    episode_reward: Reward accumulated over the current episode
    worker_idx: Which thread (worker)
    global_ep_reward: The moving average of the global reward
    result_queue: Queue storing the moving average of the scores
  """
  if global_ep_reward == 0:
    global_ep_reward = episode_reward
  else:
    global_ep_reward = global_ep_reward * 0.99 + episode_reward * 0.01
  print(
      f"Episode: {episode} | "
      f"Moving Average Reward: {int(global_ep_reward)} | "
      f"Episode Reward: {int(episode_reward)} | "
      f"Worker: {worker_idx}"
  )
  result_queue.put(global_ep_reward)
  return global_ep_reward

Defining the A3C worker class. At initialization, the object sets a few parameters such as the action and state spaces, creates the Actor and Critic models, and initializes the lists that the remember method uses to record the observations of each step.

In [None]:
class A3C_worker(threading.Thread):

  # Set up global variables across different threads
  global_episode = 0
  # Moving average reward
  global_moving_average_reward = 0
  best_score = 0
  save_lock = threading.Lock()

  def __init__(self,global_model,idx,res_queue,no_of_episodes):
    super(A3C_worker, self).__init__()
    self.result_queue = res_queue
    self.env = get_game()
    self.no_of_episodes = no_of_episodes
    self.worker_idx = idx
    self.global_Actor_model = global_model
    self.state_shape= 100,160, 4 # the state space
    self.action_shape = 3 # the action space
    self.actions_=np.identity(self.action_shape,dtype=bool).tolist()
    self.gamma=[0.99] # decay rate of past observations
    self.learning_rate= 1e-5 # learning rate in deep learning
    self.lambda_=0.90       #λ is a hyperparameter for GAE(Generalized Advantage Estimation)
    self.alpha=1e-4
    self.epsilon = 1.0
  
    self.Actor_model=self._create_model('Actor')    #Local Actor (policy) model that outputs action probabilities
    self.Critic_model=self._create_model('Critic')  #Local Critic model that predicts the state value V(s)
   
        # record observations
    self.states=[]
    self.rewards=[]
    self.actions=[]
    self.last_state=np.zeros((1,100,160,4))

  def _create_model(self,model_type):
    ''' builds the model using keras'''
    inputs=Input(shape=(self.state_shape))
    layer_1=(Conv2D(32, (8, 8),strides=(4, 4)))(inputs)  
    layer_2=(BatchNormalization())(layer_1)
    layer_3=(Activation('relu'))(layer_2)
    layer_4=(Conv2D(64, (4, 4),strides=(2, 2)))(layer_3)
    layer_5=(BatchNormalization())(layer_4)
    layer_6=(Activation('relu'))(layer_5)
    layer_7=(Conv2D(128, (4, 4),strides=(2, 2)))(layer_6)
    layer_8=(BatchNormalization())(layer_7)
    layer_9=(Activation('relu'))(layer_8)
    layer_10=(Flatten())(layer_9)
    layer_11=(Dense(512))(layer_10)
    layer_12=(Activation('relu'))(layer_11)
    layer_13=(Dense(self.action_shape))(layer_12)
    layer_14=(Activation('softmax'))(layer_13)    
    layer_15=(Dense(1))(layer_12)

    if model_type=='Actor':
      model=Model(inputs,layer_14)
    else:
       model=Model(inputs,layer_15)
    model.compile(loss='mse',optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
    return model

This is the preprocessing applied to each image obtained by interacting with the environment. Here I convert the frame to grayscale, crop it, and resize it to speed up the training process.

In [None]:
  def get_crop_and_grayscale_frame(self,frame):
    frame=cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    frame=frame[40:,:]
    frame=frame/255.
    frame= resize(frame,(100,160))
    return frame

Action Selection

The get_action method chooses the agent's action. It uses the Actor network to generate a normalized probability distribution over the actions for a given state, and then samples the next action from this distribution. In testing mode it picks the most probable action instead of sampling.

In [None]:
  def get_action(self, state,status='Training'):

    '''samples the next action from the policy's probability distribution
      over the actions (or takes the most probable action when testing)'''
    if status=='Testing':
        return np.argmax(self.Actor_model.predict(state).flatten())

    action_probability_distribution=self.Actor_model.predict(state).flatten()    
    action = np.random.choice([0,1,2],p=action_probability_distribution.ravel())

    return action
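
To make the difference between sampling and greedy selection concrete, here is a minimal NumPy-only sketch. The function name `select_action` and its `greedy` and `rng` parameters are hypothetical and are not part of the notebook's class; the probabilities are made-up values standing in for the Actor's output.

```python
import numpy as np

def select_action(probabilities, greedy=False, rng=None):
    """Pick an action index from a probability vector.

    greedy=True mirrors the 'Testing' branch (argmax); otherwise the
    action is sampled in proportion to its probability.
    """
    probabilities = np.asarray(probabilities, dtype=float).ravel()
    if greedy:
        return int(np.argmax(probabilities))
    if rng is None:
        rng = np.random.default_rng()
    return int(rng.choice(len(probabilities), p=probabilities))

policy_output = [0.1, 0.7, 0.2]   # made-up Actor output for 3 actions
rng = np.random.default_rng(0)
samples = [select_action(policy_output, rng=rng) for _ in range(2000)]
print(np.bincount(samples, minlength=3) / len(samples))
print(select_action(policy_output, greedy=True))
```

Sampling keeps some exploration during training, because less likely actions are still tried occasionally, while the greedy choice is deterministic.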

The remember method records the observations of each step.

In [None]:
  def remember(self, state, reward,action,last_state,done):
    '''stores observations'''
    self.states.append(state)
    self.rewards.append(reward)
    self.actions.append(action)
    if done:
      self.last_state[0]=last_state

The get_GAEs method computes the Generalized Advantage Estimation (GAE), which is then used as the Advantage when updating the models.

In [None]:
  def get_GAEs(self,v_preds):
    '''
    Advantage Estimation with GAE
    '''
    gaes = np.zeros((len(self.rewards),1))
    future_gae = 0.0
    for t in reversed(range(len(self.rewards))):
      delta = self.rewards[t] + np.asarray(self.gamma) * v_preds[t + 1] - v_preds[t]
      gaes[t] = future_gae = delta + np.asarray(self.gamma) * np.asarray(self.lambda_) *np.asarray(future_gae)
    return gaes
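
The same recursion can be tried in isolation on a tiny trajectory. This is a standalone sketch, assuming scalar `gamma` and `lam` and a hypothetical function name `compute_gae`; the rewards and value estimates are made-up numbers.

```python
import numpy as np

def compute_gae(rewards, values, gamma=0.99, lam=0.90):
    """GAE for one trajectory.

    rewards: length-T sequence of rewards.
    values:  length-(T+1) sequence of value estimates; the last entry is
             the estimate for the state reached after the final step.
    """
    rewards = np.asarray(rewards, dtype=float)
    values = np.asarray(values, dtype=float)
    advantages = np.zeros_like(rewards)
    future_gae = 0.0
    # Walk backwards so each step can reuse the advantage of the next step.
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] - values[t]   # TD error
        future_gae = delta + gamma * lam * future_gae
        advantages[t] = future_gae
    return advantages

example_advantages = compute_gae([1.0, 0.0, 1.0], [0.5, 0.4, 0.3, 0.0])
print(example_advantages)
```

With `lam=0` this reduces to the one-step TD error, and with `lam=1` it becomes the discounted return minus the value estimate, so `lam` trades bias against variance.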

Updating the models

The update_models method calculates gradients with respect to the local network, applies them to the global network, and then sets the weights of the local network to those of the global network.

In [None]:
  def update_models(self):
    '''
    Updates the network.
    '''
    #get V_preds and V_tar from critic model
    print('updating')
    states = (np.array(self.states)).reshape((-1,100,160,4))
    actions = (np.array(self.actions)).reshape((-1,1))
   
    V_s = self.Critic_model.predict(states)
    V_last_state = self.Critic_model.predict(self.last_state)
    v_all = np.concatenate((V_s,V_last_state),axis=0)

    Advantages=self.get_GAEs(v_all) #Calculating the Advantage
  
    critic_targets = Advantages +  V_s
    
    self.Critic_model.fit(states, critic_targets,epochs=3) #Training the Local Critic Model
    def train_step(states,Advantages,actions):
      optimizer = tf.keras.optimizers.Adam(learning_rate=3e-3)
      with tf.GradientTape() as tape:
        Advantages=tf.stop_gradient(tf.reshape(Advantages,[-1]))    #Flatten to shape (N,) so the product below is element-wise
        probabilities=self.Actor_model(states)                      #The Actor ends in a softmax, so these are probabilities, not logits
        one_hot_actions=tf.one_hot(tf.cast(tf.reshape(actions,[-1]),tf.int32),self.action_shape)
        negative_likelihoods = -tf.math.log(tf.reduce_sum(probabilities*one_hot_actions,axis=1)+1e-10)   #-log pi(a|s) of the action taken
        weighted_negative_likelihoods = tf.multiply(negative_likelihoods,Advantages)
        loss = tf.reduce_mean(weighted_negative_likelihoods)
      grads = tape.gradient(loss,self.Actor_model.trainable_variables)
      grads = [(tf.clip_by_value(grad, -1.0, 1.0)) for grad in grads]
      optimizer.apply_gradients(zip(grads, self.global_Actor_model.trainable_variables))    #Training the Global Actor Model
      self.Actor_model.set_weights(self.global_Actor_model.get_weights())                   #Setting the local Actor Model weights to the global Network
    train_step(tf.cast(states,tf.float32),tf.cast(Advantages,tf.float32),tf.cast(actions,tf.float32))  
    self.states=[];self.rewards=[];self.actions=[];
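
The Actor's objective in update_models is an advantage-weighted negative log-likelihood. The sketch below computes that quantity with NumPy on made-up numbers so the arithmetic can be checked by hand; `policy_loss` and its `eps` parameter are hypothetical names for illustration only, and no gradients are computed here.

```python
import numpy as np

def policy_loss(action_probs, actions, advantages, eps=1e-10):
    """Mean of -log pi(a_t | s_t) * A_t over a batch.

    action_probs: (N, num_actions) probabilities from the Actor.
    actions:      N integer indices of the actions actually taken.
    advantages:   N advantage estimates (treated as constants).
    """
    action_probs = np.asarray(action_probs, dtype=float)
    actions = np.asarray(actions, dtype=int).ravel()
    advantages = np.asarray(advantages, dtype=float).ravel()
    # Probability the policy assigned to each action that was taken.
    chosen = action_probs[np.arange(len(actions)), actions]
    negative_log_likelihood = -np.log(chosen + eps)
    return float(np.mean(negative_log_likelihood * advantages))

probs = [[0.7, 0.2, 0.1],
         [0.1, 0.8, 0.1]]
print(policy_loss(probs, actions=[0, 1], advantages=[1.0, -0.5]))
```

Minimizing this loss raises the probability of actions with positive advantage and lowers it for actions with negative advantage.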

Training the model 


This method runs the training loop for a worker. Iterating through a set number of episodes, it uses the model to sample actions and play them. When an episode ends, the worker uses the recorded observations to update the policy. In a dynamic game we cannot choose a good action from a single observation (one frame of the game in this case), so we use a stack of 4 frames as the input state.

In [None]:
  def run(self):
    while A3C_worker.global_episode < self.no_of_episodes:
      # each episode is a new game env
      self.env.new_episode();
      state=self.env.get_state().screen_buffer
      done=False
      state= self.get_crop_and_grayscale_frame(state)
      stacked_frames = np.stack((state,state,state,state),axis=2)
      state_ = stacked_frames.reshape(1,stacked_frames.shape[0],stacked_frames.shape[1],stacked_frames.shape[2])
      episode_reward=0 #record episode reward
      print("Episode Started")
      while not done:
        # play an action and record the game state & reward per episode
        action=self.get_action(state_)
        reward=self.env.make_action(self.actions_[action])
        done = self.env.is_episode_finished()
        if not done:
          next_state = self.env.get_state().screen_buffer
          next_state = self.get_crop_and_grayscale_frame(next_state)
          next_state_ = next_state.reshape(1,next_state.shape[0],next_state.shape[1],1)
          next_state_ = np.append(next_state_, state_[:, :, :, :3], axis=3)
        else:
          next_state_ = state_
        if self.env.is_player_dead():
          reward=reward-1
        self.remember(state_, reward,action,next_state_,done)
        state_=next_state_
        episode_reward+=reward
      self.update_models()
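      # NOTE: evaluate() is not defined anywhere in this notebook, so the periodic
      # evaluation call is disabled; define it on A3C_worker before re-enabling.
      # if A3C_worker.global_episode%50==0 and A3C_worker.global_episode!=0:
      #   self.evaluate(A3C_worker.global_episode,self.worker_idx)
      self.epsilon = self.epsilon - 0.01
      # record() takes five arguments; epsilon is not one of them
      A3C_worker.global_moving_average_reward = record(A3C_worker.global_episode, episode_reward, self.worker_idx,A3C_worker.global_moving_average_reward, self.result_queue)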
      if episode_reward > A3C_worker.best_score:
        with A3C_worker.save_lock:
          print("Saving best model", "Episode score: {}".format(episode_reward))
          self.global_Actor_model.save('model_{}_{}.h5'.format(self.worker_idx,A3C_worker.global_episode))
          A3C_worker.best_score = episode_reward
      A3C_worker.global_episode += 1
    self.result_queue.put(None)
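
The frame stacking used in run can be isolated as two small helpers. This is a NumPy-only sketch with hypothetical function names (`init_stack`, `push_frame`); it follows the same convention as the loop above, where the newest frame goes into channel 0 and the oldest frame is dropped.

```python
import numpy as np

def init_stack(frame, n_frames=4):
    """Repeat the first frame n_frames times; returns shape (1, H, W, n_frames)."""
    stacked = np.stack([frame] * n_frames, axis=2)
    return stacked[np.newaxis, ...]

def push_frame(stack, frame):
    """Insert the newest frame at channel 0 and drop the oldest channel."""
    newest = frame[np.newaxis, :, :, np.newaxis]          # shape (1, H, W, 1)
    return np.concatenate([newest, stack[:, :, :, :-1]], axis=3)

first_frame = np.zeros((100, 160))    # stand-in for a preprocessed frame
second_frame = np.ones((100, 160))
state = init_stack(first_frame)
state = push_frame(state, second_frame)
print(state.shape)
```

Stacking consecutive frames gives the network access to motion information that a single frame cannot carry.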

Defining the A3C global network class. It builds the global Actor model and starts one worker thread per CPU core.


In [None]:
class A3C_Global_Network():

  def __init__(self):
    self.state_shape= 100,160, 4 # the state space
    self.action_shape = 3 # the action space
    self.global_model = self.create_global_actor()

  def create_global_actor(self):
    ''' builds the model using keras'''
    inputs=Input(shape=(self.state_shape))
    layer_1=(Conv2D(32, (8, 8),strides=(4, 4)))(inputs)  
    layer_2=(BatchNormalization())(layer_1)
    layer_3=(Activation('relu'))(layer_2)
    layer_4=(Conv2D(64, (4, 4),strides=(2, 2)))(layer_3)
    layer_5=(BatchNormalization())(layer_4)
    layer_6=(Activation('relu'))(layer_5)
    layer_7=(Conv2D(128, (4, 4),strides=(2, 2)))(layer_6)
    layer_8=(BatchNormalization())(layer_7)
    layer_9=(Activation('relu'))(layer_8)
    layer_10=(Flatten())(layer_9)
    layer_11=(Dense(512))(layer_10)
    layer_12=(Activation('relu'))(layer_11)
    layer_13=(Dense(self.action_shape))(layer_12)
    layer_14=(Activation('softmax'))(layer_13)

    model=Model(inputs,layer_14)
   
    return model
  
  def train(self,no_of_episodes):
    res_queue  = Queue()
    workers = [A3C_worker(self.global_model,i,res_queue,no_of_episodes) for i in range(multiprocessing.cpu_count())]
    for i, worker in enumerate(workers):
      print("Starting worker {}\n".format(i))
      worker.start()

    moving_average_rewards = []  # record episode reward to plot
    while True:
      reward = res_queue.get()
      if reward is not None:
        moving_average_rewards.append(reward)
      else:
        break
    for w in workers:
      w.join()
    plt.plot(moving_average_rewards)
    plt.ylabel('Moving average ep reward')
    plt.xlabel('Step')
    plt.show()

In [None]:
no_of_episodes=1000

Agent = A3C_Global_Network()
Agent.train(no_of_episodes)

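The code below runs the trained agent so that we can see how well it performs. Note that the training loop above saves checkpoints as model_{worker index}_{episode}.h5, so the Global_Actor.h5 path used below must be replaced with (or renamed from) one of those checkpoints.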

In [None]:
class tester:

  def __init__(self,global_actor_path):
      self.Actor_model = tf.keras.models.load_model(global_actor_path)     #import model (load_model is not imported directly above)
      self.action_shape = 3
      self.actions_ = np.identity(self.action_shape,dtype=bool).tolist()

  def get_action(self, state):
    '''picks the most probable action from the policy's probability
      distribution over the actions'''
    action_probability_distribution=(self.Actor_model.predict(state))[0]
    action=np.argmax(action_probability_distribution)
    return action
    

  def get_crop_and_grayscale_frame(self,frame):
    frame=cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    frame=frame[40:,:]
    frame=frame/255.
    frame=resize(frame,(100,160))
    return frame

In [None]:
writer = imageio.get_writer("test_video.mp4", fps=20)
env = get_game()
env.new_episode();
state = env.get_state().screen_buffer
writer.append_data(state)
done=False
test=tester("Global_Actor.h5")
state=test.get_crop_and_grayscale_frame(state)
stacked_frames = np.stack((state,state,state,state),axis=2)
stacked_frames = stacked_frames.reshape(1,stacked_frames.shape[0],stacked_frames.shape[1],stacked_frames.shape[2]) 
state_=stacked_frames
episode_reward = 0
while True:
  action=test.get_action(state_)
  reward=env.make_action(test.actions_[action]) 
  episode_reward+=reward
  done = env.is_episode_finished()
  if not done:
    next_state = env.get_state().screen_buffer
    writer.append_data(next_state)
    next_state=test.get_crop_and_grayscale_frame(next_state)
    next_state_ = next_state.reshape(1,next_state.shape[0],next_state.shape[1],1)
    # Build the new stack from the current state so older frames keep shifting out
    next_state_ = np.append(next_state_, state_[:, :, :, :3], axis=3)
  else:
    next_state_=state_
  state_=next_state_
  if done:
    break
writer.close()
print('Reward:',episode_reward)