# A3C  (Asynchronous Advantage Actor Critic) Neural Network

I'm using this network to train a kangaroo mother in an Atari game. She has to find her baby and fight different obstacles showing up in her way. The environment is provided at https://gymnasium.farama.org/. A3C neural networks achieve great results through two innovations: one is adding a critic to the usual setup with an actor (agent); another is training several actors at the same time and combining thier results together (that's the "asynchronous" part). The actor network is used to choose actions and the critic network evaluates them. Thus, at the end, the loss functions is a sum of two losses: one for the actors and one for the critic. The actor's loss uses so called advantage (the last of three A's in the name). The formula is advantage = target_state_value - state_value, where the former comes from the Bellman equation and the latter is the result of performing the forward pass of a current state through the network.

### Installing packages and importing the libraries

---



In [1]:
# gymnasium:

!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting shimmy<1.0,>=0.1.0 (from shimmy[atari]<1.0,>=0.1.0; extra == "atari"->gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl.metadata (2.3 kB)
Collecting autorom~=0.4.2 (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Collecting AutoROM.acce

In [2]:
# Importing libraries:

import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import gymnasium as gym
from gymnasium import ObservationWrapper
from gymnasium.spaces import Box

### Creating the architecture of the Network:

In [115]:
class Network(nn.Module):
  def __init__(self,action_size, seed = 42):
    super(Network,self).__init__()
    self.seed = torch.manual_seed(seed)
    self.conv_layers = nn.Sequential(
            nn.Conv2d(4, 16, kernel_size=3, stride=1, padding=1),  # Output: (16, 42, 42)
            nn.BatchNorm2d(16),
            nn.ReLU(),

            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),  # Output: (32, 42, 42)
            nn.BatchNorm2d(32),
            nn.ReLU(),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),  # Output: (64, 42, 42)
            nn.BatchNorm2d(64),
            nn.ReLU(),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),  # Output: (128, 42, 42)
            nn.BatchNorm2d(128),
            nn.ReLU(),

            nn.MaxPool2d(kernel_size=2, stride=2),  # Output: (128, 21, 21)
        )

    self.flatten = nn.Flatten()

        # Calculating the input dimension and the first dense layer:
    self.fc_input_dim = 128 * 21 * 21  # (128 channels, 21 height, 21 width)
    self.fc1 = nn.Linear(self.fc_input_dim, 128)

        # Dense layers:
    self.fc2actor = torch.nn.Linear(128, action_size) # actor
    self.fc2critic = torch.nn.Linear(128, 1)   # critic

  def forward(self, state):
    x = self.conv_layers(state)
    x = self.flatten(x)
    x = self.fc1(x)
    x = F.relu(x)
    actions_values = self.fc2actor(x)
    state_value = self.fc2critic(x)[0]
    return actions_values, state_value

  and should_run_async(code)


## Training the AI:

### Environment:

In [116]:
# The code in this cell is provided by the Udemy course Artificial Intelligence A-Z 2024:

class PreprocessAtari(ObservationWrapper):

  def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4):
    super(PreprocessAtari, self).__init__(env)
    self.img_size = (height, width)
    self.crop = crop
    self.dim_order = dim_order
    self.color = color
    self.frame_stack = n_frames
    n_channels = 3 * n_frames if color else n_frames
    obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
    self.observation_space = Box(0.0, 1.0, obs_shape)
    self.frames = np.zeros(obs_shape, dtype = np.float32)

  def reset(self):
    self.frames = np.zeros_like(self.frames)
    obs, info = self.env.reset()
    self.update_buffer(obs)
    return self.frames, info

  def observation(self, img):
    img = self.crop(img)
    img = cv2.resize(img, self.img_size)
    if not self.color:
      if len(img.shape) == 3 and img.shape[2] == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = img.astype('float32') / 255.
    if self.color:
      self.frames = np.roll(self.frames, shift = -3, axis = 0)
    else:
      self.frames = np.roll(self.frames, shift = -1, axis = 0)
    if self.color:
      self.frames[-3:] = img
    else:
      self.frames[-1] = img
    return self.frames

  def update_buffer(self, obs):
    self.frames = self.observation(obs)

def make_env():
  env = gym.make("KangarooDeterministic-v0", render_mode = 'rgb_array')
  env = PreprocessAtari(env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)
  return env

env = make_env()

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("Observation shape:", state_shape)
print("Number actions:", number_actions)
print("Action names:", env.env.env.get_action_meanings())

