# A3C for Kung Fu

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [1]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!pip install ale-py
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 35 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 0s (8,997 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 126371 files and directories currently installed.)
Preparing to unpack .../swig4.0_4.0.2-1ubuntu1_amd64.deb ...
Unpacking swig4.0 (4.0.2-1ubuntu1) ...
Selecting previously unselected package swig.
Preparing to unpack .../swig_4.0.2-1ubu

### Importing the libraries

In [2]:
# Make sure no GUI backend is used anywhere
import matplotlib
matplotlib.use("Agg")

import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import ale_py
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium import ObservationWrapper

# Use console tqdm only (no GUI) + disable monitor thread
from tqdm import tqdm, trange
tqdm.monitor_interval = 0

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [3]:
class Network(nn.Module):
  def __init__(self,action_size):
    super(Network,self).__init__()
    self.conv1=torch.nn.Conv2d(in_channels=4, out_channels=32, kernel_size=(3,3), stride=2)
    self.conv2=torch.nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(3,3), stride=2)
    self.conv3=torch.nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(3,3), stride=2)
    self.flatten=torch.nn.Flatten()
    self.fc1=torch.nn.Linear(512,128)
    self.fc2a=torch.nn.Linear(128,action_size)
    self.fc2s=torch.nn.Linear(128,1)

  def forward(self,state):
    x=F.relu(self.conv1(state))
    x=F.relu(self.conv2(x))
    x=F.relu(self.conv3(x))
    x=self.flatten(x)
    x=F.relu(self.fc1(x))
    action_value=self.fc2a(x)
    state_value=self.fc2s(x)[0]
    return action_value,state_value




## Part 2 - Training the AI

### Setting up the environment

In [4]:
class PreprocessAtari(ObservationWrapper):

  #Below method is used define properties of environment
  def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4):
    super(PreprocessAtari, self).__init__(env)
    self.img_size = (height, width)
    self.crop = crop
    self.dim_order = dim_order
    self.color = color
    self.frame_stack = n_frames
    n_channels = 3 * n_frames if color else n_frames
    obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
    self.observation_space = Box(0.0, 1.0, obs_shape)
    self.frames = np.zeros(obs_shape, dtype = np.float32)

  def reset(self):
    self.frames = np.zeros_like(self.frames)
    obs, info = self.env.reset()
    self.update_buffer(obs)
    return self.frames, info

  def observation(self, img):#to pre process images of the envirment
    img = self.crop(img)
    img = cv2.resize(img, self.img_size)
    if not self.color:
      if len(img.shape) == 3 and img.shape[2] == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = img.astype('float32') / 255.
    if self.color:
      self.frames = np.roll(self.frames, shift = -3, axis = 0)
    else:
      self.frames = np.roll(self.frames, shift = -1, axis = 0)
    if self.color:
      self.frames[-3:] = img
    else:
      self.frames[-1] = img
    return self.frames

  def update_buffer(self, obs):
    self.frames = self.observation(obs)

def make_env():
  env = gym.make("KungFuMasterNoFrameskip-v0", render_mode = 'rgb_array') # Corrected environment name
  env = PreprocessAtari(env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)
  return env

