# Asynchronous Advantage Actor-Critic for Kung-Fu

## Part-0 Installing the required packages and importing the libraries

### Installing gymnasium

In [None]:
# Core Gymnasium package (imported below as `gym`).
! pip install gymnasium
# Gymnasium with its Atari extras; the environment created later is an Atari game.
!pip install "gymnasium[atari,accept-rom-license]"
# Arcade Learning Environment bindings (imported below as `ale_py`).
!pip install ale-py
# NOTE(review): swig and the box2d extra do not appear to be used by anything
# else in this notebook -- confirm before removing.
!apt-get install -y swig
!pip install gymnasium[box2d]


Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting ale-py>=0.9 (from gymnasium[accept-rom-license,atari])
  Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Downloading ale_py-0.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling colle

### Importing the libraries

In [None]:
import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import ale_py
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium import ObservationWrapper

import torch.optim as optim
from collections import deque
from torch.utils.data import DataLoader,TensorDataset

## Part-1 Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Actor-critic network with a shared convolutional trunk and two heads.

  The trunk is three stride-2 3x3 convolutions followed by a 512 -> 128 fully
  connected layer. The 512 input features of `fc1` correspond to a
  (4, 42, 42) input: 42 -> 20 -> 9 -> 4 spatially, times 32 channels.

  Args:
    action_size: number of discrete actions (size of the actor head).
  """

  def __init__(self, action_size):
    super(Network, self).__init__()
    self.conv1 = torch.nn.Conv2d(in_channels = 4,  out_channels = 32, kernel_size = (3,3), stride = 2)
    self.conv2 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3,3), stride = 2)
    self.conv3 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3,3), stride = 2)
    self.flatten = torch.nn.Flatten()
    self.fc1  = torch.nn.Linear(512, 128)
    self.fc2a = torch.nn.Linear(128, action_size)  # actor head: one logit per action
    self.fc2s = torch.nn.Linear(128, 1)            # critic head: one scalar per state

  def forward(self, state):
    """Run a batch of stacked frames through the network.

    Args:
      state: float tensor of shape (batch, 4, H, W).

    Returns:
      (action_values, state_value): unnormalised action scores of shape
      (batch, action_size) and state-value estimates of shape (batch,).
    """
    x = F.relu(self.conv1(state))
    x = F.relu(self.conv2(x))
    x = F.relu(self.conv3(x))
    x = self.flatten(x)
    x = F.relu(self.fc1(x))
    action_values = self.fc2a(x)
    # Drop the trailing singleton dimension: (batch, 1) -> (batch,).
    # Indexing with [0] would select the FIRST SAMPLE of the batch instead,
    # so every environment would share one value estimate.
    state_value = self.fc2s(x)[:, 0]
    return action_values, state_value

## Part-2 Training the AI

### Setting up the environment

In [None]:
class PreprocessAtari(ObservationWrapper):
  """Observation wrapper that resizes, optionally grayscales, rescales to
  [0, 1] and stacks the most recent `n_frames` frames.

  Args:
    env: environment to wrap.
    height, width: size each frame is resized to.
    crop: callable applied to the raw frame before resizing.
    dim_order: 'pytorch' for channel-first (C, H, W) output or 'tensorflow'
      for channel-last (H, W, C) output.
    color: keep three colour channels per frame if True, otherwise grayscale.
    n_frames: number of consecutive frames kept in the stack.
  """

  def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4):
    super(PreprocessAtari, self).__init__(env)
    self.img_size = (height, width)
    self.crop = crop
    self.dim_order = dim_order
    self.color = color
    self.frame_stack = n_frames
    n_channels = 3 * n_frames if color else n_frames
    obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
    self.observation_space = Box(0.0, 1.0, obs_shape)
    self.frames = np.zeros(obs_shape, dtype = np.float32)

  def reset(self, **kwargs):
    """Clear the frame stack, reset the wrapped env and return (frames, info).

    Keyword arguments (e.g. `seed`, `options`) are forwarded to the wrapped
    environment's `reset`.
    """
    self.frames = np.zeros_like(self.frames)
    obs, info = self.env.reset(**kwargs)
    self.update_buffer(obs)
    return self.frames, info

  def observation(self, img):
    """Preprocess one raw frame, push it onto the stack and return the stack."""
    img = self.crop(img)
    height, width = self.img_size
    # cv2.resize expects dsize as (width, height), not (height, width).
    img = cv2.resize(img, (width, height))
    if not self.color:
      if len(img.shape) == 3 and img.shape[2] == 3:
        # Gymnasium's Atari environments deliver RGB frames, so convert from
        # RGB (BGR would swap the red and blue luminance weights).
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    img = img.astype('float32') / 255.
    channels_per_frame = 3 if self.color else 1
    if self.dim_order == 'pytorch':
      # Channel-first stack: the oldest frame sits at index 0 of axis 0.
      if self.color:
        img = img.transpose(2, 0, 1)  # (H, W, 3) -> (3, H, W)
      self.frames = np.roll(self.frames, shift = -channels_per_frame, axis = 0)
      self.frames[-channels_per_frame:] = img
    else:
      # Channel-last stack: frames are stacked along the last axis.
      if not self.color:
        img = img[..., np.newaxis]  # (H, W) -> (H, W, 1)
      self.frames = np.roll(self.frames, shift = -channels_per_frame, axis = -1)
      self.frames[..., -channels_per_frame:] = img
    return self.frames

  def update_buffer(self, obs):
    """Push a raw observation onto the frame stack."""
    self.frames = self.observation(obs)

def make_env():
  """Create the Kung-Fu Master environment wrapped in `PreprocessAtari`.

  Returns:
    The wrapped environment, producing 4 stacked 42x42 grayscale frames in
    channel-first order.
  """
  preprocessing = dict(height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)
  raw_env = gym.make("KungFuMasterDeterministic-v0", render_mode = 'rgb_array')
  return PreprocessAtari(raw_env, **preprocessing)

# Single environment used for inspection, evaluation and video recording.
env = make_env()

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("State shape:", state_shape)
print("Number actions:", number_actions)
# `unwrapped` reaches the base environment regardless of how many wrappers
# are stacked on top of it, unlike a hard-coded chain of `.env` accesses.
print("Action names:", env.unwrapped.get_action_meanings())

  logger.deprecation(


State shape: (4, 42, 42)
Number actions: 14
Action names: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'DOWNRIGHT', 'DOWNLEFT', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


### Initializing the hyperparameters

In [None]:
learning_rate = 1e-4  # learning rate passed to the Adam optimizer
discount_factor = 0.99  # discount applied to the next state's value in the TD target
number_environments = 10 # number of environments (i.e. agents) run side by side

The policy (actor) head outputs one score per action; a softmax turns these scores into the probability of each action.


The critic outputs a single value for the current state, not per action.

### Implementing the A3C class

In [None]:
class Agent():
    """Advantage actor-critic agent: owns the network and its optimizer.

    Args:
        action_size: number of discrete actions available in the environment.
    """

    def __init__(self,action_size):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.action_size = action_size
        self.network = Network(action_size).to(self.device)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr = learning_rate)

    def act(self,state):
        """Sample one action per state from the current policy.

        Args:
            state: a single observation (3-D) or a batch of observations (4-D).

        Returns:
            1-D numpy array of sampled action indices, one per state.
        """
        state = np.asarray(state, dtype=np.float32)
        if state.ndim == 3:
            # A single observation lost its batch dimension: add it back.
            state = np.expand_dims(state, axis=0)
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device)
        # Acting needs no gradients, so skip building the autograd graph.
        with torch.no_grad():
            action_values, _ = self.network(state)
            # Softmax over the last (action) dimension turns the scores into
            # a probability distribution.
            policy = F.softmax(action_values, dim=-1)
        # Actions are sampled from the distribution (not arg-maxed).
        return np.array([np.random.choice(len(p), p=p) for p in policy.cpu().numpy()])

    def step(self,state,action,reward,next_state,done):
        """Perform one actor-critic update from a batch of transitions.

        Args:
            state: batch of observations; the first dimension is the batch size.
            action: action index taken in each state.
            reward: reward received for each transition.
            next_state: batch of observations reached after the actions.
            done: flags marking transitions that ended an episode.
        """
        batch_size = state.shape[0]
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device)
        next_state = torch.as_tensor(next_state, dtype=torch.float32, device=self.device)
        reward = torch.as_tensor(reward, dtype=torch.float32, device=self.device)
        done = torch.as_tensor(done, dtype=torch.bool, device=self.device).to(dtype=torch.float32)
        action = torch.as_tensor(action, dtype=torch.long, device=self.device)

        action_values, state_values = self.network(state)

        # The bootstrap target is treated as a constant, so compute it without
        # tracking gradients. (1 - done) removes the bootstrap term for
        # transitions that ended an episode.
        with torch.no_grad():
            _, next_state_value = self.network(next_state)
            target_state_value = reward + discount_factor * next_state_value * (1 - done)
        # Advantage: how much better the outcome was than the critic expected.
        advantage = target_state_value - state_values

        probs = F.softmax(action_values, dim=-1)
        log_probs = F.log_softmax(action_values, dim=-1)
        # Policy entropy: higher means a more exploratory policy.
        entropy = -torch.sum(probs * log_probs, dim=-1)
        batch_idx = torch.arange(batch_size, device=self.device)
        # Log-probability of the action each environment actually took.
        logp_actions = log_probs[batch_idx, action]
        # Policy-gradient loss with a small entropy bonus; the advantage is
        # detached so the actor loss does not back-propagate into the critic.
        actor_loss = -(logp_actions * advantage.detach()).mean() - 0.001 * entropy.mean()

        critic_loss = F.mse_loss(state_values, target_state_value)
        total_loss = actor_loss + critic_loss

        self.optimizer.zero_grad()  # clear gradients left from the previous update
        total_loss.backward()
        self.optimizer.step()


state_value represents how "good" the current state is in terms of future expected rewards.

next_state_value represents how "good" the next state is expected to be.

The Critic minimizes the difference between target_state_value and state_value.
target_state_value represents what the value of the current state should be, based on the reward and the expected value of the next state.

The Actor updates the policy so that actions with a positive advantage become more likely and actions with a negative advantage become less likely.

Actor: Optimizes action selection.

Critic: Optimizes state value estimation.

Entropy Bonus: Encourages exploration.

Backpropagation: Updates both Actor and Critic together.

### Initializing the A3C agent

In [None]:
agent = Agent(number_actions)

### Evaluating our A3C agent on a certain number of episodes

In [None]:
def evaluate(agent, env, n_episodes=1):
    """Play `n_episodes` full episodes and return each episode's total reward.

    Args:
        agent: object with an `act(state)` method returning an array of actions.
        env: environment whose `step` returns
            (observation, reward, terminated, truncated, info).
        n_episodes: number of episodes to play.

    Returns:
        List with one total reward per episode.
    """
    episodes_rewards = []
    for _ in range(n_episodes):
        state, _ = env.reset()
        total_reward = 0
        while True:
            action = agent.act(state)
            # `act` returns an array even for a single state, so take its
            # only element when stepping one environment.
            state, reward, terminated, truncated, _ = env.step(action[0])
            total_reward += reward
            # An episode ends either by termination or by truncation
            # (e.g. a time limit); both must stop the loop.
            if terminated or truncated:
                break
        episodes_rewards.append(total_reward)
    return episodes_rewards

# You pass in the number of episodes, and a list with that many values is returned.
# Each episode is played from start to finish; each value is that episode's total reward.

### Managing multiple environments simultaneously

In [None]:
# Helper class that creates, resets and steps all the environments together (one after another, in a loop).
class EnvBatch:
    """A fixed set of environments that are reset and stepped together.

    Args:
        n_envs: number of independent environments to create.
    """

    def __init__(self,n_envs=10):
        self.envs = [make_env() for _ in range(n_envs)]

    def reset(self):
        """Reset every environment and return the stacked initial observations."""
        return np.array([env.reset()[0] for env in self.envs])

    def step(self,actions):
        """Apply one action to each environment.

        Environments whose episode ended (terminated or truncated) are reset
        immediately, and their entry in the returned states is the fresh
        initial observation.

        Args:
            actions: one action per environment, in the order of `self.envs`.

        Returns:
            (next_states, rewards, dones, infos) as numpy arrays with one
            entry per environment; `dones` is True where the episode ended.
        """
        results = [env.step(a) for env,a in zip(self.envs,actions)]
        # Each result is (observation, reward, terminated, truncated, info).
        next_states,rewards,terminated,truncated,infos = zip(*results)
        next_states = np.array(next_states)
        rewards = np.array(rewards)
        # Either flag ends the episode; ignoring `truncated` would keep
        # stepping an environment that has already finished.
        dones = np.logical_or(terminated, truncated)
        for i,env in enumerate(self.envs):
            if dones[i]:
                next_states[i] = env.reset()[0]
        return next_states,rewards,dones,np.array(infos)

### Training the A3C agent

In [None]:
import tqdm  # only used to display a progress bar

TRAINING_STEPS = 3001        # number of batched environment steps / updates
EVALUATION_INTERVAL = 1000   # evaluate the agent every this many steps
EVALUATION_EPISODES = 10     # episodes averaged per evaluation
REWARD_SCALE = 0.01          # rewards are scaled down to stabilise training

env_batch = EnvBatch(number_environments)
batch_states = env_batch.reset()

with tqdm.trange(0, TRAINING_STEPS) as progress_bar:
    for i in progress_bar:
        batch_actions = agent.act(batch_states)
        batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions)
        # Scale into a new array rather than in place, so the array returned
        # by the environment batch is left untouched.
        batch_rewards = batch_rewards * REWARD_SCALE
        agent.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)
        batch_states = batch_next_states
        if i % EVALUATION_INTERVAL == 0:
            print("Average agent reward: ", np.mean(evaluate(agent, env, n_episodes=EVALUATION_EPISODES)))


  critic_loss = F.mse_loss(target_state_value.detach(),state_values)  # ene der detach hiigeed bga ni gradient-iig ni l ashiglahgui shaay gejgaan.
  0%|          | 5/3001 [00:42<5:19:48,  6.40s/it] 

Average agent reward:  950.0


 33%|███▎      | 1005/3001 [01:47<1:19:37,  2.39s/it]

Average agent reward:  460.0


 67%|██████▋   | 2005/3001 [02:51<39:13,  2.36s/it]  

Average agent reward:  680.0


100%|██████████| 3001/3001 [03:53<00:00, 12.84it/s]

Average agent reward:  560.0





## Part-3 Visualising the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env):
  """Play one episode with `agent`, record the rendered frames and save them
  to 'video.mp4'. The environment is closed afterwards.

  Args:
    agent: object with an `act(state)` method returning an array of actions.
    env: environment created with a render mode that returns frames from
      `render()`.
  """
  state, _ = env.reset()
  done = False
  frames = []
  while not done:
    frames.append(env.render())
    action = agent.act(state)
    state, _, terminated, truncated, _ = env.step(action[0])
    # Stop on truncation (e.g. a time limit) as well as on termination.
    done = terminated or truncated
  env.close()
  imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, env)

def show_video():
    """Embed the first .mp4 file found in the working directory in the
    notebook output, or print a message if there is none."""
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only access is enough, and the context manager closes the file.
        with io.open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()