Observation shape: (4, 42, 42)
Number actions: 18
Action names: ['NOOP', 'FIRE', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'UPRIGHT', 'UPLEFT', 'DOWNRIGHT', 'DOWNLEFT', 'UPFIRE', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


  logger.deprecation(
  logger.warn(


### Hyperparameters:

In [117]:
learning_rate = 3e-4
discount_factor = 0.9
# Number of actors that will be trained:
number_of_environments = 30

### Implementing the A3C class

In [118]:
class Actor():
  def __init__(self, action_size):
    self.device =torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    self.action_size = action_size
    self.network = Network(action_size).to(self.device)
    self.optimizer =torch.optim.Adam(self.network.parameters(), lr = learning_rate)

  # I use the softmax strategy for choosing action, insead of epsilon greedy policy:
  def act(self, state):
  # State needs to be in the batch (extra dimension upfront):
    if state.ndim == 3:
      state = [state]
    state  = torch.tensor(state, dtype = torch.float32, device = self.device)

  # Calling the network on the state to get action values:
    action_values, _ = self.network.forward(state)

  # Softmax returns the probabilistic distribution, then, to get one especific action,
  # we simply sample the action from this distribution using choise function:
    policy = F.softmax(action_values, dim = -1)
    return np.array([np.random.choice(len(p), p = p) for p in policy.detach().cpu().numpy()])

  def step(self, state, action, reward, next_state, done):  # inputs are in batches
    batch_size = state.shape[0]
  # All inputs are numpy arrays, we want them to be tensors:
    state = torch.tensor(state, dtype = torch.float32, device = self.device )
    next_state = torch.tensor(next_state, dtype = torch.float32, device = self.device )
    reward = torch.tensor(reward, dtype = torch.float32, device = self.device )
    done = torch.tensor(done, dtype = torch.bool, device = self.device ).to(dtype = torch.float32)
    action_values, state_value = self.network(state)
    _, next_state_value =self.network(next_state)

  # Bellman equation gets us the target state value:
    target_state_value = reward + discount_factor * next_state_value * (1 - done)

  # Advantage feature:
    advantage = target_state_value - state_value

  # Actor's loss requires computing entropy:
    probs = F.softmax(action_values, dim = -1)
    logprobs = F.log_softmax(action_values, dim = -1)
    entropy = -torch.sum(probs * logprobs, dim = -1)
    batch_idx = np.arange(batch_size)

  # Select log probs of the actions
    logp_actions = logprobs[batch_idx, action]

  # Actor's loss:
    actor_loss = -(logp_actions * advantage.detach()).mean() - 0.001 * entropy.mean()

  # Critic's loss:
    critic_loss = F.mse_loss(target_state_value.detach(), state_value)

  # Total loss:
    total_loss = actor_loss + critic_loss

    self.optimizer.zero_grad()
    total_loss.backward()
    self.optimizer.step()






### Initializing the A3C actor:

In [119]:
actor = Actor(number_actions)

### Evaluating our A3C agent on a single episode

In [120]:
# Returning accumulated rewards over some number of episodes:

def evaluate(actor,env, n_episodes = 1):
  episodes_rewards =[];
  for _ in range(n_episodes):
    state, _ = env.reset()
    total_reward = 0
    while True:
      action = actor.act(state)

# State below is the next state the agent reaches after this state, using the env method, step()
      state, reward, done, info, _ = env.step(action[0])
      total_reward += reward
      if done:
        break
    episodes_rewards.append(total_reward)
  return episodes_rewards

### Testing multiple actors on multiple environments at the same time:

In [121]:
class EnvBatch:

  def __init__(self, n_envs = 25):
    self.envs = [make_env() for _ in range(n_envs)]

  def reset(self):
    _states =[]
    for env in self.envs:
      _states.append(env.reset()[0])
    return np.array(_states)

# Defining a multienvironment step method that takes in mutiple actors:

  def step(self, actions):
    next_states, rewards, dones, infos, _ = map(np.array, zip(*[env.step(a) for env, a in zip(self.envs, actions)]))
    for i in range(len(self.envs)):
      if dones[i]:
        next_states[i] = self.envs[i].reset()[0]
    return next_states, rewards, dones, infos



### Training the A3C actors:

In [122]:
import tqdm

env_batch = EnvBatch(n_envs = number_of_environments)
batch_states = env_batch.reset()
with tqdm.trange(4001) as progress_bar:
  for i in progress_bar:

    # Taking action:
    batch_actions = actor.act(batch_states)
    batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions)

    # Stabilizing training:
    batch_rewards *= 0.01

    # Training:
    actor.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)

    # Updating batch_states:
    batch_states = batch_next_states
    if i % 1000 ==0:
      print("Average actor reward: ", np.mean(evaluate(actor, env, n_episodes = 10)))

  critic_loss = F.mse_loss(target_state_value.detach(), state_value)
  state  = torch.tensor(state, dtype = torch.float32, device = self.device)
  0%|          | 1/4001 [01:13<82:04:18, 73.86s/it]

Average actor reward:  60.0


 25%|██▌       | 1001/4001 [27:19<12:54:57, 15.50s/it]

Average actor reward:  0.0


 50%|█████     | 2001/4001 [54:55<17:25:13, 31.36s/it]

Average actor reward:  0.0


 75%|███████▌  | 3001/4001 [1:22:13<6:14:01, 22.44s/it]

Average actor reward:  20.0


100%|██████████| 4001/4001 [1:49:42<00:00,  1.65s/it]

Average actor reward:  400.0





### Visualizing the results:

In [123]:
# The code in this cell is provided by the Udemy course Artificial Intelligence A-Z 2024:

import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(actor, env):
  state, _ = env.reset()
  done = False
  frames = []
  while not done:
    frame = env.render()
    frames.append(frame)
    action = actor.act(state)
    state, reward, done, _, _ = env.step(action[0])
  env.close()
  imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(actor, env)

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()

  logger.warn(


By all means, mama kangaroo did not succeed. I tried  convolutional layers with different number of nodes and different sizes of kernels, as well as different learning rate, number of actors, and discount factor. 400 points is my personal best.


I think that this environment is spacially very complex. Actors have to learn to get to a ladder, on several levels, while avoiding falling bullets and hostile creatures. If you have any comments on how to improve this architecture, please message me on linkedin: https://www.linkedin.com/in/elzbieta-polak/