env = make_env()

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("State shape:", state_shape)
print("Number actions:", number_actions)
print("Action names:", env.env.env.env.get_action_meanings())

  logger.deprecation(


State shape: (4, 42, 42)
Number actions: 14
Action names: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'DOWNRIGHT', 'DOWNLEFT', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


### Initializing the hyperparameters

In [5]:
learnig_rate = 1e-4
discount_factor = 0.99
number_environments = 10#it is the A3C benifit that multiple agent can run in multiple enviroment simultaneously


### Implementing the A3C class

In [6]:
class Agent():
  def __init__(self, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.action_size = action_size
    self.network = Network(action_size).to(self.device)
    self.optimizer = optim.Adam(self.network.parameters(), lr = learnig_rate)#update weight of nn

  def act(self,state):
    #this for batch bcz nn always expect that
    if state.ndim == 3:#this check that dimensions is 3 then it add one more dimension
      state = [state]
    state = torch.tensor(state, dtype = torch.float32, device=self.device)
    action_values, _ = self.network(state)#its a shortcut of self.network.forword(state)
    policy = F.softmax(action_values, dim = -1)#it ia a policy like epsilon greedy and dim =-1 is define that it has to perform this dunction untill all dimensions cover
    return np.array([np.random.choice(len(p), p = p.detach().cpu().numpy()) for p in policy])#in for loop the policy start as a tensor then it detech from computational then we move back to the cpu and convert tensor into the numpy


  def step(self, state, action, reward, next_state, done):
    batch_size = state.shape[0]
    state = torch.tensor(state, dtype=torch.float32, device=self.device)
    action = torch.tensor(action, dtype=torch.long, device=self.device)# Changed dtype to long
    reward = torch.tensor(reward, dtype=torch.float32, device=self.device)
    next_state = torch.tensor(next_state, dtype=torch.float32, device=self.device)
    done = torch.tensor(done, dtype=torch.bool, device=self.device).to(dtype=torch.float32)

    action_values, state_values = self.network(state)
    _, next_state_values = self.network(next_state)

    # Bellman target
    target_state_values = reward + discount_factor * next_state_values * (1 - done)#bellman equation # Corrected line

    # Advantage (normalized)
    advantage = target_state_values - state_values
    advantage = (advantage - advantage.mean()) / (advantage.std() + 1e-8)

    probs = F.softmax(action_values, dim=-1)
    logprobs = F.log_softmax(action_values, dim=-1)

    # Fixed entropy sign
    entropy = -(probs * logprobs).sum(axis=-1)

    batch_idx = torch.arange(batch_size)
    logs_actions = logprobs[batch_idx, action]

    actor_loss = -(logs_actions * advantage.detach()).mean() - 0.001 * entropy.mean()
    critic_loss = F.mse_loss(target_state_values.detach(), state_values)
    total_loss = actor_loss + critic_loss

    self.optimizer.zero_grad()
    total_loss.backward()
    self.optimizer.step()

### Initializing the A3C agent

In [7]:
agent = Agent(number_actions)

### Evaluating our A3C agent on a single episode

In [8]:
def evaluate(agent, env, n_episodes = 1):
  episode_rewards = []
  for _ in range(n_episodes):
    state, _ = env.reset()#reset the state and other variables
    total_reward = 0
    done = False
    while not done:
      action = agent.act(state)
      state, reward, done, info, _ = env.step(action[0])
      total_reward += reward
    episode_rewards.append(total_reward)
  return episode_rewards

### Testing multiple agents on multiple environments at the same time

In [9]:
class EnvBatch:
  def __init__(self, n_envs = 10):
    self.envs = [make_env() for _ in range(n_envs)]#make_env function is used to make environments

  def reset(self):#allow us to reset multiple env at a same time
    _states = []
    for env in self.envs:
      _states.append(env.reset()[0])
    return np.array(_states)

  def step(self, actions):
    next_states, reward, dones, infos, _ = map(np.array, zip(*[env.step(a) for env, a in zip(self.envs, actions)]))
    for i in range(len(dones)):# this check if any env is done or not if it is done it resets it
      if dones[i]:
        next_states[i] = self.envs[i].reset()[0]# this reset particular env
    return next_states, reward, dones, infos

### Training the A3C agent

In [10]:
env_batch = EnvBatch(number_environments)
batch_states = env_batch.reset()

# ✅ Console progress bar only; no GUI, no background monitor thread
for i in trange(3001):
  batch_actions = agent.act(batch_states)
  batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions)
  batch_rewards *= 0.01
  agent.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)
  batch_states = batch_next_states
  if i % 1000 == 0:
    avg_rew = np.mean(evaluate(agent, env, n_episodes = 10))
    print("Average agent reward :", avg_rew)

  logger.deprecation(
  critic_loss = F.mse_loss(target_state_values.detach(), state_values)
  state = torch.tensor(state, dtype = torch.float32, device=self.device)
  0%|          | 7/3001 [02:00<10:30:52, 12.64s/it]  

Average agent reward : 570.0


 34%|███▎      | 1010/3001 [04:19<2:19:00,  4.19s/it]

Average agent reward : 680.0


 67%|██████▋   | 2007/3001 [06:43<1:12:12,  4.36s/it]

Average agent reward : 640.0


100%|██████████| 3001/3001 [09:07<00:00,  5.48it/s]

Average agent reward : 880.0





## Part 3 - Visualizing the results

In [11]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env):
  state, _ = env.reset()
  done = False
  frames = []
  while not done:
    frame = env.render()
    frames.append(frame)
    action = agent.act(state)
    state, reward, done, _, _ = env.step(action[0])
  env.close()
  imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, env)

def show_video():
  mp4list = glob.glob('*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    display(HTML(data='''<video alt="test" autoplay
            loop controls style="height: 400px;">
            <source src="data:video/mp4;base64,{0}" type="video/mp4" />
         </video>'''.format(encoded.decode('ascii'))))
  else:
    print("Could not find video")

show_video()

