# A3C for Kung Fu

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.


### Importing the libraries

In [None]:
import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import gymnasium as gym
from gymnasium import ObservationWrapper
from gymnasium.spaces import Box

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Actor-critic network: a shared convolutional trunk with two linear heads.

  Input:  float tensor of shape (batch, 4, 42, 42) -- 4 stacked grayscale frames.
  Output: tuple (action_values, state_value) where
    action_values has shape (batch, action_size) and holds the unnormalised
      policy logits (a softmax is applied by the caller), and
    state_value has shape (batch,) and holds the critic estimate V(s) for each
      sample of the batch.
  """

  def __init__(self, action_size):
    """Create the layers; action_size is the number of discrete actions."""
    super(Network, self).__init__()
    self.conv1 = torch.nn.Conv2d(in_channels = 4,  out_channels = 32, kernel_size = (3,3), stride = 2)
    self.conv2 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3,3), stride = 2)
    self.conv3 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3,3), stride = 2)
    self.flatten = torch.nn.Flatten()
    # Spatial size shrinks 42 -> 20 -> 9 -> 4 through the three stride-2 convolutions,
    # so the flattened feature vector has 32 * 4 * 4 = 512 entries.
    self.fc1  = torch.nn.Linear(512, 128)
    self.fc2a = torch.nn.Linear(128, action_size) # Actor head: one logit per action
    self.fc2s = torch.nn.Linear(128, 1) # Critic head: scalar state value

  def forward(self, state):
    """Run a batch of stacked frames through the trunk and both heads."""
    x = F.relu(self.conv1(state))
    x = F.relu(self.conv2(x))
    x = F.relu(self.conv3(x))
    x = self.flatten(x)
    x = F.relu(self.fc1(x))
    action_values = self.fc2a(x)
    # Drop the trailing singleton dimension: (batch, 1) -> (batch,).
    # Indexing with [0] would instead pick the FIRST SAMPLE of the batch, so every
    # environment would be trained against the value of environment 0.
    state_value = self.fc2s(x)[:, 0]
    return action_values, state_value

## Part 2 - Training the AI

### Setting up the environment

In [None]:
class PreprocessAtari(ObservationWrapper):
  """Resize, optionally grayscale, normalise and frame-stack environment observations.

  Every raw frame is cropped with `crop`, resized to (height, width), converted to
  grayscale unless `color` is True, scaled to [0, 1] as float32 and pushed into a
  rolling buffer holding the last `n_frames` frames. The buffer is what callers
  receive as the observation.

  Parameters:
    env:       the environment to wrap.
    height:    output frame height in pixels.
    width:     output frame width in pixels.
    crop:      callable applied to the raw frame before resizing.
    dim_order: 'pytorch' for (channels, height, width) observations or
               'tensorflow' for (height, width, channels).
    color:     keep 3 colour channels per frame instead of converting to grayscale.
    n_frames:  number of consecutive frames kept in the stack.
  """

  def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4):
    super(PreprocessAtari, self).__init__(env)
    self.img_size = (height, width)
    self.crop = crop
    self.dim_order = dim_order
    self.color = color
    self.frame_stack = n_frames
    n_channels = 3 * n_frames if color else n_frames
    obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
    self.observation_space = Box(0.0, 1.0, obs_shape)
    self.frames = np.zeros(obs_shape, dtype = np.float32)

  def reset(self, *, seed = None, options = None):
    """Clear the frame stack, reset the wrapped environment and return (frames, info).

    `seed` and `options` are forwarded to the wrapped environment so the wrapper
    honours the Gymnasium reset signature; calling reset() with no arguments
    behaves as before.
    """
    self.frames = np.zeros_like(self.frames)
    obs, info = self.env.reset(seed = seed, options = options)
    self.update_buffer(obs)
    return self.frames, info

  def observation(self, img):
    """Preprocess one raw frame, push it onto the stack and return the stack."""
    img = self.crop(img)
    height, width = self.img_size
    # cv2.resize takes its target size as (width, height), not (height, width).
    img = cv2.resize(img, (width, height))
    if not self.color:
      if len(img.shape) == 3 and img.shape[2] == 3:
        # Gymnasium/ALE frames are RGB-ordered, so the RGB conversion code is the right one.
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    img = img.astype('float32') / 255.
    if img.ndim == 2:
      img = img[..., np.newaxis] # (H, W) -> (H, W, 1) so both layouts share one code path
    channels_first = self.dim_order == 'pytorch'
    if channels_first:
      img = np.transpose(img, (2, 0, 1)) # (H, W, C) -> (C, H, W)
    n_new = 3 if self.color else 1 # channels contributed by one frame
    channel_axis = 0 if channels_first else -1
    # Shift the oldest frame out along the channel axis, then overwrite the freed slots.
    self.frames = np.roll(self.frames, shift = -n_new, axis = channel_axis)
    if channels_first:
      self.frames[-n_new:] = img
    else:
      self.frames[..., -n_new:] = img
    return self.frames

  def update_buffer(self, obs):
    """Push a raw observation onto the frame stack."""
    self.frames = self.observation(obs)

def make_env():
  """Build a Kung Fu Master environment wrapped in PreprocessAtari.

  Returns an environment whose observations are 4 stacked 42x42 grayscale frames
  in channels-first order; frames can be rendered as RGB arrays.
  """
  base_env = gym.make("KungFuMasterDeterministic-v0", render_mode = 'rgb_array')
  return PreprocessAtari(base_env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)

env = make_env() # Single environment used for inspection, evaluation and video recording

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("State shape:", state_shape) # Stacked frames, channels first
print("Number actions:", number_actions) # Size of the discrete action space
# env.unwrapped reaches the base environment regardless of how many wrappers
# gym.make and PreprocessAtari stack on top, unlike a hard-coded env.env.env chain.
print("Action names:", env.unwrapped.get_action_meanings())

State shape: (4, 42, 42)
Number actions: 14
Action names: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'DOWNRIGHT', 'DOWNLEFT', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


  logger.warn(


### Initializing the hyperparameters

In [None]:
learning_rate = 1e-4 # Step size of the Adam optimizer created in Agent.__init__
discount_factor = 0.99 # Gamma: weight of the bootstrapped next-state value in Agent.step
number_environments = 10 # Number of environment copies stepped together; each training update uses one transition from every copy

### Implementing the A3C class

In [None]:
class Agent():
  """Actor-critic agent that owns one Network and one Adam optimizer.

  Relies on the module-level names `Network`, `learning_rate` and `discount_factor`.
  """

  def __init__(self, action_size):
    """Create the network on GPU when available (CPU otherwise) and its optimizer."""
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.action_size = action_size
    self.network = Network(action_size).to(self.device)
    self.optimizer = torch.optim.Adam(self.network.parameters(), lr = learning_rate)

  def act(self, state):
    """Sample one action per state from the softmax policy.

    state: a single observation (3 dimensions) or a batch of observations (4 dimensions).
    Returns a 1-D numpy array of action indices, one per observation; a single
    observation therefore yields an array of length 1.
    """
    state = np.asarray(state, dtype = np.float32)
    if state.ndim == 3:
      state = state[np.newaxis] # Add the batch dimension without going through a Python list
    state = torch.tensor(state, dtype = torch.float32, device = self.device)
    with torch.no_grad(): # Action selection never needs gradients
      action_values, _ = self.network(state)
      policy = F.softmax(action_values, dim = -1)
    # np.random.choice checks that p sums to 1 with float64 tolerance, which float32
    # softmax output can miss; renormalise in float64 to avoid a spurious ValueError.
    probabilities = policy.cpu().numpy().astype(np.float64)
    probabilities /= probabilities.sum(axis = -1, keepdims = True)
    return np.array([np.random.choice(len(p), p = p) for p in probabilities])

  def step(self, state, action, reward, next_state, done):
    """Do one gradient update from a batch of transitions.

    All arguments are array-likes whose first dimension is the batch:
    state/next_state are observations, action holds integer action indices,
    reward holds floats and done holds booleans marking finished episodes.
    The loss is actor_loss + critic_loss, where the actor loss is the policy
    gradient term weighted by the (detached) advantage minus a small entropy
    bonus, and the critic loss is the MSE between V(s) and the one-step target.
    """
    state = torch.tensor(state, dtype = torch.float32, device = self.device)
    next_state = torch.tensor(next_state, dtype = torch.float32, device = self.device)
    reward = torch.tensor(reward, dtype = torch.float32, device = self.device)
    done = torch.tensor(done, dtype = torch.bool, device = self.device).to(dtype = torch.float32)
    action = torch.as_tensor(action, dtype = torch.long, device = self.device)
    action_values, state_value = self.network(state)
    # The bootstrap value only ever enters the losses detached, so skip building its graph.
    with torch.no_grad():
      _, next_state_value = self.network(next_state)
    # One-step TD target; (1 - done) removes the bootstrap term for finished episodes.
    target_state_value = reward + discount_factor * next_state_value * (1 - done)
    advantage = target_state_value - state_value
    probs = F.softmax(action_values, dim = -1)
    logprobs = F.log_softmax(action_values, dim = -1)
    entropy = -torch.sum(probs * logprobs, dim = -1)
    # Log-probability of the action actually taken in each transition.
    logp_actions = logprobs.gather(1, action.unsqueeze(1)).squeeze(1)
    # The entropy bonus (weight 0.001) discourages the policy from collapsing too early.
    actor_loss = -(logp_actions * advantage.detach()).mean() - 0.001 * entropy.mean()
    # mse_loss takes (input, target): the prediction goes first, the detached target second.
    critic_loss = F.mse_loss(state_value, target_state_value.detach())
    total_loss = actor_loss + critic_loss
    self.optimizer.zero_grad()
    total_loss.backward()
    self.optimizer.step()

### Initializing the A3C agent

In [None]:
# Build the agent with one policy output per environment action (number_actions is env.action_space.n)
agent = Agent(number_actions)

### Evaluating our A3C agent on a certain number of episodes

In [None]:
def evaluate(agent, env, n_episodes = 1):
  """Play full episodes with the agent and return the total reward of each.

  agent:      object whose act(state) returns a sequence of actions; the first
              entry is the action played.
  env:        environment following the Gymnasium API: reset() -> (obs, info),
              step(a) -> (obs, reward, terminated, truncated, info).
  n_episodes: number of episodes to play.
  Returns a list with one summed reward per episode.
  """
  episodes_rewards = []
  for _ in range(n_episodes):
    state, _ = env.reset()
    total_reward = 0
    done = False
    while not done:
      action = agent.act(state)
      # step returns terminated AND truncated as separate flags; the episode is over
      # when either is set. Watching only `terminated` loops forever on truncation.
      state, reward, terminated, truncated, _ = env.step(action[0])
      total_reward += reward
      done = terminated or truncated
    episodes_rewards.append(total_reward)
  return episodes_rewards

### Managing multiple environments simultaneously

In [None]:
# Batched environments: several independent copies are stepped one after another
# inside a single process, and their transitions are stacked into one batch.
class EnvBatch:
  """A fixed-size list of environments that are reset and stepped together."""

  def __init__(self, n_envs = 10):
    """Create n_envs environments with make_env()."""
    self.envs = [make_env() for _ in range(n_envs)]

  def reset(self):
    """Reset every environment and return the stacked initial observations."""
    return np.array([env.reset()[0] for env in self.envs])

  def step(self, actions):
    """Step each environment with its own action.

    actions: one action per environment, in the same order as self.envs.
    Returns (next_states, rewards, dones, infos):
      next_states -- stacked observations; for an environment whose episode just
                     ended this is already the first observation of a new episode.
      rewards     -- float array with one reward per environment.
      dones       -- boolean array, True where the episode ended on this step
                     (terminated or truncated).
      infos       -- list with the info dict of each environment.
    """
    results = [env.step(a) for env, a in zip(self.envs, actions)]
    # Each step result is (obs, reward, terminated, truncated, info).
    next_states, rewards, terminated, truncated, infos = zip(*results)
    next_states = np.array(next_states)
    # Force a float dtype so callers can rescale rewards in place.
    rewards = np.array(rewards, dtype = np.float64)
    dones = np.logical_or(terminated, truncated)
    for i, env in enumerate(self.envs):
      if dones[i]: # Start a fresh episode so the batch never contains a dead environment
        next_states[i] = env.reset()[0]
    return next_states, rewards, dones, list(infos)

### Training the A3C agent

In [None]:
import tqdm # Progress bars for loops

env_batch = EnvBatch(number_environments) # number_environments independent copies of the environment
batch_states = env_batch.reset() # Stacked initial observations, one row per environment

# NOTE(review): the saved output of this cell shows 4001 iterations, so it comes from an earlier run with a different range.
with tqdm.trange(0, 3001) as progress_bar: # Iterations 0..3000 inclusive, so the periodic evaluation also runs on the last one
  for i in progress_bar:
    batch_actions = agent.act(batch_states) # Sample one action per environment from the current policy
    batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions) # Advance every environment by one step
    batch_rewards *= 0.01 # Scale the training rewards down in place; the evaluation below uses its own env and reports unscaled rewards
    agent.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)# One gradient update on the combined actor + critic loss for this batch of transitions
    batch_states = batch_next_states # Continue from the new observations on the next iteration
    if i % 1000 == 0: # Evaluate every 1000 iterations (including iteration 0, before any meaningful training)
      print("Average agent reward: ", np.mean(evaluate(agent, env, n_episodes = 10))) # Mean total reward over 10 evaluation episodes

  critic_loss = F.mse_loss(target_state_value.detach(), state_value) # MSE btw target state value and the state value
  0%|          | 5/4001 [00:41<6:53:33,  6.21s/it] 

Average agent reward:  440.0


 25%|██▌       | 1008/4001 [01:41<1:02:16,  1.25s/it]

Average agent reward:  570.0


 50%|█████     | 2008/4001 [02:40<58:36,  1.76s/it]  

Average agent reward:  730.0


 75%|███████▌  | 3008/4001 [03:35<26:40,  1.61s/it]

Average agent reward:  550.0


100%|██████████| 4001/4001 [04:27<00:00, 14.93it/s]

Average agent reward:  500.0





## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env):
  """Play one episode with the agent and save the rendered frames to video.mp4.

  agent: object whose act(state) returns a sequence of actions; the first is played.
  env:   Gymnasium environment; each frame is taken from env.render().
  The environment is closed before returning, even if the rollout raises.
  """
  state, _ = env.reset()
  done = False
  frames = []
  try:
    while not done:
      frames.append(env.render())
      action = agent.act(state)
      # The episode ends on termination OR truncation; step reports them separately.
      state, _, terminated, truncated, _ = env.step(action[0])
      done = terminated or truncated
  finally:
    env.close()
  imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained agent on the global env into video.mp4 (this also closes env)
show_video_of_model(agent, env)

def show_video():
    """Embed the first .mp4 found in the working directory in the notebook output.

    The file is inlined as a base64 data URI inside an HTML <video> tag. Prints
    "Could not find video" when the working directory contains no .mp4 file.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode inside a context manager: the file is only read,
        # and the handle is closed as soon as its content is in memory.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Display an .mp4 from the working directory inline in the notebook output
show_video()

